diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index dbd596017..16981fdaa 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -96,23 +96,22 @@ func (client *Client) Close() { } // Get all topics from kafka server. +// 这里创建独立的消费客户端获取topics,获取完之后销毁该客户端对象。 func (client *Client) Topics() ([]string, error) { - if client.rawConsumer == nil { - if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { - return nil, err - } else { - client.rawConsumer = c - } - } - if topics, err := client.rawConsumer.Topics(); err == nil { - for k, v := range topics { - if _, ok := ignoreTopics[v]; ok { - topics = append(topics[ : k], topics[k + 1 : ]...) - } - } - return topics, nil - } else { + if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil { return nil, err + } else { + if topics, err := c.Topics(); err == nil { + for k, v := range topics { + if _, ok := ignoreTopics[v]; ok { + topics = append(topics[ : k], topics[k + 1 : ]...) + } + } + c.Close() + return topics, nil + } else { + return nil, err + } } }