mirror of
https://gitee.com/johng/gf
synced 2026-07-03 03:39:35 +08:00
200 lines
4.8 KiB
Go
200 lines
4.8 KiB
Go
package sarama
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"testing"
|
|
)
|
|
|
|
func TestSyncProducer(t *testing.T) {
|
|
seedBroker := NewMockBroker(t, 1)
|
|
leader := NewMockBroker(t, 2)
|
|
|
|
metadataResponse := new(MetadataResponse)
|
|
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
prodSuccess := new(ProduceResponse)
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
for i := 0; i < 10; i++ {
|
|
leader.Returns(prodSuccess)
|
|
}
|
|
|
|
producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for i := 0; i < 10; i++ {
|
|
msg := &ProducerMessage{
|
|
Topic: "my_topic",
|
|
Value: StringEncoder(TestMessage),
|
|
Metadata: "test",
|
|
}
|
|
|
|
partition, offset, err := producer.SendMessage(msg)
|
|
|
|
if partition != 0 || msg.Partition != partition {
|
|
t.Error("Unexpected partition")
|
|
}
|
|
if offset != 0 || msg.Offset != offset {
|
|
t.Error("Unexpected offset")
|
|
}
|
|
if str, ok := msg.Metadata.(string); !ok || str != "test" {
|
|
t.Error("Unexpected metadata")
|
|
}
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
safeClose(t, producer)
|
|
leader.Close()
|
|
seedBroker.Close()
|
|
}
|
|
|
|
func TestSyncProducerBatch(t *testing.T) {
|
|
seedBroker := NewMockBroker(t, 1)
|
|
leader := NewMockBroker(t, 2)
|
|
|
|
metadataResponse := new(MetadataResponse)
|
|
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
prodSuccess := new(ProduceResponse)
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
leader.Returns(prodSuccess)
|
|
|
|
config := NewConfig()
|
|
config.Producer.Flush.Messages = 3
|
|
config.Producer.Return.Successes = true
|
|
producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = producer.SendMessages([]*ProducerMessage{
|
|
{
|
|
Topic: "my_topic",
|
|
Value: StringEncoder(TestMessage),
|
|
Metadata: "test",
|
|
},
|
|
{
|
|
Topic: "my_topic",
|
|
Value: StringEncoder(TestMessage),
|
|
Metadata: "test",
|
|
},
|
|
{
|
|
Topic: "my_topic",
|
|
Value: StringEncoder(TestMessage),
|
|
Metadata: "test",
|
|
},
|
|
})
|
|
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
safeClose(t, producer)
|
|
leader.Close()
|
|
seedBroker.Close()
|
|
}
|
|
|
|
func TestConcurrentSyncProducer(t *testing.T) {
|
|
seedBroker := NewMockBroker(t, 1)
|
|
leader := NewMockBroker(t, 2)
|
|
|
|
metadataResponse := new(MetadataResponse)
|
|
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
prodSuccess := new(ProduceResponse)
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
leader.Returns(prodSuccess)
|
|
|
|
config := NewConfig()
|
|
config.Producer.Flush.Messages = 100
|
|
config.Producer.Return.Successes = true
|
|
producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
for i := 0; i < 100; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
|
|
partition, _, err := producer.SendMessage(msg)
|
|
if partition != 0 {
|
|
t.Error("Unexpected partition")
|
|
}
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
safeClose(t, producer)
|
|
leader.Close()
|
|
seedBroker.Close()
|
|
}
|
|
|
|
func TestSyncProducerToNonExistingTopic(t *testing.T) {
|
|
broker := NewMockBroker(t, 1)
|
|
|
|
metadataResponse := new(MetadataResponse)
|
|
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
|
|
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
|
|
broker.Returns(metadataResponse)
|
|
|
|
config := NewConfig()
|
|
config.Metadata.Retry.Max = 0
|
|
config.Producer.Retry.Max = 0
|
|
config.Producer.Return.Successes = true
|
|
|
|
producer, err := NewSyncProducer([]string{broker.Addr()}, config)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
metadataResponse = new(MetadataResponse)
|
|
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
|
|
broker.Returns(metadataResponse)
|
|
|
|
_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
|
|
if err != ErrUnknownTopicOrPartition {
|
|
t.Error("Uxpected ErrUnknownTopicOrPartition, found:", err)
|
|
}
|
|
|
|
safeClose(t, producer)
|
|
broker.Close()
|
|
}
|
|
|
|
// This example shows the basic usage pattern of the SyncProducer.
|
|
func ExampleSyncProducer() {
|
|
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
|
|
if err != nil {
|
|
log.Fatalln(err)
|
|
}
|
|
defer func() {
|
|
if err := producer.Close(); err != nil {
|
|
log.Fatalln(err)
|
|
}
|
|
}()
|
|
|
|
msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
|
|
partition, offset, err := producer.SendMessage(msg)
|
|
if err != nil {
|
|
log.Printf("FAILED to send message: %s\n", err)
|
|
} else {
|
|
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
|
|
}
|
|
}
|