完善gkafka.Config默认值

This commit is contained in:
john
2018-06-15 18:50:47 +08:00
parent d407e8dca7
commit 23e8afdc12

View File

@ -25,7 +25,7 @@ type Config struct {
// Kafka Client(Consumer/SyncProducer/AsyncProducer)
type Client struct {
config *Config
Config *Config
consumer *cluster.Consumer
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
@ -45,16 +45,24 @@ type Message struct {
func New(config Config) *Client {
config.Config = *sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// default config for consumer
config.Consumer.Return.Errors = true
if config.Consumer.Offsets.Initial == 0 {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
if config.Consumer.Offsets.CommitInterval == 0 {
config.Consumer.Offsets.CommitInterval = 1 * time.Second
}
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
// default config for producer
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
if config.Producer.Timeout == 0 {
config.Producer.Timeout = 5 * time.Second
}
return &Client {
config : &config,
Config : &config,
}
}
@ -74,10 +82,10 @@ func (client *Client) Close() {
// Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.
func (client *Client) Receive() (*Message, error) {
config := cluster.NewConfig()
config.Config = client.config.Config
config.Config = client.Config.Config
config.Group.Return.Notifications = true
if client.consumer == nil {
c, err := cluster.NewConsumer(strings.Split(client.config.Servers, ","), client.config.GroupId, strings.Split(client.config.Topics, ","), config)
c, err := cluster.NewConsumer(strings.Split(client.Config.Servers, ","), client.Config.GroupId, strings.Split(client.Config.Topics, ","), config)
if err != nil {
return nil, err
} else {
@ -110,13 +118,13 @@ func (client *Client) Receive() (*Message, error) {
// Send data to kafka in synchronized way.
func (client *Client) SyncSend(message *Message) error {
if client.syncProducer == nil {
if p, err := sarama.NewSyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil {
if p, err := sarama.NewSyncProducer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil {
return err
} else {
client.syncProducer = p
}
}
for _, topic := range strings.Split(client.config.Topics, ",") {
for _, topic := range strings.Split(client.Config.Topics, ",") {
msg := messageToProducerMessage(message)
msg.Topic = topic
if _, _, err := client.syncProducer.SendMessage(msg); err != nil {
@ -129,7 +137,7 @@ func (client *Client) SyncSend(message *Message) error {
// Send data to kafka in asynchronized way.
func (client *Client) AsyncSend(message *Message) error {
if client.asyncProducer == nil {
if p, err := sarama.NewAsyncProducer(strings.Split(client.config.Servers, ","), &client.config.Config); err != nil {
if p, err := sarama.NewAsyncProducer(strings.Split(client.Config.Servers, ","), &client.Config.Config); err != nil {
return err
} else {
client.asyncProducer = p
@ -147,7 +155,7 @@ func (client *Client) AsyncSend(message *Message) error {
}
}
for _, topic := range strings.Split(client.config.Topics, ",") {
for _, topic := range strings.Split(client.Config.Topics, ",") {
msg := messageToProducerMessage(message)
msg.Topic = topic
client.asyncProducer.Input() <- msg