diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index c98a26c73..4f85044c0 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -28,6 +28,7 @@ type Config struct { type Client struct { Config *Config consumer *cluster.Consumer + rawConsumer sarama.Consumer syncProducer sarama.SyncProducer asyncProducer sarama.AsyncProducer } @@ -83,6 +84,18 @@ func (client *Client) Close() { } } +// Get all topics from kafka server. +func (client *Client) Topics() ([]string, error) { + if client.rawConsumer == nil { + if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { + return nil, err + } else { + client.rawConsumer = c + } + } + return client.rawConsumer.Topics() +} + // Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. func (client *Client) Receive() (*Message, error) { config := cluster.NewConfig() diff --git a/geg/database/kafka/gkafka_topics.go b/geg/database/kafka/gkafka_topics.go new file mode 100644 index 000000000..b670b4c17 --- /dev/null +++ b/geg/database/kafka/gkafka_topics.go @@ -0,0 +1,16 @@ +package main + +import ( + "gitee.com/johng/gf/g/database/gkafka" + "fmt" +) + +func main () { + config := gkafka.NewConfig() + config.Servers = "localhost:9092" + + client := gkafka.NewClient(config) + defer client.Close() + + fmt.Println(client.Topics()) +}