diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index b92b5baf3..4b18b49e5 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -42,28 +42,28 @@ type Message struct { // New a kafka client. -func New(config Config) *Client { +func New(config *Config) *Client { + return &Client { + Config : config, + } +} + +// New a default configuration object. +func NewConfig() *Config { + config := &Config{} + config.Config = *sarama.NewConfig() config.Config = *sarama.NewConfig() // default config for consumer - config.Consumer.Return.Errors = true - if config.Consumer.Offsets.Initial == 0 { - config.Consumer.Offsets.Initial = sarama.OffsetOldest - } - if config.Consumer.Offsets.CommitInterval == 0 { - config.Consumer.Offsets.CommitInterval = 1 * time.Second - } + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.CommitInterval = 1 * time.Second // default config for producer - config.Producer.Return.Errors = true - config.Producer.Return.Successes = true - if config.Producer.Timeout == 0 { - config.Producer.Timeout = 5 * time.Second - } - - return &Client { - Config : &config, - } + config.Producer.Return.Errors = true + config.Producer.Return.Successes = true + config.Producer.Timeout = 5 * time.Second + return config } // Close client. @@ -96,7 +96,10 @@ func (client *Client) Receive() (*Message, error) { for { select { case err := <-errors: - glog.Error(err) + if err != nil { + glog.Error(err) + } + case <-notify: } } @@ -147,7 +150,9 @@ func (client *Client) AsyncSend(message *Message) error { for { select { case err := <-errors: - glog.Error(err) + if err != nil { + glog.Error(err) + } case <-success: } } diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index 84e257639..82688977b 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -6,15 +6,16 @@ import ( ) func main () { - client := gkafka.New(gkafka.Config{ - GroupId : "group_1", - Servers : "localhost:9092", - Topics : "abc", - }) + config := gkafka.NewConfig() + config.GroupId = "group_2" + config.Servers = "localhost:9092" + config.Topics = "abc" + + client := gkafka.New(config) defer client.Close() + for { msg, err := client.Receive() - fmt.Println(err) - fmt.Println(string(msg.Value)) + fmt.Printf("value: %s, err: %v", string(msg.Value), err) } } diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go index fa5912ca9..a25d66e0a 100644 --- a/geg/database/kafka/gkafka_producer.go +++ b/geg/database/kafka/gkafka_producer.go @@ -6,10 +6,11 @@ import ( ) func main () { - client := gkafka.New(gkafka.Config{ - Servers : "localhost:9092", - Topics : "abc", - }) + config := gkafka.NewConfig() + config.Servers = "localhost:9092" + config.Topics = "abc" + + client := gkafka.New(config) defer client.Close() err := client.SyncSend(&gkafka.Message{Value: []byte("111")}) fmt.Println(err)