From 016cb3c0f3b1222a603fd1f4f8a48179f1849993 Mon Sep 17 00:00:00 2001 From: john Date: Thu, 21 Jun 2018 13:56:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0gkafka=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E5=8C=85=E5=88=B0vendor=E7=9B=AE=E5=BD=95=EF=BC=8C=E5=B9=B6?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0Topics=E6=96=B9=E6=B3=95=E8=8E=B7=E5=8F=96kaf?= =?UTF-8?q?a=E6=89=80=E6=9C=89=E7=9A=84topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/database/gkafka/gkafka.go | 13 +++++++++++++ geg/database/kafka/gkafka_topics.go | 16 ++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 geg/database/kafka/gkafka_topics.go diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index c98a26c73..4f85044c0 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -28,6 +28,7 @@ type Config struct { type Client struct { Config *Config consumer *cluster.Consumer + rawConsumer sarama.Consumer syncProducer sarama.SyncProducer asyncProducer sarama.AsyncProducer } @@ -83,6 +84,18 @@ func (client *Client) Close() { } } +// Get all topics from kafka server. +func (client *Client) Topics() ([]string, error) { + if client.rawConsumer == nil { + if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { + return nil, err + } else { + client.rawConsumer = c + } + } + return client.rawConsumer.Topics() +} + // Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. func (client *Client) Receive() (*Message, error) { config := cluster.NewConfig() diff --git a/geg/database/kafka/gkafka_topics.go b/geg/database/kafka/gkafka_topics.go new file mode 100644 index 000000000..b670b4c17 --- /dev/null +++ b/geg/database/kafka/gkafka_topics.go @@ -0,0 +1,16 @@ +package main + +import ( + "gitee.com/johng/gf/g/database/gkafka" + "fmt" +) + +func main () { + config := gkafka.NewConfig() + config.Servers = "localhost:9092" + + client := gkafka.NewClient(config) + defer client.Close() + + fmt.Println(client.Topics()) +}