完成kafka客户端开发

This commit is contained in:
john
2018-06-15 18:31:23 +08:00
parent c51d9b9359
commit d407e8dca7
5 changed files with 327 additions and 0 deletions

167
g/database/gkafka/gkafka.go Normal file
View File

@ -0,0 +1,167 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Kafka客户端.
package gkafka
import (
"time"
"strings"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"gitee.com/johng/gf/g/os/glog"
)
// kafka Client based on sarama.Config
type Config struct {
GroupId string // group id for consumer.
Servers string // server list, multiple servers joined by ','.
Topics string // topic list, multiple topics joined by ','.
sarama.Config
}
// Kafka Client(Consumer/SyncProducer/AsyncProducer)
type Client struct {
config *Config
consumer *cluster.Consumer
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
}
// Kafka Message.
type Message struct {
Value []byte
Key []byte
Topic string
Partition int
Offset int
}
// New a kafka client.
func New(config Config) *Client {
config.Config = *sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
return &Client {
config : &config,
}
}
// Close client.
func (client *Client) Close() {
if client.consumer != nil {
client.consumer.Close()
}
if client.syncProducer != nil {
client.syncProducer.Close()
}
if client.asyncProducer != nil {
client.asyncProducer.Close()
}
}
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
config := cluster.NewConfig()
config.Config = client.config.Config
config.Group.Return.Notifications = true
if client.consumer == nil {
c, err := cluster.NewConsumer(strings.Split(client.config.Servers, ","), client.config.GroupId, strings.Split(client.config.Topics, ","), config)
if err != nil {
return nil, err
} else {
client.consumer = c
go func(c *cluster.Consumer) {
errors := c.Errors()
notify := c.Notifications()
for {
select {
case err := <-errors:
glog.Error(err)
case <-notify:
}
}
}(client.consumer)
}
}
msg := <- client.consumer.Messages()
client.consumer.MarkOffset(msg, "")
return &Message {
Value : msg.Value,
Key : msg.Key,
Topic : msg.Topic,
Partition : int(msg.Partition),
Offset : int(msg.Offset),
}, nil
}
// Send data to kafka in synchronized way.
func (client *Client) SyncSend(message *Message) error {
if client.syncProducer == nil {
if p, err := sarama.NewSyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil {
return err
} else {
client.syncProducer = p
}
}
for _, topic := range strings.Split(client.config.Topics, ",") {
msg := messageToProducerMessage(message)
msg.Topic = topic
if _, _, err := client.syncProducer.SendMessage(msg); err != nil {
return err
}
}
return nil
}
// Send data to kafka in asynchronized way.
func (client *Client) AsyncSend(message *Message) error {
if client.asyncProducer == nil {
if p, err := sarama.NewAsyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil {
return err
} else {
client.asyncProducer = p
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
glog.Error(err)
case <-success:
}
}
}(client.asyncProducer)
}
}
for _, topic := range strings.Split(client.config.Topics, ",") {
msg := messageToProducerMessage(message)
msg.Topic = topic
client.asyncProducer.Input() <- msg
}
return nil
}
// Convert *gkafka.Message to *sarama.ProducerMessage
func messageToProducerMessage(message *Message) *sarama.ProducerMessage {
return &sarama.ProducerMessage {
Topic : message.Topic,
Key : sarama.ByteEncoder(message.Key),
Value : sarama.ByteEncoder(message.Value),
Partition : int32(message.Partition),
Offset : int64(message.Offset),
}
}

View File

@ -0,0 +1,20 @@
package main
import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
)
func main () {
client := gkafka.New(gkafka.Config{
GroupId : "group_1",
Servers : "localhost:9092",
Topics : "abc",
})
defer client.Close()
for {
msg, err := client.Receive()
fmt.Println(err)
fmt.Println(string(msg.Value))
}
}

View File

@ -0,0 +1,16 @@
package main
import (
"gitee.com/johng/gf/g/database/gkafka"
"fmt"
)
func main () {
client := gkafka.New(gkafka.Config{
Servers : "localhost:9092",
Topics : "abc",
})
defer client.Close()
err := client.SyncSend(&gkafka.Message{Value: []byte("111")})
fmt.Println(err)
}

124
geg/database/kafka/kafka.go Normal file
View File

@ -0,0 +1,124 @@
package main
import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
var (
topics = "abc"
)
func main() {
for {
fmt.Println("time to check")
syncProducer()
consumer()
time.Sleep(time.Second)
}
}
// consumer 消费者
func consumer() {
groupID := "group-12345"
config := cluster.NewConfig()
config.Group.Return.Notifications = true
config.Consumer.Return.Errors = true
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
if err != nil {
fmt.Errorf("Failed open consumer: %v", err)
return
}
defer c.Close()
go func(c *cluster.Consumer) {
errors := c.Errors()
notify := c.Notifications()
for {
select {
case err := <-errors:
fmt.Println(err)
case <-notify:
}
}
}(c)
for msg := range c.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
c.MarkOffset(msg, "")
}
}
// syncProducer 同步生产者
// 并发量小时,可以用这种方式
func syncProducer() {
config := sarama.NewConfig()
// config.Producer.RequiredAcks = sarama.WaitForAll
// config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
defer p.Close()
if err != nil {
fmt.Println(err)
return
}
v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
msg := &sarama.ProducerMessage{
Topic: topics,
Value: sarama.ByteEncoder(v),
}
if _, _, err := p.SendMessage(msg); err != nil {
fmt.Println(err)
return
}
}
// asyncProducer 异步生产者
// 并发量大时,必须采用这种方式
func asyncProducer() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
defer p.Close()
if err != nil {
return
}
//必须有这个匿名函数内容
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
fmt.Println(err)
}
case <-success:
}
}
}(p)
v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
msg := &sarama.ProducerMessage{
Topic: topics,
Value: sarama.ByteEncoder(v),
}
p.Input() <- msg
}