完善gkafka示例程序

This commit is contained in:
john
2018-07-27 09:41:59 +08:00
parent 9815a7d36a
commit c8efea5adc
2 changed files with 44 additions and 21 deletions

View File

@ -1,24 +1,35 @@
package main
import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/database/gkafka"
"time"
)
func main () {
config := gkafka.NewConfig()
config.GroupId = "group_1"
config.Servers = "localhost:9092"
config.Topics = "test"
config.AutoMarkOffset = false
// 创建kafka消费客户端
func newKafkaClientConsumer(topic, group string) *gkafka.Client {
kafkaConfig := gkafka.NewConfig()
kafkaConfig.Servers = "localhost:9092"
kafkaConfig.AutoMarkOffset = false
kafkaConfig.Topics = topic
kafkaConfig.GroupId = group
return gkafka.NewClient(kafkaConfig)
}
client := gkafka.NewClient(config)
func main () {
client := newKafkaClientConsumer("test", "test-group-1")
defer client.Close()
for {
msg, err := client.Receive()
fmt.Printf("%s value: %s, err: %v\n", gtime.Datetime(), string(msg.Value), err)
msg.MarkOffset()
fmt.Println("reading...")
for i := 1; i < 10; i++ {
if msg, err := client.Receive(); err != nil {
fmt.Println(err)
} else {
fmt.Println(string(msg.Value))
}
}
time.Sleep(3*time.Second)
}
}

View File

@ -3,15 +3,27 @@ package main
import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
"gitee.com/johng/gf/g/util/gconv"
)
func main () {
config := gkafka.NewConfig()
config.Servers = "localhost:9092"
config.Topics = "test"
client := gkafka.NewClient(config)
defer client.Close()
err := client.SyncSend(&gkafka.Message{Value: []byte("1")})
fmt.Println(err)
// 创建kafka生产客户端
func newKafkaClientProducer(topic string) *gkafka.Client {
kafkaConfig := gkafka.NewConfig()
kafkaConfig.Servers = "localhost:9092"
kafkaConfig.AutoMarkOffset = false
kafkaConfig.Topics = topic
return gkafka.NewClient(kafkaConfig)
}
func main () {
client := newKafkaClientProducer("test")
defer client.Close()
for i := 1; i < 10; i++ {
if err := client.SyncSend(&gkafka.Message{Value: []byte(gconv.String(i))}); err != nil {
fmt.Println(err)
}
}
fmt.Println("done")
}