diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index b35d347d0..b92b5baf3 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -25,7 +25,7 @@ type Config struct { // Kafka Client(Consumer/SyncProducer/AsyncProducer) type Client struct { - config *Config + Config *Config consumer *cluster.Consumer syncProducer sarama.SyncProducer asyncProducer sarama.AsyncProducer @@ -45,16 +45,24 @@ type Message struct { func New(config Config) *Client { config.Config = *sarama.NewConfig() - config.Consumer.Return.Errors = true - config.Consumer.Offsets.CommitInterval = 1 * time.Second - config.Consumer.Offsets.Initial = sarama.OffsetOldest + // default config for consumer + config.Consumer.Return.Errors = true + if config.Consumer.Offsets.Initial == 0 { + config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + if config.Consumer.Offsets.CommitInterval == 0 { + config.Consumer.Offsets.CommitInterval = 1 * time.Second + } - config.Producer.Return.Errors = true - config.Producer.Return.Successes = true - config.Producer.Timeout = 5 * time.Second + // default config for producer + config.Producer.Return.Errors = true + config.Producer.Return.Successes = true + if config.Producer.Timeout == 0 { + config.Producer.Timeout = 5 * time.Second + } return &Client { - config : &config, + Config : &config, } } @@ -74,10 +82,10 @@ func (client *Client) Close() { // Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. func (client *Client) Receive() (*Message, error) { config := cluster.NewConfig() - config.Config = client.config.Config + config.Config = client.Config.Config config.Group.Return.Notifications = true if client.consumer == nil { - c, err := cluster.NewConsumer(strings.Split(client.config.Servers, ","), client.config.GroupId, strings.Split(client.config.Topics, ","), config) + c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config) if err != nil { return nil, err } else { @@ -110,13 +118,13 @@ func (client *Client) Receive() (*Message, error) { // Send data to kafka in synchronized way. func (client *Client) SyncSend(message *Message) error { if client.syncProducer == nil { - if p, err := sarama.NewSyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil { + if p, err := sarama.NewSyncProducer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { return err } else { client.syncProducer = p } } - for _, topic := range strings.Split(client.config.Topics, ",") { + for _, topic := range strings.Split(client.Config.Topics, ",") { msg := messageToProducerMessage(message) msg.Topic = topic if _, _, err := client.syncProducer.SendMessage(msg); err != nil { @@ -129,7 +137,7 @@ func (client *Client) SyncSend(message *Message) error { // Send data to kafka in asynchronized way. func (client *Client) AsyncSend(message *Message) error { if client.asyncProducer == nil { - if p, err := sarama.NewAsyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil { + if p, err := sarama.NewAsyncProducer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { return err } else { client.asyncProducer = p @@ -147,7 +155,7 @@ func (client *Client) AsyncSend(message *Message) error { } } - for _, topic := range strings.Split(client.config.Topics, ",") { + for _, topic := range strings.Split(client.Config.Topics, ",") { msg := messageToProducerMessage(message) msg.Topic = topic client.asyncProducer.Input() <- msg