diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index 24bc8ba52..c98a26c73 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -17,9 +17,10 @@ import ( // kafka Client based on sarama.Config type Config struct { - GroupId string // group id for consumer. - Servers string // server list, multiple servers joined by ','. - Topics string // topic list, multiple topics joined by ','. + GroupId string // group id for consumer. + Servers string // server list, multiple servers joined by ','. + Topics string // topic list, multiple topics joined by ','. + AutoMarkOffset bool // auto mark message read after consumer message from server sarama.Config } @@ -38,6 +39,8 @@ type Message struct { Topic string Partition int Offset int + client *Client + consumerMsg *sarama.ConsumerMessage } @@ -52,7 +55,6 @@ func NewClient(config *Config) *Client { func NewConfig() *Config { config := &Config{} config.Config = *sarama.NewConfig() - config.Config = *sarama.NewConfig() // default config for consumer config.Consumer.Return.Errors = true @@ -63,6 +65,8 @@ func NewConfig() *Config { config.Producer.Return.Errors = true config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second + + config.AutoMarkOffset = true return config } @@ -108,13 +112,17 @@ func (client *Client) Receive() (*Message, error) { } msg := <- client.consumer.Messages() - client.consumer.MarkOffset(msg, "") + if client.Config.AutoMarkOffset { + client.consumer.MarkOffset(msg, "") + } return &Message { - Value : msg.Value, - Key : msg.Key, - Topic : msg.Topic, - Partition : int(msg.Partition), - Offset : int(msg.Offset), + Value : msg.Value, + Key : msg.Key, + Topic : msg.Topic, + Partition : int(msg.Partition), + Offset : int(msg.Offset), + client : client, + consumerMsg : msg, }, nil } diff --git a/g/database/gkafka/gkafka_message.go b/g/database/gkafka/gkafka_message.go new file mode 100644 index 000000000..3d26621a6 --- /dev/null +++ b/g/database/gkafka/gkafka_message.go @@ -0,0 +1,14 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gkafka + +// 自动标记已读取 +func (msg *Message) MarkOffset() { + if msg.consumerMsg != nil && msg.client != nil && msg.client.consumer != nil { + msg.client.consumer.MarkOffset(msg.consumerMsg, "") + } +} \ No newline at end of file diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index a6fcd704e..26126e470 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -11,6 +11,7 @@ func main () { config.GroupId = "group_1" config.Servers = "localhost:9092" config.Topics = "test" + config.AutoMarkOffset = false client := gkafka.NewClient(config) defer client.Close() @@ -18,5 +19,6 @@ func main () { for { msg, err := client.Receive() fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err) + msg.MarkOffset() } }