From 76e06bedcde673192c4b6cc71b5f0c2fa7d4843d Mon Sep 17 00:00:00 2001 From: john Date: Wed, 19 Sep 2018 15:08:33 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E7=AC=AC=E4=B8=89=E6=96=B9=E5=BA=93sarama?= =?UTF-8?q?-cluster=E8=B0=83=E6=95=B4=E4=B8=BAjohng-cn/sarama-cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/gmap/interface_interface_map.go | 57 ++++++++++++++-- g/container/gmap/string_interface_map.go | 72 ++++++++++++++------- g/database/gkafka/gkafka.go | 31 +++++++-- g/os/gcache/gcache_mem_cache.go | 19 ++++-- geg/database/kafka/gkafka_consumer.go | 6 +- geg/database/kafka/gkafka_producer.go | 7 ++ geg/other/test.go | 70 ++++++++++++++++++-- 7 files changed, 214 insertions(+), 48 deletions(-) diff --git a/g/container/gmap/interface_interface_map.go b/g/container/gmap/interface_interface_map.go index d1de40ce4..32a05a048 100644 --- a/g/container/gmap/interface_interface_map.go +++ b/g/container/gmap/interface_interface_map.go @@ -69,16 +69,59 @@ func (this *InterfaceInterfaceMap) Get(key interface{}) interface{} { return val } -// 获取键值,如果键值不存在则写入默认值 -func (this *InterfaceInterfaceMap) GetWithDefault(key interface{}, value interface{}) interface{} { +// 设置kv缓存键值对,内部会对键名的存在性使用写锁进行二次检索确认,如果存在则不再写入;返回键名对应的键值。 +// 在高并发下有用,防止数据写入的并发逻辑错误。 +func (this *InterfaceInterfaceMap) doSetWithLockCheck(key interface{}, value interface{}) interface{} { this.mu.Lock() - val, ok := this.m[key] - if !ok { - this.m[key] = value - val = value + if v, ok := this.m[key]; ok { + this.mu.Unlock() + return v } + if f, ok := value.(func() interface {}); ok { + value = f() + } + this.m[key] = value this.mu.Unlock() - return val + return value +} + +// 当键名存在时返回其键值,否则写入指定的键值 +func (this *InterfaceInterfaceMap) GetOrSet(key interface{}, value interface{}) interface{} { + if v := this.Get(key); v == nil { + return this.doSetWithLockCheck(key, value) + } else { + return v + } +} + +// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成 +func (this *InterfaceInterfaceMap) GetOrSetFunc(key interface{}, f func() interface{}) interface{} { + if v := this.Get(key); v == nil { + v = f() + this.doSetWithLockCheck(key, v) + return v + } else { + return v + } +} + +// 与GetOrSetFunc不同的是,f是在写锁机制内执行 +func (this *InterfaceInterfaceMap) GetOrSetFuncLock(key interface{}, f func() interface{}) interface{} { + if v := this.Get(key); v == nil { + this.doSetWithLockCheck(key, f) + return v + } else { + return v + } +} + +// 当键名不存在时写入,并返回true;否则返回false。 +func (this *InterfaceInterfaceMap) SetIfNotExist(key interface{}, value interface{}) bool { + if !this.Contains(key) { + this.doSetWithLockCheck(key, value) + return true + } + return false } // 删除键值对 diff --git a/g/container/gmap/string_interface_map.go b/g/container/gmap/string_interface_map.go index 4fb7e17a5..5e76e9201 100644 --- a/g/container/gmap/string_interface_map.go +++ b/g/container/gmap/string_interface_map.go @@ -69,33 +69,59 @@ func (this *StringInterfaceMap) Get(key string) interface{} { return val } -// 获取键值,如果键值不存在则写入默认值 -func (this *StringInterfaceMap) GetWithDefault(key string, value interface{}) interface{} { - this.mu.Lock() - val, ok := this.m[key] - if !ok { - this.m[key] = value - val = value +// 设置kv缓存键值对,内部会对键名的存在性使用写锁进行二次检索确认,如果存在则不再写入;返回键名对应的键值。 +// 在高并发下有用,防止数据写入的并发逻辑错误。 +func (this *StringInterfaceMap) doSetWithLockCheck(key string, value interface{}) interface{} { + this.mu.Lock() + if v, ok := this.m[key]; ok { + this.mu.Unlock() + return v } - this.mu.Unlock() - return val + if f, ok := value.(func() interface {}); ok { + value = f() + } + this.m[key] = value + this.mu.Unlock() + return value } +// 当键名存在时返回其键值,否则写入指定的键值 +func (this *StringInterfaceMap) GetOrSet(key string, value interface{}) interface{} { + if v := this.Get(key); v == nil { + return this.doSetWithLockCheck(key, value) + } else { + return v + } +} + +// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成 func (this *StringInterfaceMap) GetOrSetFunc(key string, f func() interface{}) interface{} { - if v := this.Get(key); v == nil { - value := f() - this.mu.Lock() - defer this.mu.Unlock() - // 写锁二次检索确认 - if v, ok := this.m[key]; !ok { - this.m[key] = value - return value - } else { - return v - } - } else { - return v - } + if v := this.Get(key); v == nil { + v = f() + this.doSetWithLockCheck(key, v) + return v + } else { + return v + } +} + +// 与GetOrSetFunc不同的是,f是在写锁机制内执行 +func (this *StringInterfaceMap) GetOrSetFuncLock(key string, f func() interface{}) interface{} { + if v := this.Get(key); v == nil { + this.doSetWithLockCheck(key, f) + return v + } else { + return v + } +} + +// 当键名不存在时写入,并返回true;否则返回false。 +func (this *StringInterfaceMap) SetIfNotExist(key string, value interface{}) bool { + if !this.Contains(key) { + this.doSetWithLockCheck(key, value) + return true + } + return false } // 删除键值对 diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index 9c1967641..74814b6e6 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -11,7 +11,7 @@ import ( "time" "strings" "github.com/Shopify/sarama" - "github.com/bsm/sarama-cluster" + "github.com/johng-cn/sarama-cluster" "errors" ) @@ -115,20 +115,41 @@ func (client *Client) Topics() ([]string, error) { } } -// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. -func (client *Client) Receive() (*Message, error) { +// 初始化内部消费客户端 +func (client *Client) initConsumer() error { if client.consumer == nil { config := cluster.NewConfig() config.Config = client.Config.Config config.Group.Return.Notifications = false - c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config) if err != nil { - return nil, err + return err } else { client.consumer = c } } + return nil +} + +// 标记指定topic 分区开始读取位置 +func (client *Client) MarkOffset(topic string, partition int, offset int, metadata...string) error { + if err := client.initConsumer(); err != nil { + return err + } + meta := "" + if len(metadata) > 0 { + meta = metadata[0] + } + client.consumer.MarkPartitionOffset(topic, int32(partition), int64(offset), meta) + return nil +} + + +// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. +func (client *Client) Receive() (*Message, error) { + if err := client.initConsumer(); err != nil { + return nil, err + } errorsChan := client.consumer.Errors() notifyChan := client.consumer.Notifications() messageChan := client.consumer.Messages() diff --git a/g/os/gcache/gcache_mem_cache.go b/g/os/gcache/gcache_mem_cache.go index 79af2ccb6..98da545d4 100644 --- a/g/os/gcache/gcache_mem_cache.go +++ b/g/os/gcache/gcache_mem_cache.go @@ -39,8 +39,8 @@ type memCacheItem struct { // 异步队列数据项 type memCacheEvent struct { - k interface{} // 键名 - e int64 // 过期时间 + k interface{} // 键名 + e int64 // 过期时间 } // 创建底层的缓存对象 @@ -112,6 +112,9 @@ func (c *memCache) doSetWithLockCheck(key interface{}, value interface{}, expire c.dmu.Unlock() return v } + if f, ok := value.(func() interface {}); ok { + value = f() + } c.data[key] = memCacheItem{v : value, e : expireTimestamp} c.dmu.Unlock() c.eventList.PushBack(memCacheEvent{k : key, e : expireTimestamp}) @@ -127,9 +130,7 @@ func (c *memCache) getInternalExpire(expire int) int64 { } } - // 当键名不存在时写入,并返回true;否则返回false。 -// 常用来做对并发性要求不高的内存锁。 func (c *memCache) SetIfNotExist(key interface{}, value interface{}, expire int) bool { if !c.Contains(key) { c.doSetWithLockCheck(key, value, expire) @@ -185,6 +186,16 @@ func (c *memCache) GetOrSetFunc(key interface{}, f func() interface{}, expire in } } +// 与GetOrSetFunc不同的是,f是在写锁机制内执行 +func (c *memCache) GetOrSetFuncLock(key interface{}, f func() interface{}, expire int) interface{} { + if v := c.Get(key); v == nil { + c.doSetWithLockCheck(key, f, expire) + return v + } else { + return v + } +} + // 是否存在指定的键名,true表示存在,false表示不存在。 func (c *memCache) Contains(key interface{}) bool { return c.Get(key) != nil diff --git a/geg/database/kafka/gkafka_consumer.go b/geg/database/kafka/gkafka_consumer.go index 4b960b2f3..a5d76beca 100644 --- a/geg/database/kafka/gkafka_consumer.go +++ b/geg/database/kafka/gkafka_consumer.go @@ -17,10 +17,12 @@ func newKafkaClientConsumer(topic, group string) *gkafka.Client { } func main () { - group := "test-group-11" - client := newKafkaClientConsumer("test", group) + group := "test-group-206" + topic := "test" + client := newKafkaClientConsumer(topic, group) defer client.Close() + client.MarkOffset(topic, 0, 6) for { fmt.Println(group + " reading...") for { diff --git a/geg/database/kafka/gkafka_producer.go b/geg/database/kafka/gkafka_producer.go index a1192ab1b..a13356066 100644 --- a/geg/database/kafka/gkafka_producer.go +++ b/geg/database/kafka/gkafka_producer.go @@ -4,6 +4,7 @@ import ( "gitee.com/johng/gf/g/database/gkafka" "fmt" "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/util/gconv" ) // 创建kafka生产客户端 @@ -19,6 +20,12 @@ func main () { client := newKafkaClientProducer("test") defer client.Close() + for i := 1; i < 10; i++ { + if err := client.SyncSend(&gkafka.Message{Value: []byte(gconv.String(i))}); err != nil { + fmt.Println(err) + } + } + if err := client.SyncSend(&gkafka.Message{Value: []byte(gtime.Now().String())}); err != nil { fmt.Println(err) } diff --git a/geg/other/test.go b/geg/other/test.go index 81a3fb2a7..ee634b889 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,14 +1,70 @@ package main import ( - "gitee.com/johng/gf/g/container/gmap" - "fmt" + "github.com/Shopify/sarama" + "log" + "sync" ) func main() { - m1 := gmap.NewIntInterfaceMap() - m1.Set(1, gmap.NewIntIntMap()) - v1 := m1.Get(1).(*gmap.IntIntMap) - v1.Set(2, 2) - fmt.Println(m1.Get(1).(*gmap.IntIntMap).Size()) + group := "test-group-100" + topic := "test" + config := sarama.NewConfig() + config.Version = sarama.V0_10_2_0 + client, err := sarama.NewClient([]string{"localhost:9092"}, config) + if err != nil { + log.Fatalln(err) + } + + offsetManager, err := sarama.NewOffsetManagerFromClient(group, client) + if err != nil { + log.Fatalln(err) + } + + pids, err := client.Partitions(topic) + if err != nil { + log.Fatalln(err) + } + + consumer, err := sarama.NewConsumerFromClient(client) + if err != nil { + log.Fatalln(err) + } + + defer consumer.Close() + + wg := &sync.WaitGroup{} + + for _, v := range pids { + wg.Add(1) + go consume(wg, consumer, offsetManager, v) + } + + wg.Wait() +} + +func consume(wg *sync.WaitGroup, c sarama.Consumer, om sarama.OffsetManager, p int32) { + defer wg.Done() + + pom, err := om.ManagePartition("test", p) + if err != nil { + log.Fatalln(err) + } + defer pom.Close() + + offset, _ := pom.NextOffset() + if offset == -1 { + offset = sarama.OffsetOldest + } + + pc, err := c.ConsumePartition("test", p, 6) + if err != nil { + log.Fatalln(err) + } + defer pc.Close() + + for msg := range pc.Messages() { + log.Printf("[%v] Consumed message offset %v\n", p, msg.Offset) + pom.MarkOffset(msg.Offset + 1, "") + } } \ No newline at end of file