kafka第三方库sarama-cluster调整为johng-cn/sarama-cluster

This commit is contained in:
john
2018-09-19 15:08:33 +08:00
parent 0669d6c4b3
commit 76e06bedcd
7 changed files with 214 additions and 48 deletions

View File

@ -69,16 +69,59 @@ func (this *InterfaceInterfaceMap) Get(key interface{}) interface{} {
return val
}
// 获取键值,如果键值不存在则写入默认值
func (this *InterfaceInterfaceMap) GetWithDefault(key interface{}, value interface{}) interface{} {
// 设置kv缓存键值对内部会对键名的存在性使用写锁进行二次检索确认如果存在则不再写入返回键名对应的键值。
// 在高并发下有用,防止数据写入的并发逻辑错误。
func (this *InterfaceInterfaceMap) doSetWithLockCheck(key interface{}, value interface{}) interface{} {
this.mu.Lock()
val, ok := this.m[key]
if !ok {
this.m[key] = value
val = value
if v, ok := this.m[key]; ok {
this.mu.Unlock()
return v
}
if f, ok := value.(func() interface {}); ok {
value = f()
}
this.m[key] = value
this.mu.Unlock()
return val
return value
}
// 当键名存在时返回其键值,否则写入指定的键值
func (this *InterfaceInterfaceMap) GetOrSet(key interface{}, value interface{}) interface{} {
if v := this.Get(key); v == nil {
return this.doSetWithLockCheck(key, value)
} else {
return v
}
}
// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成
func (this *InterfaceInterfaceMap) GetOrSetFunc(key interface{}, f func() interface{}) interface{} {
if v := this.Get(key); v == nil {
v = f()
this.doSetWithLockCheck(key, v)
return v
} else {
return v
}
}
// 与GetOrSetFunc不同的是f是在写锁机制内执行
func (this *InterfaceInterfaceMap) GetOrSetFuncLock(key interface{}, f func() interface{}) interface{} {
if v := this.Get(key); v == nil {
this.doSetWithLockCheck(key, f)
return v
} else {
return v
}
}
// 当键名不存在时写入并返回true否则返回false。
func (this *InterfaceInterfaceMap) SetIfNotExist(key interface{}, value interface{}) bool {
if !this.Contains(key) {
this.doSetWithLockCheck(key, value)
return true
}
return false
}
// 删除键值对

View File

@ -69,33 +69,59 @@ func (this *StringInterfaceMap) Get(key string) interface{} {
return val
}
// 获取键值,如果键值不存在则写入默认值
func (this *StringInterfaceMap) GetWithDefault(key string, value interface{}) interface{} {
this.mu.Lock()
val, ok := this.m[key]
if !ok {
this.m[key] = value
val = value
// 设置kv缓存键值对内部会对键名的存在性使用写锁进行二次检索确认如果存在则不再写入返回键名对应的键值。
// 在高并发下有用,防止数据写入的并发逻辑错误。
func (this *StringInterfaceMap) doSetWithLockCheck(key string, value interface{}) interface{} {
this.mu.Lock()
if v, ok := this.m[key]; ok {
this.mu.Unlock()
return v
}
this.mu.Unlock()
return val
if f, ok := value.(func() interface {}); ok {
value = f()
}
this.m[key] = value
this.mu.Unlock()
return value
}
// 当键名存在时返回其键值,否则写入指定的键值
func (this *StringInterfaceMap) GetOrSet(key string, value interface{}) interface{} {
if v := this.Get(key); v == nil {
return this.doSetWithLockCheck(key, value)
} else {
return v
}
}
// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成
func (this *StringInterfaceMap) GetOrSetFunc(key string, f func() interface{}) interface{} {
if v := this.Get(key); v == nil {
value := f()
this.mu.Lock()
defer this.mu.Unlock()
// 写锁二次检索确认
if v, ok := this.m[key]; !ok {
this.m[key] = value
return value
} else {
return v
}
} else {
return v
}
if v := this.Get(key); v == nil {
v = f()
this.doSetWithLockCheck(key, v)
return v
} else {
return v
}
}
// 与GetOrSetFunc不同的是f是在写锁机制内执行
func (this *StringInterfaceMap) GetOrSetFuncLock(key string, f func() interface{}) interface{} {
if v := this.Get(key); v == nil {
this.doSetWithLockCheck(key, f)
return v
} else {
return v
}
}
// 当键名不存在时写入并返回true否则返回false。
func (this *StringInterfaceMap) SetIfNotExist(key string, value interface{}) bool {
if !this.Contains(key) {
this.doSetWithLockCheck(key, value)
return true
}
return false
}
// 删除键值对

View File

@ -11,7 +11,7 @@ import (
"time"
"strings"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/johng-cn/sarama-cluster"
"errors"
)
@ -115,20 +115,41 @@ func (client *Client) Topics() ([]string, error) {
}
}
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
// 初始化内部消费客户端
func (client *Client) initConsumer() error {
if client.consumer == nil {
config := cluster.NewConfig()
config.Config = client.Config.Config
config.Group.Return.Notifications = false
c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config)
if err != nil {
return nil, err
return err
} else {
client.consumer = c
}
}
return nil
}
// 标记指定topic 分区开始读取位置
func (client *Client) MarkOffset(topic string, partition int, offset int, metadata...string) error {
if err := client.initConsumer(); err != nil {
return err
}
meta := ""
if len(metadata) > 0 {
meta = metadata[0]
}
client.consumer.MarkPartitionOffset(topic, int32(partition), int64(offset), meta)
return nil
}
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
if err := client.initConsumer(); err != nil {
return nil, err
}
errorsChan := client.consumer.Errors()
notifyChan := client.consumer.Notifications()
messageChan := client.consumer.Messages()

View File

@ -39,8 +39,8 @@ type memCacheItem struct {
// 异步队列数据项
type memCacheEvent struct {
k interface{} // 键名
e int64 // 过期时间
k interface{} // 键名
e int64 // 过期时间
}
// 创建底层的缓存对象
@ -112,6 +112,9 @@ func (c *memCache) doSetWithLockCheck(key interface{}, value interface{}, expire
c.dmu.Unlock()
return v
}
if f, ok := value.(func() interface {}); ok {
value = f()
}
c.data[key] = memCacheItem{v : value, e : expireTimestamp}
c.dmu.Unlock()
c.eventList.PushBack(memCacheEvent{k : key, e : expireTimestamp})
@ -127,9 +130,7 @@ func (c *memCache) getInternalExpire(expire int) int64 {
}
}
// 当键名不存在时写入并返回true否则返回false。
// 常用来做对并发性要求不高的内存锁。
func (c *memCache) SetIfNotExist(key interface{}, value interface{}, expire int) bool {
if !c.Contains(key) {
c.doSetWithLockCheck(key, value, expire)
@ -185,6 +186,16 @@ func (c *memCache) GetOrSetFunc(key interface{}, f func() interface{}, expire in
}
}
// 与GetOrSetFunc不同的是f是在写锁机制内执行
func (c *memCache) GetOrSetFuncLock(key interface{}, f func() interface{}, expire int) interface{} {
if v := c.Get(key); v == nil {
c.doSetWithLockCheck(key, f, expire)
return v
} else {
return v
}
}
// 是否存在指定的键名true表示存在false表示不存在。
func (c *memCache) Contains(key interface{}) bool {
return c.Get(key) != nil

View File

@ -17,10 +17,12 @@ func newKafkaClientConsumer(topic, group string) *gkafka.Client {
}
func main () {
group := "test-group-11"
client := newKafkaClientConsumer("test", group)
group := "test-group-206"
topic := "test"
client := newKafkaClientConsumer(topic, group)
defer client.Close()
client.MarkOffset(topic, 0, 6)
for {
fmt.Println(group + " reading...")
for {

View File

@ -4,6 +4,7 @@ import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/util/gconv"
)
// 创建kafka生产客户端
@ -19,6 +20,12 @@ func main () {
client := newKafkaClientProducer("test")
defer client.Close()
for i := 1; i < 10; i++ {
if err := client.SyncSend(&gkafka.Message{Value: []byte(gconv.String(i))}); err != nil {
fmt.Println(err)
}
}
if err := client.SyncSend(&gkafka.Message{Value: []byte(gtime.Now().String())}); err != nil {
fmt.Println(err)
}

View File

@ -1,14 +1,70 @@
package main
import (
"gitee.com/johng/gf/g/container/gmap"
"fmt"
"github.com/Shopify/sarama"
"log"
"sync"
)
func main() {
m1 := gmap.NewIntInterfaceMap()
m1.Set(1, gmap.NewIntIntMap())
v1 := m1.Get(1).(*gmap.IntIntMap)
v1.Set(2, 2)
fmt.Println(m1.Get(1).(*gmap.IntIntMap).Size())
group := "test-group-100"
topic := "test"
config := sarama.NewConfig()
config.Version = sarama.V0_10_2_0
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln(err)
}
offsetManager, err := sarama.NewOffsetManagerFromClient(group, client)
if err != nil {
log.Fatalln(err)
}
pids, err := client.Partitions(topic)
if err != nil {
log.Fatalln(err)
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatalln(err)
}
defer consumer.Close()
wg := &sync.WaitGroup{}
for _, v := range pids {
wg.Add(1)
go consume(wg, consumer, offsetManager, v)
}
wg.Wait()
}
func consume(wg *sync.WaitGroup, c sarama.Consumer, om sarama.OffsetManager, p int32) {
defer wg.Done()
pom, err := om.ManagePartition("test", p)
if err != nil {
log.Fatalln(err)
}
defer pom.Close()
offset, _ := pom.NextOffset()
if offset == -1 {
offset = sarama.OffsetOldest
}
pc, err := c.ConsumePartition("test", p, 6)
if err != nil {
log.Fatalln(err)
}
defer pc.Close()
for msg := range pc.Messages() {
log.Printf("[%v] Consumed message offset %v\n", p, msg.Offset)
pom.MarkOffset(msg.Offset + 1, "")
}
}