优化gkafka消费端

This commit is contained in:
john
2018-07-26 11:01:17 +08:00
parent eccab347f9
commit 4a0749c209
2 changed files with 60 additions and 46 deletions

View File

@ -12,7 +12,7 @@ import (
"strings"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"gitee.com/johng/gf/g/os/glog"
"errors"
)
var (
@ -114,45 +114,47 @@ func (client *Client) Topics() ([]string, error) {
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
config := cluster.NewConfig()
config.Config = client.Config.Config
config.Group.Return.Notifications = true
if client.consumer == nil {
config := cluster.NewConfig()
config.Config = client.Config.Config
config.Group.Return.Notifications = false
c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config)
if err != nil {
return nil, err
} else {
client.consumer = c
go func(c *cluster.Consumer) {
errors := c.Errors()
notify := c.Notifications()
for {
select {
case err := <-errors:
if err != nil {
glog.Error(err)
}
case <-notify:
}
}
}
errorsChan := client.consumer.Errors()
notifyChan := client.consumer.Notifications()
messageChan := client.consumer.Messages()
for {
select {
case msg := <- messageChan:
if client.Config.AutoMarkOffset {
client.consumer.MarkOffset(msg, "")
}
}(client.consumer)
return &Message {
Value : msg.Value,
Key : msg.Key,
Topic : msg.Topic,
Partition : int(msg.Partition),
Offset : int(msg.Offset),
client : client,
consumerMsg : msg,
}, nil
case err := <-errorsChan:
if err != nil {
return nil, err
}
case <-notifyChan:
}
}
msg := <- client.consumer.Messages()
if client.Config.AutoMarkOffset {
client.consumer.MarkOffset(msg, "")
}
return &Message {
Value : msg.Value,
Key : msg.Key,
Topic : msg.Topic,
Partition : int(msg.Partition),
Offset : int(msg.Offset),
client : client,
consumerMsg : msg,
}, nil
return nil, errors.New("unknown error")
}
// Send data to kafka in synchronized way.
@ -181,24 +183,24 @@ func (client *Client) AsyncSend(message *Message) error {
return err
} else {
client.asyncProducer = p
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
glog.Error(err)
}
case <-success:
}
}
}(client.asyncProducer)
//go func(p sarama.AsyncProducer) {
// errors := p.Errors()
// success := p.Successes()
// for {
// select {
// case err := <-errors:
// if err != nil {
// glog.Error(err)
// }
// case <-success:
// }
// }
//}(client.asyncProducer)
}
}
for _, topic := range strings.Split(client.Config.Topics, ",") {
msg := messageToProducerMessage(message)
msg := messageToProducerMessage(message)
msg.Topic = topic
client.asyncProducer.Input() <- msg
}

View File

@ -2,10 +2,22 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/util/gregex"
"time"
)
var c = make(chan int, 10)
func Chan() chan int {
fmt.Println("yes chan")
return c
}
func main() {
fmt.Println(gregex.Quote(`/user/list/1.html`))
for {
select {
case <- Chan():
default:
time.Sleep(time.Second)
}
}
}