From 9efffa49e22b409fa4a46c5cdf6728d33667cabd Mon Sep 17 00:00:00 2001 From: john Date: Thu, 26 Jul 2018 11:01:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96gkafka=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/database/gkafka/gkafka.go | 90 +++++++++++++++++++------------------ geg/other/test.go | 16 ++++++- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/g/database/gkafka/gkafka.go b/g/database/gkafka/gkafka.go index 54e42cad1..4ff086d20 100644 --- a/g/database/gkafka/gkafka.go +++ b/g/database/gkafka/gkafka.go @@ -12,7 +12,7 @@ import ( "strings" "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" - "gitee.com/johng/gf/g/os/glog" + "errors" ) var ( @@ -114,45 +114,47 @@ func (client *Client) Topics() ([]string, error) { // Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically. func (client *Client) Receive() (*Message, error) { - config := cluster.NewConfig() - config.Config = client.Config.Config - config.Group.Return.Notifications = true if client.consumer == nil { + config := cluster.NewConfig() + config.Config = client.Config.Config + config.Group.Return.Notifications = false + c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config) if err != nil { return nil, err } else { client.consumer = c - go func(c *cluster.Consumer) { - errors := c.Errors() - notify := c.Notifications() - for { - select { - case err := <-errors: - if err != nil { - glog.Error(err) - } - - case <-notify: - } + } + } + errorsChan := client.consumer.Errors() + notifyChan := client.consumer.Notifications() + messageChan := client.consumer.Messages() + for { + select { + case msg := <- messageChan: + if client.Config.AutoMarkOffset { + client.consumer.MarkOffset(msg, "") } - }(client.consumer) + return &Message { + Value : msg.Value, + Key : msg.Key, + Topic : msg.Topic, + Partition : int(msg.Partition), + Offset : int(msg.Offset), + client : client, + consumerMsg : msg, + }, nil + + case err := <-errorsChan: + if err != nil { + return nil, err + } + + case <-notifyChan: } } - msg := <- client.consumer.Messages() - if client.Config.AutoMarkOffset { - client.consumer.MarkOffset(msg, "") - } - return &Message { - Value : msg.Value, - Key : msg.Key, - Topic : msg.Topic, - Partition : int(msg.Partition), - Offset : int(msg.Offset), - client : client, - consumerMsg : msg, - }, nil + return nil, errors.New("unknown error") } // Send data to kafka in synchronized way. @@ -181,24 +183,24 @@ func (client *Client) AsyncSend(message *Message) error { return err } else { client.asyncProducer = p - go func(p sarama.AsyncProducer) { - errors := p.Errors() - success := p.Successes() - for { - select { - case err := <-errors: - if err != nil { - glog.Error(err) - } - case <-success: - } - } - }(client.asyncProducer) + //go func(p sarama.AsyncProducer) { + // errors := p.Errors() + // success := p.Successes() + // for { + // select { + // case err := <-errors: + // if err != nil { + // glog.Error(err) + // } + // case <-success: + // } + // } + //}(client.asyncProducer) } } for _, topic := range strings.Split(client.Config.Topics, ",") { - msg := messageToProducerMessage(message) + msg := messageToProducerMessage(message) msg.Topic = topic client.asyncProducer.Input() <- msg } diff --git a/geg/other/test.go b/geg/other/test.go index bbf51d3ab..d7479fe51 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,10 +2,22 @@ package main import ( "fmt" - "gitee.com/johng/gf/g/util/gregex" + "time" ) +var c = make(chan int, 10) + +func Chan() chan int { + fmt.Println("yes chan") + return c +} func main() { - fmt.Println(gregex.Quote(`/user/list/1.html`)) + for { + select { + case <- Chan(): + default: + time.Sleep(time.Second) + } + } } \ No newline at end of file