From d407e8dca784b21450ef7ef2b12de31452b45b11 Mon Sep 17 00:00:00 2001 From: john Date: Fri, 15 Jun 2018 18:31:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90kafka=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/database/gkafka/gkafka.go | 167 ++++++++++++++++++++++++ geg/database/kafka/gkafka_consumer.go | 20 +++ geg/database/kafka/gkafka_producer.go | 16 +++ geg/database/kafka/kafka.go | 124 ++++++++++++++++++ geg/database/mysql/{mysql.go => gdb.go} | 0 5 files changed, 327 insertions(+) create mode 100644 g/database/gkafka/gkafka.go create mode 100644 geg/database/kafka/gkafka_consumer.go create mode 100644 geg/database/kafka/gkafka_producer.go create mode 100644 geg/database/kafka/kafka.go rename geg/database/mysql/{mysql.go => gdb.go} (100%) diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go new file mode 100644 index 000000000..b35d347d0 --- /dev/null +++ b/g/database/gkafka/gkafka.go @@ -0,0 +1,167 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// Kafka客户端. +package gkafka + +import ( + "time" + "strings" + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" + "gitee.com/johng/gf/g/os/glog" +) + +// kafka Client based on sarama.Config +type Config struct { + GroupId string // group id for consumer. + Servers string // server list, multiple servers joined by ','. + Topics string // topic list, multiple topics joined by ','. + sarama.Config +} + +// Kafka Client(Consumer/SyncProducer/AsyncProducer) +type Client struct { + config *Config + consumer *cluster.Consumer + syncProducer sarama.SyncProducer + asyncProducer sarama.AsyncProducer +} + +// Kafka Message. +type Message struct { + Value []byte + Key []byte + Topic string + Partition int + Offset int +} + + +// New a kafka client. +func New(config Config) *Client { + config.Config = *sarama.NewConfig() + + config.Consumer.Return.Errors = true + config.Consumer.Offsets.CommitInterval = 1 * time.Second + config.Consumer.Offsets.Initial = sarama.OffsetOldest + + config.Producer.Return.Errors = true + config.Producer.Return.Successes = true + config.Producer.Timeout = 5 * time.Second + + return &Client { + config : &config, + } +} + +// Close client. +func (client *Client) Close() { + if client.consumer != nil { + client.consumer.Close() + } + if client.syncProducer != nil { + client.syncProducer.Close() + } + if client.asyncProducer != nil { + client.asyncProducer.Close() + } +} + +// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. +func (client *Client) Receive() (*Message, error) { + config := cluster.NewConfig() + config.Config = client.config.Config + config.Group.Return.Notifications = true + if client.consumer == nil { + c, err := cluster.NewConsumer(strings.Split(client.config.Servers, ","), client.config.GroupId, strings.Split(client.config.Topics, ","), config) + if err != nil { + return nil, err + } else { + client.consumer = c + go func(c *cluster.Consumer) { + errors := c.Errors() + notify := c.Notifications() + for { + select { + case err := <-errors: + glog.Error(err) + case <-notify: + } + } + }(client.consumer) + } + } + + msg := <- client.consumer.Messages() + client.consumer.MarkOffset(msg, "") + return &Message { + Value : msg.Value, + Key : msg.Key, + Topic : msg.Topic, + Partition : int(msg.Partition), + Offset : int(msg.Offset), + }, nil +} + +// Send data to kafka in synchronized way. +func (client *Client) SyncSend(message *Message) error { + if client.syncProducer == nil { + if p, err := sarama.NewSyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil { + return err + } else { + client.syncProducer = p + } + } + for _, topic := range strings.Split(client.config.Topics, ",") { + msg := messageToProducerMessage(message) + msg.Topic = topic + if _, _, err := client.syncProducer.SendMessage(msg); err != nil { + return err + } + } + return nil +} + +// Send data to kafka in asynchronized way. +func (client *Client) AsyncSend(message *Message) error { + if client.asyncProducer == nil { + if p, err := sarama.NewAsyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil { + return err + } else { + client.asyncProducer = p + go func(p sarama.AsyncProducer) { + errors := p.Errors() + success := p.Successes() + for { + select { + case err := <-errors: + glog.Error(err) + case <-success: + } + } + }(client.asyncProducer) + } + } + + for _, topic := range strings.Split(client.config.Topics, ",") { + msg := messageToProducerMessage(message) + msg.Topic = topic + client.asyncProducer.Input() <- msg + } + return nil +} + +// Convert *gkafka.Message to *sarama.ProducerMessage +func messageToProducerMessage(message *Message) *sarama.ProducerMessage { + return &sarama.ProducerMessage { + Topic : message.Topic, + Key : sarama.ByteEncoder(message.Key), + Value : sarama.ByteEncoder(message.Value), + Partition : int32(message.Partition), + Offset : int64(message.Offset), + } +} diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go new file mode 100644 index 000000000..84e257639 --- /dev/null +++ b/geg/database/kafka/gkafka_consumer.go @@ -0,0 +1,20 @@ +package main + +import ( + "gitee.com/johng/gf/g/database/gkafka" + "fmt" +) + +func main () { + client := gkafka.New(gkafka.Config{ + GroupId : "group_1", + Servers : "localhost:9092", + Topics : "abc", + }) + defer client.Close() + for { + msg, err := client.Receive() + fmt.Println(err) + fmt.Println(string(msg.Value)) + } +} diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go new file mode 100644 index 000000000..fa5912ca9 --- /dev/null +++ b/geg/database/kafka/gkafka_producer.go @@ -0,0 +1,16 @@ +package main + +import ( + "gitee.com/johng/gf/g/database/gkafka" + "fmt" +) + +func main () { + client := gkafka.New(gkafka.Config{ + Servers : "localhost:9092", + Topics : "abc", + }) + defer client.Close() + err := client.SyncSend(&gkafka.Message{Value: []byte("111")}) + fmt.Println(err) +} diff --git a/geg/database/kafka/kafka.go b/geg/database/kafka/kafka.go new file mode 100644 index 000000000..89b4caa28 --- /dev/null +++ b/geg/database/kafka/kafka.go @@ -0,0 +1,124 @@ +package main + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "time" + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" +) + +var ( + topics = "abc" +) + +func main() { + for { + fmt.Println("time to check") + syncProducer() + consumer() + time.Sleep(time.Second) + } +} + + +// consumer 消费者 +func consumer() { + groupID := "group-12345" + config := cluster.NewConfig() + config.Group.Return.Notifications = true + config.Consumer.Return.Errors = true + config.Consumer.Offsets.CommitInterval = 1 * time.Second + config.Consumer.Offsets.Initial = sarama.OffsetOldest + + c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config) + if err != nil { + fmt.Errorf("Failed open consumer: %v", err) + return + } + defer c.Close() + + go func(c *cluster.Consumer) { + errors := c.Errors() + notify := c.Notifications() + for { + select { + case err := <-errors: + fmt.Println(err) + case <-notify: + } + } + }(c) + + for msg := range c.Messages() { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) + c.MarkOffset(msg, "") + } +} + +// syncProducer 同步生产者 +// 并发量小时,可以用这种方式 +func syncProducer() { + config := sarama.NewConfig() + // config.Producer.RequiredAcks = sarama.WaitForAll + // config.Producer.Partitioner = sarama.NewRandomPartitioner + config.Producer.Return.Successes = true + config.Producer.Timeout = 5 * time.Second + p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config) + defer p.Close() + if err != nil { + fmt.Println(err) + return + } + + v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) + fmt.Fprintln(os.Stdout, v) + + msg := &sarama.ProducerMessage{ + Topic: topics, + Value: sarama.ByteEncoder(v), + } + if _, _, err := p.SendMessage(msg); err != nil { + fmt.Println(err) + return + } +} + +// asyncProducer 异步生产者 +// 并发量大时,必须采用这种方式 +func asyncProducer() { + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Producer.Timeout = 5 * time.Second + p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config) + defer p.Close() + if err != nil { + return + } + + //必须有这个匿名函数内容 + go func(p sarama.AsyncProducer) { + errors := p.Errors() + success := p.Successes() + for { + select { + case err := <-errors: + if err != nil { + fmt.Println(err) + } + case <-success: + } + } + }(p) + + v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) + fmt.Fprintln(os.Stdout, v) + msg := &sarama.ProducerMessage{ + Topic: topics, + Value: sarama.ByteEncoder(v), + } + p.Input() <- msg +} \ No newline at end of file diff --git a/geg/database/mysql/mysql.go b/geg/database/mysql/gdb.go similarity index 100% rename from geg/database/mysql/mysql.go rename to geg/database/mysql/gdb.go