From 2d0d7f5a8fa0810651a05e1775dbe99d35be8768 Mon Sep 17 00:00:00 2001 From: john Date: Wed, 20 Jun 2018 11:37:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84gkafka=E7=A4=BA=E4=BE=8B?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- geg/database/kafka/gkafka_consumer.go | 7 ++++--- geg/database/kafka/gkafka_producer.go | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index 82688977b..9f702332d 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -3,19 +3,20 @@ package main import ( "gitee.com/johng/gf/g/database/gkafka" "fmt" + "gitee.com/johng/gf/g/os/gtime" ) func main () { config := gkafka.NewConfig() - config.GroupId = "group_2" + config.GroupId = "group_1" config.Servers = "localhost:9092" - config.Topics = "abc" + config.Topics = "test" client := gkafka.New(config) defer client.Close() for { msg, err := client.Receive() - fmt.Printf("value: %s, err: %v", string(msg.Value), err) + fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err) } } diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go index a25d66e0a..08ff3c304 100644 --- a/geg/database/kafka/gkafka_producer.go +++ b/geg/database/kafka/gkafka_producer.go @@ -8,10 +8,10 @@ import ( func main () { config := gkafka.NewConfig() config.Servers = "localhost:9092" - config.Topics = "abc" + config.Topics = "test" client := gkafka.New(config) defer client.Close() - err := client.SyncSend(&gkafka.Message{Value: []byte("111")}) + err := client.SyncSend(&gkafka.Message{Value: []byte("1")}) fmt.Println(err) }