diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index 82688977b..9f702332d 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -3,19 +3,20 @@ package main import ( "gitee.com/johng/gf/g/database/gkafka" "fmt" + "gitee.com/johng/gf/g/os/gtime" ) func main () { config := gkafka.NewConfig() - config.GroupId = "group_2" + config.GroupId = "group_1" config.Servers = "localhost:9092" - config.Topics = "abc" + config.Topics = "test" client := gkafka.New(config) defer client.Close() for { msg, err := client.Receive() - fmt.Printf("value: %s, err: %v", string(msg.Value), err) + fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err) } } diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go index a25d66e0a..08ff3c304 100644 --- a/geg/database/kafka/gkafka_producer.go +++ b/geg/database/kafka/gkafka_producer.go @@ -8,10 +8,10 @@ import ( func main () { config := gkafka.NewConfig() config.Servers = "localhost:9092" - config.Topics = "abc" + config.Topics = "test" client := gkafka.New(config) defer client.Close() - err := client.SyncSend(&gkafka.Message{Value: []byte("111")}) + err := client.SyncSend(&gkafka.Message{Value: []byte("1")}) fmt.Println(err) }