mirror of
https://gitee.com/johng/gf
synced 2026-06-07 10:22:11 +08:00
gkafka新增手动标记指定消息是否消费完成方法
This commit is contained in:
@ -17,9 +17,10 @@ import (
|
||||
|
||||
// kafka Client based on sarama.Config
|
||||
type Config struct {
|
||||
GroupId string // group id for consumer.
|
||||
Servers string // server list, multiple servers joined by ','.
|
||||
Topics string // topic list, multiple topics joined by ','.
|
||||
GroupId string // group id for consumer.
|
||||
Servers string // server list, multiple servers joined by ','.
|
||||
Topics string // topic list, multiple topics joined by ','.
|
||||
AutoMarkOffset bool // auto mark message read after consumer message from server
|
||||
sarama.Config
|
||||
}
|
||||
|
||||
@ -38,6 +39,8 @@ type Message struct {
|
||||
Topic string
|
||||
Partition int
|
||||
Offset int
|
||||
client *Client
|
||||
consumerMsg *sarama.ConsumerMessage
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +55,6 @@ func NewClient(config *Config) *Client {
|
||||
func NewConfig() *Config {
|
||||
config := &Config{}
|
||||
config.Config = *sarama.NewConfig()
|
||||
config.Config = *sarama.NewConfig()
|
||||
|
||||
// default config for consumer
|
||||
config.Consumer.Return.Errors = true
|
||||
@ -63,6 +65,8 @@ func NewConfig() *Config {
|
||||
config.Producer.Return.Errors = true
|
||||
config.Producer.Return.Successes = true
|
||||
config.Producer.Timeout = 5 * time.Second
|
||||
|
||||
config.AutoMarkOffset = true
|
||||
return config
|
||||
}
|
||||
|
||||
@ -108,13 +112,17 @@ func (client *Client) Receive() (*Message, error) {
|
||||
}
|
||||
|
||||
msg := <- client.consumer.Messages()
|
||||
client.consumer.MarkOffset(msg, "")
|
||||
if client.Config.AutoMarkOffset {
|
||||
client.consumer.MarkOffset(msg, "")
|
||||
}
|
||||
return &Message {
|
||||
Value : msg.Value,
|
||||
Key : msg.Key,
|
||||
Topic : msg.Topic,
|
||||
Partition : int(msg.Partition),
|
||||
Offset : int(msg.Offset),
|
||||
Value : msg.Value,
|
||||
Key : msg.Key,
|
||||
Topic : msg.Topic,
|
||||
Partition : int(msg.Partition),
|
||||
Offset : int(msg.Offset),
|
||||
client : client,
|
||||
consumerMsg : msg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
14
g/database/gkafka/gkafka_message.go
Normal file
14
g/database/gkafka/gkafka_message.go
Normal file
@ -0,0 +1,14 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package gkafka
|
||||
|
||||
// 自动标记已读取
|
||||
func (msg *Message) MarkOffset() {
|
||||
if msg.consumerMsg != nil && msg.client != nil && msg.client.consumer != nil {
|
||||
msg.client.consumer.MarkOffset(msg.consumerMsg, "")
|
||||
}
|
||||
}
|
||||
@ -11,6 +11,7 @@ func main () {
|
||||
config.GroupId = "group_1"
|
||||
config.Servers = "localhost:9092"
|
||||
config.Topics = "test"
|
||||
config.AutoMarkOffset = false
|
||||
|
||||
client := gkafka.NewClient(config)
|
||||
defer client.Close()
|
||||
@ -18,5 +19,6 @@ func main () {
|
||||
for {
|
||||
msg, err := client.Receive()
|
||||
fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err)
|
||||
msg.MarkOffset()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user