diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index 26126e470..ba670cf71 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -1,24 +1,35 @@ package main import ( - "gitee.com/johng/gf/g/database/gkafka" "fmt" - "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/database/gkafka" + "time" ) -func main () { - config := gkafka.NewConfig() - config.GroupId = "group_1" - config.Servers = "localhost:9092" - config.Topics = "test" - config.AutoMarkOffset = false +// 创建kafka消费客户端 +func newKafkaClientConsumer(topic, group string) *gkafka.Client { + kafkaConfig := gkafka.NewConfig() + kafkaConfig.Servers = "localhost:9092" + kafkaConfig.AutoMarkOffset = false + kafkaConfig.Topics = topic + kafkaConfig.GroupId = group + return gkafka.NewClient(kafkaConfig) +} - client := gkafka.NewClient(config) +func main () { + client := newKafkaClientConsumer("test", "test-group-1") defer client.Close() for { - msg, err := client.Receive() - fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err) - msg.MarkOffset() + fmt.Println("reading...") + for i := 1; i < 10; i++ { + if msg, err := client.Receive(); err != nil { + fmt.Println(err) + } else { + fmt.Println(string(msg.Value)) + } + } + time.Sleep(3*time.Second) } + } diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go index 218e226db..245bcc0f7 100644 --- a/geg/database/kafka/gkafka_producer.go +++ b/geg/database/kafka/gkafka_producer.go @@ -3,15 +3,27 @@ package main import ( "gitee.com/johng/gf/g/database/gkafka" "fmt" + "gitee.com/johng/gf/g/util/gconv" ) -func main () { - config := gkafka.NewConfig() - config.Servers = "localhost:9092" - config.Topics = "test" - - client := gkafka.NewClient(config) - defer client.Close() - err := client.SyncSend(&gkafka.Message{Value: []byte("1")}) - fmt.Println(err) +// 创建kafka生产客户端 +func newKafkaClientProducer(topic string) *gkafka.Client { + kafkaConfig := gkafka.NewConfig() + kafkaConfig.Servers = "localhost:9092" + kafkaConfig.AutoMarkOffset = false + kafkaConfig.Topics = topic + return gkafka.NewClient(kafkaConfig) +} + +func main () { + client := newKafkaClientProducer("test") + defer client.Close() + + for i := 1; i < 10; i++ { + if err := client.SyncSend(&gkafka.Message{Value: []byte(gconv.String(i))}); err != nil { + fmt.Println(err) + } + } + + fmt.Println("done") }