mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
gkafka.Client.Topics方法增加过滤功能,去掉系统topic;完善gtime.StrToTime方法对标注UTC时间的处理
This commit is contained in:
@ -15,6 +15,13 @@ import (
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
var (
|
||||
// 当使用Topics方法获取所有topic后,进行过滤忽略的topic,多个以','号分隔
|
||||
ignoreTopics = map[string]bool {
|
||||
"__consumer_offsets" : true,
|
||||
}
|
||||
)
|
||||
|
||||
// kafka Client based on sarama.Config
|
||||
type Config struct {
|
||||
GroupId string // group id for consumer.
|
||||
@ -93,7 +100,16 @@ func (client *Client) Topics() ([]string, error) {
|
||||
client.rawConsumer = c
|
||||
}
|
||||
}
|
||||
return client.rawConsumer.Topics()
|
||||
if topics, err := client.rawConsumer.Topics(); err == nil {
|
||||
for k, v := range topics {
|
||||
if _, ok := ignoreTopics[v]; ok {
|
||||
topics = append(topics[ : k], topics[k + 1 : ]...)
|
||||
}
|
||||
}
|
||||
return topics, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
|
||||
|
||||
@ -125,6 +125,9 @@ func StrToTime(str string, format...string) (time.Time, error) {
|
||||
}
|
||||
}
|
||||
// 如果字符串中有时区信息,那么执行时区转换,将时区转成UTC
|
||||
if match[4] != "" && match[6] == "" {
|
||||
match[6] = "000000"
|
||||
}
|
||||
if match[6] != "" {
|
||||
zone := strings.Replace(match[6], ":", "", -1)
|
||||
zone = strings.TrimLeft(zone, "+-")
|
||||
|
||||
@ -16,6 +16,7 @@ func main() {
|
||||
"2014-01-17T01:19:15+08:00",
|
||||
"2018-02-09T20:46:17.897Z",
|
||||
"2018-02-09 20:46:17.897",
|
||||
"2018-02-09T20:46:17Z",
|
||||
"2018-02-09 20:46:17",
|
||||
"2018-02-09",
|
||||
}
|
||||
@ -12,6 +12,7 @@ func main() {
|
||||
"2014-01-17T01:19:15+08:00",
|
||||
"2018-02-09T20:46:17.897Z",
|
||||
"2018-02-09 20:46:17.897",
|
||||
"2018-02-09T20:46:17Z",
|
||||
"2018-02-09 20:46:17",
|
||||
"2018-02-09",
|
||||
}
|
||||
Reference in New Issue
Block a user