完善gkafka包测试

This commit is contained in:
john
2018-06-19 19:32:36 +08:00
parent 1836b7ba6f
commit e8a1543e46
3 changed files with 37 additions and 30 deletions

View File

@ -42,28 +42,28 @@ type Message struct {
// New a kafka client.
func New(config Config) *Client {
func New(config *Config) *Client {
return &Client {
Config : config,
}
}
// New a default configuration object.
func NewConfig() *Config {
config := &Config{}
config.Config = *sarama.NewConfig()
config.Config = *sarama.NewConfig()
// default config for consumer
config.Consumer.Return.Errors = true
if config.Consumer.Offsets.Initial == 0 {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
if config.Consumer.Offsets.CommitInterval == 0 {
config.Consumer.Offsets.CommitInterval = 1 * time.Second
}
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.CommitInterval = 1 * time.Second
// default config for producer
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
if config.Producer.Timeout == 0 {
config.Producer.Timeout = 5 * time.Second
}
return &Client {
Config : &config,
}
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
return config
}
// Close client.
@ -96,7 +96,10 @@ func (client *Client) Receive() (*Message, error) {
for {
select {
case err := <-errors:
glog.Error(err)
if err != nil {
glog.Error(err)
}
case <-notify:
}
}
@ -147,7 +150,9 @@ func (client *Client) AsyncSend(message *Message) error {
for {
select {
case err := <-errors:
glog.Error(err)
if err != nil {
glog.Error(err)
}
case <-success:
}
}

View File

@ -6,15 +6,16 @@ import (
)
func main () {
client := gkafka.New(gkafka.Config{
GroupId : "group_1",
Servers : "localhost:9092",
Topics : "abc",
})
config := gkafka.NewConfig()
config.GroupId = "group_2"
config.Servers = "localhost:9092"
config.Topics = "abc"
client := gkafka.New(config)
defer client.Close()
for {
msg, err := client.Receive()
fmt.Println(err)
fmt.Println(string(msg.Value))
fmt.Printf("value: %s, err: %v", string(msg.Value), err)
}
}

View File

@ -6,10 +6,11 @@ import (
)
func main () {
client := gkafka.New(gkafka.Config{
Servers : "localhost:9092",
Topics : "abc",
})
config := gkafka.NewConfig()
config.Servers = "localhost:9092"
config.Topics = "abc"
client := gkafka.New(config)
defer client.Close()
err := client.SyncSend(&gkafka.Message{Value: []byte("111")})
fmt.Println(err)