添加gkafka依赖包到vendor目录,并增加Topics方法获取kafa所有的topic

This commit is contained in:
john
2018-06-21 13:56:10 +08:00
parent 7d5f7973f8
commit 016cb3c0f3
2 changed files with 29 additions and 0 deletions

View File

@ -28,6 +28,7 @@ type Config struct {
type Client struct {
Config *Config
consumer *cluster.Consumer
rawConsumer sarama.Consumer
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
}
@ -83,6 +84,18 @@ func (client *Client) Close() {
}
}
// Get all topics from kafka server.
func (client *Client) Topics() ([]string, error) {
if client.rawConsumer == nil {
if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil {
return nil, err
} else {
client.rawConsumer = c
}
}
return client.rawConsumer.Topics()
}
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
config := cluster.NewConfig()

View File

@ -0,0 +1,16 @@
package main
import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
)
func main () {
config := gkafka.NewConfig()
config.Servers = "localhost:9092"
client := gkafka.NewClient(config)
defer client.Close()
fmt.Println(client.Topics())
}