mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
改进gkafka,获取topics使用临时的消费对象
This commit is contained in:
@ -96,23 +96,22 @@ func (client *Client) Close() {
|
||||
}
|
||||
|
||||
// Get all topics from kafka server.
|
||||
// 这里创建独立的消费客户端获取topics,获取完之后销毁该客户端对象。
|
||||
func (client *Client) Topics() ([]string, error) {
|
||||
if client.rawConsumer == nil {
|
||||
if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
client.rawConsumer = c
|
||||
}
|
||||
}
|
||||
if topics, err := client.rawConsumer.Topics(); err == nil {
|
||||
for k, v := range topics {
|
||||
if _, ok := ignoreTopics[v]; ok {
|
||||
topics = append(topics[ : k], topics[k + 1 : ]...)
|
||||
}
|
||||
}
|
||||
return topics, nil
|
||||
} else {
|
||||
if c, err := sarama.NewConsumer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
if topics, err := c.Topics(); err == nil {
|
||||
for k, v := range topics {
|
||||
if _, ok := ignoreTopics[v]; ok {
|
||||
topics = append(topics[ : k], topics[k + 1 : ]...)
|
||||
}
|
||||
}
|
||||
c.Close()
|
||||
return topics, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user