From 78cc8533a5331bfeadd19d0ae99a4e36396ac44e Mon Sep 17 00:00:00 2001 From: john Date: Tue, 23 Oct 2018 18:13:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9Bgkafka=EF=BC=8C=E8=8E=B7?= =?UTF-8?q?=E5=8F=96topics=E4=BD=BF=E7=94=A8=E4=B8=B4=E6=97=B6=E7=9A=84?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E5=AF=B9=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/database/gkafka/gkafka.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index dbd596017..16981fdaa 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -96,23 +96,22 @@ func (client *Client) Close() { } // Get all topics from kafka server. +// 这里创建独立的消费客户端获取topics,获取完之后销毁该客户端对象。 func (client *Client) Topics() ([]string, error) { - if client.rawConsumer == nil { - if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { - return nil, err - } else { - client.rawConsumer = c - } - } - if topics, err := client.rawConsumer.Topics(); err == nil { - for k, v := range topics { - if _, ok := ignoreTopics[v]; ok { - topics = append(topics[ : k], topics[k + 1 : ]...) - } - } - return topics, nil - } else { + if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { return nil, err + } else { + if topics, err := c.Topics(); err == nil { + for k, v := range topics { + if _, ok := ignoreTopics[v]; ok { + topics = append(topics[ : k], topics[k + 1 : ]...) + } + } + c.Close() + return topics, nil + } else { + return nil, err + } } }