mirror of
https://gitee.com/johng/gf
synced 2026-07-04 04:52:48 +08:00
新增go modules支持,自行管理第三方包依赖,方便开发者使用
This commit is contained in:
4
third/github.com/johng-cn/sarama-cluster/.gitignore
vendored
Normal file
4
third/github.com/johng-cn/sarama-cluster/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
*.log
|
||||
*.pid
|
||||
kafka*/
|
||||
vendor/
|
||||
18
third/github.com/johng-cn/sarama-cluster/.travis.yml
Normal file
18
third/github.com/johng-cn/sarama-cluster/.travis.yml
Normal file
@ -0,0 +1,18 @@
|
||||
sudo: false
|
||||
language: go
|
||||
go:
|
||||
- 1.10.x
|
||||
- 1.9.x
|
||||
install:
|
||||
- go get -u github.com/golang/dep/cmd/dep
|
||||
- dep ensure
|
||||
env:
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=1.1.0
|
||||
script:
|
||||
- make default test-race
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- oracle-java8-set-default
|
||||
22
third/github.com/johng-cn/sarama-cluster/LICENSE
Normal file
22
third/github.com/johng-cn/sarama-cluster/LICENSE
Normal file
@ -0,0 +1,22 @@
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2017 Black Square Media Ltd
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
'Software'), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
35
third/github.com/johng-cn/sarama-cluster/Makefile
Normal file
35
third/github.com/johng-cn/sarama-cluster/Makefile
Normal file
@ -0,0 +1,35 @@
|
||||
SCALA_VERSION?= 2.12
|
||||
KAFKA_VERSION?= 1.1.0
|
||||
KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION)
|
||||
KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz
|
||||
KAFKA_ROOT= testdata/$(KAFKA_DIR)
|
||||
PKG=$(shell go list ./... | grep -v vendor)
|
||||
|
||||
default: vet test
|
||||
|
||||
vet:
|
||||
go vet $(PKG)
|
||||
|
||||
test: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60
|
||||
|
||||
test-verbose: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v
|
||||
|
||||
test-race: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race
|
||||
|
||||
testdeps: $(KAFKA_ROOT)
|
||||
|
||||
doc: README.md
|
||||
|
||||
.PHONY: test testdeps vet doc
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
$(KAFKA_ROOT):
|
||||
@mkdir -p $(dir $@)
|
||||
cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz
|
||||
|
||||
README.md: README.md.tpl $(wildcard *.go)
|
||||
becca -package $(subst $(GOPATH)/src/,,$(PWD))
|
||||
151
third/github.com/johng-cn/sarama-cluster/README.md
Normal file
151
third/github.com/johng-cn/sarama-cluster/README.md
Normal file
@ -0,0 +1,151 @@
|
||||
# Sarama Cluster
|
||||
|
||||
[](https://godoc.org/github.com/bsm/sarama-cluster)
|
||||
[](https://travis-ci.org/bsm/sarama-cluster)
|
||||
[](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
|
||||
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
|
||||
|
||||
## Examples
|
||||
|
||||
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
|
||||
topics and partitions are all passed to the single channel:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// init (custom) config, enable errors and notifications
|
||||
config := cluster.NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume errors
|
||||
go func() {
|
||||
for err := range consumer.Errors() {
|
||||
log.Printf("Error: %s\n", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// consume notifications
|
||||
go func() {
|
||||
for ntf := range consumer.Notifications() {
|
||||
log.Printf("Rebalanced: %+v\n", ntf)
|
||||
}
|
||||
}()
|
||||
|
||||
// consume messages, watch signals
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-consumer.Messages():
|
||||
if ok {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
|
||||
consumers:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// init (custom) config, set mode to ConsumerModePartitions
|
||||
config := cluster.NewConfig()
|
||||
config.Group.Mode = cluster.ConsumerModePartitions
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume partitions
|
||||
for {
|
||||
select {
|
||||
case part, ok := <-consumer.Partitions():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// start a separate goroutine to consume messages
|
||||
go func(pc cluster.PartitionConsumer) {
|
||||
for msg := range pc.Messages() {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
}(part)
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Running tests
|
||||
|
||||
You need to install Ginkgo & Gomega to run tests. Please see
|
||||
http://onsi.github.io/ginkgo for more details.
|
||||
|
||||
To run tests, call:
|
||||
|
||||
$ make test
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Consumer not receiving any messages?
|
||||
|
||||
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
|
||||
|
||||
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
|
||||
67
third/github.com/johng-cn/sarama-cluster/README.md.tpl
Normal file
67
third/github.com/johng-cn/sarama-cluster/README.md.tpl
Normal file
@ -0,0 +1,67 @@
|
||||
# Sarama Cluster
|
||||
|
||||
[](https://godoc.org/github.com/bsm/sarama-cluster)
|
||||
[](https://travis-ci.org/bsm/sarama-cluster)
|
||||
[](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
|
||||
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
|
||||
|
||||
## Examples
|
||||
|
||||
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
|
||||
topics and partitions are all passed to the single channel:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {{ "ExampleConsumer" | code }}
|
||||
```
|
||||
|
||||
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
|
||||
consumers:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {{ "ExampleConsumer_Partitions" | code }}
|
||||
```
|
||||
|
||||
## Running tests
|
||||
|
||||
You need to install Ginkgo & Gomega to run tests. Please see
|
||||
http://onsi.github.io/ginkgo for more details.
|
||||
|
||||
To run tests, call:
|
||||
|
||||
$ make test
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Consumer not receiving any messages?
|
||||
|
||||
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
|
||||
|
||||
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
|
||||
170
third/github.com/johng-cn/sarama-cluster/balancer.go
Normal file
170
third/github.com/johng-cn/sarama-cluster/balancer.go
Normal file
@ -0,0 +1,170 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// NotificationType defines the type of notification
|
||||
type NotificationType uint8
|
||||
|
||||
// String describes the notification type
|
||||
func (t NotificationType) String() string {
|
||||
switch t {
|
||||
case RebalanceStart:
|
||||
return "rebalance start"
|
||||
case RebalanceOK:
|
||||
return "rebalance OK"
|
||||
case RebalanceError:
|
||||
return "rebalance error"
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
const (
|
||||
UnknownNotification NotificationType = iota
|
||||
RebalanceStart
|
||||
RebalanceOK
|
||||
RebalanceError
|
||||
)
|
||||
|
||||
// Notification are state events emitted by the consumers on rebalance
|
||||
type Notification struct {
|
||||
// Type exposes the notification type
|
||||
Type NotificationType
|
||||
|
||||
// Claimed contains topic/partitions that were claimed by this rebalance cycle
|
||||
Claimed map[string][]int32
|
||||
|
||||
// Released contains topic/partitions that were released as part of this rebalance cycle
|
||||
Released map[string][]int32
|
||||
|
||||
// Current are topic/partitions that are currently claimed to the consumer
|
||||
Current map[string][]int32
|
||||
}
|
||||
|
||||
func newNotification(current map[string][]int32) *Notification {
|
||||
return &Notification{
|
||||
Type: RebalanceStart,
|
||||
Current: current,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notification) success(current map[string][]int32) *Notification {
|
||||
o := &Notification{
|
||||
Type: RebalanceOK,
|
||||
Claimed: make(map[string][]int32),
|
||||
Released: make(map[string][]int32),
|
||||
Current: current,
|
||||
}
|
||||
for topic, partitions := range current {
|
||||
o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
|
||||
}
|
||||
for topic, partitions := range n.Current {
|
||||
o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type topicInfo struct {
|
||||
Partitions []int32
|
||||
MemberIDs []string
|
||||
}
|
||||
|
||||
func (info topicInfo) Perform(s Strategy) map[string][]int32 {
|
||||
if s == StrategyRoundRobin {
|
||||
return info.RoundRobin()
|
||||
}
|
||||
return info.Ranges()
|
||||
}
|
||||
|
||||
func (info topicInfo) Ranges() map[string][]int32 {
|
||||
sort.Strings(info.MemberIDs)
|
||||
|
||||
mlen := len(info.MemberIDs)
|
||||
plen := len(info.Partitions)
|
||||
res := make(map[string][]int32, mlen)
|
||||
|
||||
for pos, memberID := range info.MemberIDs {
|
||||
n, i := float64(plen)/float64(mlen), float64(pos)
|
||||
min := int(math.Floor(i*n + 0.5))
|
||||
max := int(math.Floor((i+1)*n + 0.5))
|
||||
sub := info.Partitions[min:max]
|
||||
if len(sub) > 0 {
|
||||
res[memberID] = sub
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (info topicInfo) RoundRobin() map[string][]int32 {
|
||||
sort.Strings(info.MemberIDs)
|
||||
|
||||
mlen := len(info.MemberIDs)
|
||||
res := make(map[string][]int32, mlen)
|
||||
for i, pnum := range info.Partitions {
|
||||
memberID := info.MemberIDs[i%mlen]
|
||||
res[memberID] = append(res[memberID], pnum)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type balancer struct {
|
||||
client sarama.Client
|
||||
topics map[string]topicInfo
|
||||
}
|
||||
|
||||
func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
|
||||
balancer := newBalancer(client)
|
||||
for memberID, meta := range members {
|
||||
for _, topic := range meta.Topics {
|
||||
if err := balancer.Topic(topic, memberID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return balancer, nil
|
||||
}
|
||||
|
||||
func newBalancer(client sarama.Client) *balancer {
|
||||
return &balancer{
|
||||
client: client,
|
||||
topics: make(map[string]topicInfo),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *balancer) Topic(name string, memberID string) error {
|
||||
topic, ok := r.topics[name]
|
||||
if !ok {
|
||||
nums, err := r.client.Partitions(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topic = topicInfo{
|
||||
Partitions: nums,
|
||||
MemberIDs: make([]string, 0, 1),
|
||||
}
|
||||
}
|
||||
topic.MemberIDs = append(topic.MemberIDs, memberID)
|
||||
r.topics[name] = topic
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
|
||||
res := make(map[string]map[string][]int32, 1)
|
||||
for topic, info := range r.topics {
|
||||
for memberID, partitions := range info.Perform(s) {
|
||||
if _, ok := res[memberID]; !ok {
|
||||
res[memberID] = make(map[string][]int32, 1)
|
||||
}
|
||||
res[memberID][topic] = partitions
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
130
third/github.com/johng-cn/sarama-cluster/balancer_test.go
Normal file
130
third/github.com/johng-cn/sarama-cluster/balancer_test.go
Normal file
@ -0,0 +1,130 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo/extensions/table"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Notification", func() {
|
||||
|
||||
It("should init and convert", func() {
|
||||
n := newNotification(map[string][]int32{
|
||||
"a": {1, 2, 3},
|
||||
"b": {4, 5},
|
||||
"c": {1, 2},
|
||||
})
|
||||
Expect(n).To(Equal(&Notification{
|
||||
Type: RebalanceStart,
|
||||
Current: map[string][]int32{"a": {1, 2, 3}, "b": {4, 5}, "c": {1, 2}},
|
||||
}))
|
||||
|
||||
o := n.success(map[string][]int32{
|
||||
"a": {3, 4},
|
||||
"b": {1, 2, 3, 4},
|
||||
"d": {3, 4},
|
||||
})
|
||||
Expect(o).To(Equal(&Notification{
|
||||
Type: RebalanceOK,
|
||||
Claimed: map[string][]int32{"a": {4}, "b": {1, 2, 3}, "d": {3, 4}},
|
||||
Released: map[string][]int32{"a": {1, 2}, "b": {5}, "c": {1, 2}},
|
||||
Current: map[string][]int32{"a": {3, 4}, "b": {1, 2, 3, 4}, "d": {3, 4}},
|
||||
}))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
var _ = Describe("balancer", func() {
|
||||
var subject *balancer
|
||||
|
||||
BeforeEach(func() {
|
||||
client := &mockClient{
|
||||
topics: map[string][]int32{
|
||||
"one": {0, 1, 2, 3},
|
||||
"two": {0, 1, 2},
|
||||
"three": {0, 1},
|
||||
},
|
||||
}
|
||||
|
||||
var err error
|
||||
subject, err = newBalancerFromMeta(client, map[string]sarama.ConsumerGroupMemberMetadata{
|
||||
"b": {Topics: []string{"three", "one"}},
|
||||
"a": {Topics: []string{"one", "two"}},
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should parse from meta data", func() {
|
||||
Expect(subject.topics).To(HaveLen(3))
|
||||
})
|
||||
|
||||
It("should perform", func() {
|
||||
Expect(subject.Perform(StrategyRange)).To(Equal(map[string]map[string][]int32{
|
||||
"a": {"one": {0, 1}, "two": {0, 1, 2}},
|
||||
"b": {"one": {2, 3}, "three": {0, 1}},
|
||||
}))
|
||||
|
||||
Expect(subject.Perform(StrategyRoundRobin)).To(Equal(map[string]map[string][]int32{
|
||||
"a": {"one": {0, 2}, "two": {0, 1, 2}},
|
||||
"b": {"one": {1, 3}, "three": {0, 1}},
|
||||
}))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
var _ = Describe("topicInfo", func() {
|
||||
|
||||
DescribeTable("Ranges",
|
||||
func(memberIDs []string, partitions []int32, expected map[string][]int32) {
|
||||
info := topicInfo{MemberIDs: memberIDs, Partitions: partitions}
|
||||
Expect(info.Ranges()).To(Equal(expected))
|
||||
},
|
||||
|
||||
Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1}, "M3": {2},
|
||||
}),
|
||||
Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1}, "M3": {2},
|
||||
}),
|
||||
Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{
|
||||
"M1": {0}, "M3": {1},
|
||||
}),
|
||||
Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{
|
||||
"M2": {0},
|
||||
}),
|
||||
Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1, 2}, "M3": {3},
|
||||
}),
|
||||
Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{
|
||||
"M1": {0, 2}, "M2": {4}, "M3": {6, 8},
|
||||
}),
|
||||
)
|
||||
|
||||
DescribeTable("RoundRobin",
|
||||
func(memberIDs []string, partitions []int32, expected map[string][]int32) {
|
||||
info := topicInfo{MemberIDs: memberIDs, Partitions: partitions}
|
||||
Expect(info.RoundRobin()).To(Equal(expected))
|
||||
},
|
||||
|
||||
Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1}, "M3": {2},
|
||||
}),
|
||||
Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1}, "M3": {2},
|
||||
}),
|
||||
Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{
|
||||
"M1": {0}, "M2": {1},
|
||||
}),
|
||||
Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{
|
||||
"M1": {0},
|
||||
}),
|
||||
Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{
|
||||
"M1": {0, 3}, "M2": {1}, "M3": {2},
|
||||
}),
|
||||
Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{
|
||||
"M1": {0, 6}, "M2": {2, 8}, "M3": {4},
|
||||
}),
|
||||
)
|
||||
|
||||
})
|
||||
50
third/github.com/johng-cn/sarama-cluster/client.go
Normal file
50
third/github.com/johng-cn/sarama-cluster/client.go
Normal file
@ -0,0 +1,50 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
var errClientInUse = errors.New("cluster: client is already used by another consumer")
|
||||
|
||||
// Client is a group client
|
||||
type Client struct {
|
||||
sarama.Client
|
||||
config Config
|
||||
|
||||
inUse uint32
|
||||
}
|
||||
|
||||
// NewClient creates a new client instance
|
||||
func NewClient(addrs []string, config *Config) (*Client, error) {
|
||||
if config == nil {
|
||||
config = NewConfig()
|
||||
}
|
||||
|
||||
if err := config.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := sarama.NewClient(addrs, &config.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Client{Client: client, config: *config}, nil
|
||||
}
|
||||
|
||||
// ClusterConfig returns the cluster configuration.
|
||||
func (c *Client) ClusterConfig() *Config {
|
||||
cfg := c.config
|
||||
return &cfg
|
||||
}
|
||||
|
||||
func (c *Client) claim() bool {
|
||||
return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
|
||||
}
|
||||
|
||||
func (c *Client) release() {
|
||||
atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
|
||||
}
|
||||
31
third/github.com/johng-cn/sarama-cluster/client_test.go
Normal file
31
third/github.com/johng-cn/sarama-cluster/client_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Client", func() {
|
||||
var subject *Client
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
subject, err = NewClient(testKafkaAddrs, nil)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should not allow to share clients across multiple consumers", func() {
|
||||
c1, err := NewConsumerFromClient(subject, testGroup, testTopics)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer c1.Close()
|
||||
|
||||
_, err = NewConsumerFromClient(subject, testGroup, testTopics)
|
||||
Expect(err).To(MatchError("cluster: client is already used by another consumer"))
|
||||
|
||||
Expect(c1.Close()).To(Succeed())
|
||||
c2, err := NewConsumerFromClient(subject, testGroup, testTopics)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(c2.Close()).To(Succeed())
|
||||
})
|
||||
|
||||
})
|
||||
25
third/github.com/johng-cn/sarama-cluster/cluster.go
Normal file
25
third/github.com/johng-cn/sarama-cluster/cluster.go
Normal file
@ -0,0 +1,25 @@
|
||||
package cluster
|
||||
|
||||
// Strategy for partition to consumer assignement
|
||||
type Strategy string
|
||||
|
||||
const (
|
||||
// StrategyRange is the default and assigns partition ranges to consumers.
|
||||
// Example with six partitions and two consumers:
|
||||
// C1: [0, 1, 2]
|
||||
// C2: [3, 4, 5]
|
||||
StrategyRange Strategy = "range"
|
||||
|
||||
// StrategyRoundRobin assigns partitions by alternating over consumers.
|
||||
// Example with six partitions and two consumers:
|
||||
// C1: [0, 2, 4]
|
||||
// C2: [1, 3, 5]
|
||||
StrategyRoundRobin Strategy = "roundrobin"
|
||||
)
|
||||
|
||||
// Error instances are wrappers for internal errors with a context and
|
||||
// may be returned through the consumer's Errors() channel
|
||||
type Error struct {
|
||||
Ctx string
|
||||
error
|
||||
}
|
||||
201
third/github.com/johng-cn/sarama-cluster/cluster_test.go
Normal file
201
third/github.com/johng-cn/sarama-cluster/cluster_test.go
Normal file
@ -0,0 +1,201 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
const (
|
||||
testGroup = "sarama-cluster-group"
|
||||
testKafkaData = "/tmp/sarama-cluster-test"
|
||||
)
|
||||
|
||||
var (
|
||||
testKafkaRoot = "kafka_2.12-1.1.0"
|
||||
testKafkaAddrs = []string{"127.0.0.1:29092"}
|
||||
testTopics = []string{"topic-a", "topic-b"}
|
||||
|
||||
testClient sarama.Client
|
||||
testKafkaCmd, testZkCmd *exec.Cmd
|
||||
)
|
||||
|
||||
func init() {
|
||||
if dir := os.Getenv("KAFKA_DIR"); dir != "" {
|
||||
testKafkaRoot = dir
|
||||
}
|
||||
}
|
||||
|
||||
var _ = Describe("offsetInfo", func() {
|
||||
|
||||
It("should calculate next offset", func() {
|
||||
Expect(offsetInfo{-2, ""}.NextOffset(sarama.OffsetOldest)).To(Equal(sarama.OffsetOldest))
|
||||
Expect(offsetInfo{-2, ""}.NextOffset(sarama.OffsetNewest)).To(Equal(sarama.OffsetNewest))
|
||||
Expect(offsetInfo{-1, ""}.NextOffset(sarama.OffsetOldest)).To(Equal(sarama.OffsetOldest))
|
||||
Expect(offsetInfo{-1, ""}.NextOffset(sarama.OffsetNewest)).To(Equal(sarama.OffsetNewest))
|
||||
Expect(offsetInfo{0, ""}.NextOffset(sarama.OffsetOldest)).To(Equal(int64(0)))
|
||||
Expect(offsetInfo{100, ""}.NextOffset(sarama.OffsetOldest)).To(Equal(int64(100)))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
var _ = Describe("int32Slice", func() {
|
||||
|
||||
It("should diff", func() {
|
||||
Expect(((int32Slice)(nil)).Diff(int32Slice{1, 3, 5})).To(BeNil())
|
||||
Expect(int32Slice{1, 3, 5}.Diff((int32Slice)(nil))).To(Equal([]int32{1, 3, 5}))
|
||||
Expect(int32Slice{1, 3, 5}.Diff(int32Slice{1, 3, 5})).To(BeNil())
|
||||
Expect(int32Slice{1, 3, 5}.Diff(int32Slice{1, 2, 3, 4, 5})).To(BeNil())
|
||||
Expect(int32Slice{1, 3, 5}.Diff(int32Slice{2, 3, 4})).To(Equal([]int32{1, 5}))
|
||||
Expect(int32Slice{1, 3, 5}.Diff(int32Slice{1, 4})).To(Equal([]int32{3, 5}))
|
||||
Expect(int32Slice{1, 3, 5}.Diff(int32Slice{2, 5})).To(Equal([]int32{1, 3}))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
var _ = BeforeSuite(func() {
|
||||
testZkCmd = testCmd(
|
||||
testDataDir(testKafkaRoot, "bin", "kafka-run-class.sh"),
|
||||
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
|
||||
testDataDir("zookeeper.properties"),
|
||||
)
|
||||
|
||||
testKafkaCmd = testCmd(
|
||||
testDataDir(testKafkaRoot, "bin", "kafka-run-class.sh"),
|
||||
"-name", "kafkaServer", "kafka.Kafka",
|
||||
testDataDir("server.properties"),
|
||||
)
|
||||
|
||||
// Remove old test data before starting
|
||||
Expect(os.RemoveAll(testKafkaData)).NotTo(HaveOccurred())
|
||||
|
||||
Expect(os.MkdirAll(testKafkaData, 0777)).To(Succeed())
|
||||
Expect(testZkCmd.Start()).To(Succeed())
|
||||
Expect(testKafkaCmd.Start()).To(Succeed())
|
||||
|
||||
// Wait for client
|
||||
Eventually(func() error {
|
||||
var err error
|
||||
|
||||
// sync-producer requires Return.Successes set to true
|
||||
testConf := sarama.NewConfig()
|
||||
testConf.Producer.Return.Successes = true
|
||||
testClient, err = sarama.NewClient(testKafkaAddrs, testConf)
|
||||
return err
|
||||
}, "30s", "1s").Should(Succeed())
|
||||
|
||||
// Ensure we can retrieve partition info
|
||||
Eventually(func() error {
|
||||
_, err := testClient.Partitions(testTopics[0])
|
||||
return err
|
||||
}, "30s", "1s").Should(Succeed())
|
||||
|
||||
// Seed a few messages
|
||||
Expect(testSeed(1000, testTopics)).To(Succeed())
|
||||
})
|
||||
|
||||
var _ = AfterSuite(func() {
|
||||
if testClient != nil {
|
||||
_ = testClient.Close()
|
||||
}
|
||||
|
||||
_ = testKafkaCmd.Process.Kill()
|
||||
_ = testZkCmd.Process.Kill()
|
||||
_ = testKafkaCmd.Wait()
|
||||
_ = testZkCmd.Wait()
|
||||
_ = os.RemoveAll(testKafkaData)
|
||||
})
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
func TestSuite(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "sarama/cluster")
|
||||
}
|
||||
|
||||
func testDataDir(tokens ...string) string {
|
||||
tokens = append([]string{"testdata"}, tokens...)
|
||||
return filepath.Join(tokens...)
|
||||
}
|
||||
|
||||
func testSeed(n int, testTopics []string) error {
|
||||
producer, err := sarama.NewSyncProducerFromClient(testClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer producer.Close()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
kv := sarama.StringEncoder(fmt.Sprintf("PLAINDATA-%08d", i))
|
||||
for _, t := range testTopics {
|
||||
msg := &sarama.ProducerMessage{Topic: t, Key: kv, Value: kv}
|
||||
if _, _, err := producer.SendMessage(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func testCmd(name string, arg ...string) *exec.Cmd {
|
||||
cmd := exec.Command(name, arg...)
|
||||
if testing.Verbose() || os.Getenv("CI") != "" {
|
||||
cmd.Stderr = os.Stderr
|
||||
cmd.Stdout = os.Stdout
|
||||
}
|
||||
cmd.Env = []string{"KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"}
|
||||
return cmd
|
||||
}
|
||||
|
||||
type testConsumerMessage struct {
|
||||
sarama.ConsumerMessage
|
||||
ConsumerID string
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
var _ sarama.Consumer = &mockConsumer{}
|
||||
var _ sarama.PartitionConsumer = &mockPartitionConsumer{}
|
||||
|
||||
type mockClient struct {
|
||||
sarama.Client
|
||||
|
||||
topics map[string][]int32
|
||||
}
|
||||
type mockConsumer struct{ sarama.Consumer }
|
||||
type mockPartitionConsumer struct {
|
||||
sarama.PartitionConsumer
|
||||
|
||||
Topic string
|
||||
Partition int32
|
||||
Offset int64
|
||||
}
|
||||
|
||||
func (m *mockClient) Partitions(t string) ([]int32, error) {
|
||||
pts, ok := m.topics[t]
|
||||
if !ok {
|
||||
return nil, sarama.ErrInvalidTopic
|
||||
}
|
||||
return pts, nil
|
||||
}
|
||||
|
||||
func (*mockConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
|
||||
if offset > -1 && offset < 1000 {
|
||||
return nil, sarama.ErrOffsetOutOfRange
|
||||
}
|
||||
return &mockPartitionConsumer{
|
||||
Topic: topic,
|
||||
Partition: partition,
|
||||
Offset: offset,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (*mockPartitionConsumer) Close() error { return nil }
|
||||
@ -0,0 +1,97 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
"gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
var (
|
||||
groupID = flag.String("group", "", "REQUIRED: The shared consumer group name")
|
||||
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
|
||||
topicList = flag.String("topics", "", "REQUIRED: The comma separated list of topics to consume")
|
||||
offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
|
||||
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
|
||||
|
||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
if *groupID == "" {
|
||||
printUsageErrorAndExit("You have to provide a -group name.")
|
||||
} else if *brokerList == "" {
|
||||
printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
|
||||
} else if *topicList == "" {
|
||||
printUsageErrorAndExit("You have to provide -topics as a comma-separated list.")
|
||||
}
|
||||
|
||||
// Init config
|
||||
config := cluster.NewConfig()
|
||||
if *verbose {
|
||||
sarama.Logger = logger
|
||||
} else {
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
}
|
||||
|
||||
switch *offset {
|
||||
case "oldest":
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
case "newest":
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetNewest
|
||||
default:
|
||||
printUsageErrorAndExit("-offset should be `oldest` or `newest`")
|
||||
}
|
||||
|
||||
// Init consumer, consume errors & messages
|
||||
consumer, err := cluster.NewConsumer(strings.Split(*brokerList, ","), *groupID, strings.Split(*topicList, ","), config)
|
||||
if err != nil {
|
||||
printErrorAndExit(69, "Failed to start consumer: %s", err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// Create signal channel
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Consume all channels, wait for signal to exit
|
||||
for {
|
||||
select {
|
||||
case msg, more := <-consumer.Messages():
|
||||
if more {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
|
||||
consumer.MarkOffset(msg, "")
|
||||
}
|
||||
case ntf, more := <-consumer.Notifications():
|
||||
if more {
|
||||
logger.Printf("Rebalanced: %+v\n", ntf)
|
||||
}
|
||||
case err, more := <-consumer.Errors():
|
||||
if more {
|
||||
logger.Printf("Error: %s\n", err.Error())
|
||||
}
|
||||
case <-sigchan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func printErrorAndExit(code int, format string, values ...interface{}) {
|
||||
fmt.Fprintf(os.Stderr, "ERROR: "+format+"\n\n", values...)
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
func printUsageErrorAndExit(format string, values ...interface{}) {
|
||||
fmt.Fprintf(os.Stderr, "ERROR: "+format+"\n\n", values...)
|
||||
flag.Usage()
|
||||
os.Exit(64)
|
||||
}
|
||||
146
third/github.com/johng-cn/sarama-cluster/config.go
Normal file
146
third/github.com/johng-cn/sarama-cluster/config.go
Normal file
@ -0,0 +1,146 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
var minVersion = sarama.V0_9_0_0
|
||||
|
||||
type ConsumerMode uint8
|
||||
|
||||
const (
|
||||
ConsumerModeMultiplex ConsumerMode = iota
|
||||
ConsumerModePartitions
|
||||
)
|
||||
|
||||
// Config extends sarama.Config with Group specific namespace
|
||||
type Config struct {
|
||||
sarama.Config
|
||||
|
||||
// Group is the namespace for group management properties
|
||||
Group struct {
|
||||
|
||||
// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
|
||||
PartitionStrategy Strategy
|
||||
|
||||
// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
|
||||
// made available through the consumer's Messages() and Errors() channels.
|
||||
//
|
||||
// Users who require low-level access can enable ConsumerModePartitions where individual partitions
|
||||
// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
|
||||
// themselves.
|
||||
Mode ConsumerMode
|
||||
|
||||
Offsets struct {
|
||||
Retry struct {
|
||||
// The number retries when committing offsets (defaults to 3).
|
||||
Max int
|
||||
}
|
||||
Synchronization struct {
|
||||
// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
|
||||
// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
|
||||
DwellTime time.Duration
|
||||
}
|
||||
}
|
||||
|
||||
Session struct {
|
||||
// The allowed session timeout for registered consumers (defaults to 30s).
|
||||
// Must be within the allowed server range.
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
Heartbeat struct {
|
||||
// Interval between each heartbeat (defaults to 3s). It should be no more
|
||||
// than 1/3rd of the Group.Session.Timeout setting
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
// Return specifies which group channels will be populated. If they are set to true,
|
||||
// you must read from the respective channels to prevent deadlock.
|
||||
Return struct {
|
||||
// If enabled, rebalance notification will be returned on the
|
||||
// Notifications channel (default disabled).
|
||||
Notifications bool
|
||||
}
|
||||
|
||||
Topics struct {
|
||||
// An additional whitelist of topics to subscribe to.
|
||||
Whitelist *regexp.Regexp
|
||||
// An additional blacklist of topics to avoid. If set, this will precede over
|
||||
// the Whitelist setting.
|
||||
Blacklist *regexp.Regexp
|
||||
}
|
||||
|
||||
Member struct {
|
||||
// Custom metadata to include when joining the group. The user data for all joined members
|
||||
// can be retrieved by sending a DescribeGroupRequest to the broker that is the
|
||||
// coordinator for the group.
|
||||
UserData []byte
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewConfig returns a new configuration instance with sane defaults.
|
||||
func NewConfig() *Config {
|
||||
c := &Config{
|
||||
Config: *sarama.NewConfig(),
|
||||
}
|
||||
c.Group.PartitionStrategy = StrategyRange
|
||||
c.Group.Offsets.Retry.Max = 3
|
||||
c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime
|
||||
c.Group.Session.Timeout = 30 * time.Second
|
||||
c.Group.Heartbeat.Interval = 3 * time.Second
|
||||
c.Config.Version = minVersion
|
||||
return c
|
||||
}
|
||||
|
||||
// Validate checks a Config instance. It will return a
|
||||
// sarama.ConfigurationError if the specified values don't make sense.
|
||||
func (c *Config) Validate() error {
|
||||
if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
|
||||
sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
|
||||
}
|
||||
if c.Group.Session.Timeout%time.Millisecond != 0 {
|
||||
sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
|
||||
}
|
||||
if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
|
||||
sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
|
||||
}
|
||||
if !c.Version.IsAtLeast(minVersion) {
|
||||
sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
|
||||
c.Version = minVersion
|
||||
}
|
||||
if err := c.Config.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// validate the Group values
|
||||
switch {
|
||||
case c.Group.Offsets.Retry.Max < 0:
|
||||
return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
|
||||
case c.Group.Offsets.Synchronization.DwellTime <= 0:
|
||||
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
|
||||
case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
|
||||
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
|
||||
case c.Group.Heartbeat.Interval <= 0:
|
||||
return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
|
||||
case c.Group.Session.Timeout <= 0:
|
||||
return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
|
||||
case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
|
||||
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
|
||||
case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
|
||||
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
|
||||
}
|
||||
|
||||
// ensure offset is correct
|
||||
switch c.Consumer.Offsets.Initial {
|
||||
case sarama.OffsetOldest, sarama.OffsetNewest:
|
||||
default:
|
||||
return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
26
third/github.com/johng-cn/sarama-cluster/config_test.go
Normal file
26
third/github.com/johng-cn/sarama-cluster/config_test.go
Normal file
@ -0,0 +1,26 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Config", func() {
|
||||
var subject *Config
|
||||
|
||||
BeforeEach(func() {
|
||||
subject = NewConfig()
|
||||
})
|
||||
|
||||
It("should init", func() {
|
||||
Expect(subject.Group.Session.Timeout).To(Equal(30 * time.Second))
|
||||
Expect(subject.Group.Heartbeat.Interval).To(Equal(3 * time.Second))
|
||||
Expect(subject.Group.Return.Notifications).To(BeFalse())
|
||||
Expect(subject.Metadata.Retry.Max).To(Equal(3))
|
||||
Expect(subject.Group.Offsets.Synchronization.DwellTime).NotTo(BeZero())
|
||||
// Expect(subject.Config.Version).To(Equal(sarama.V0_9_0_0))
|
||||
})
|
||||
|
||||
})
|
||||
948
third/github.com/johng-cn/sarama-cluster/consumer.go
Normal file
948
third/github.com/johng-cn/sarama-cluster/consumer.go
Normal file
@ -0,0 +1,948 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// Consumer is a cluster group consumer
|
||||
type Consumer struct {
|
||||
client *Client
|
||||
ownClient bool
|
||||
|
||||
consumer sarama.Consumer
|
||||
subs *partitionMap
|
||||
|
||||
consumerID string
|
||||
groupID string
|
||||
|
||||
memberID string
|
||||
generationID int32
|
||||
membershipMu sync.RWMutex
|
||||
|
||||
coreTopics []string
|
||||
extraTopics []string
|
||||
|
||||
dying, dead chan none
|
||||
closeOnce sync.Once
|
||||
|
||||
consuming int32
|
||||
messages chan *sarama.ConsumerMessage
|
||||
errors chan error
|
||||
partitions chan PartitionConsumer
|
||||
notifications chan *Notification
|
||||
|
||||
customOffsets map[string]map[int32]offsetInfo
|
||||
|
||||
commitMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewConsumer initializes a new consumer
|
||||
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
|
||||
client, err := NewClient(addrs, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consumer, err := NewConsumerFromClient(client, groupID, topics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consumer.ownClient = true
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
// NewConsumerFromClient initializes a new consumer from an existing client.
|
||||
//
|
||||
// Please note that clients cannot be shared between consumers (due to Kafka internals),
|
||||
// they can only be re-used which requires the user to call Close() on the first consumer
|
||||
// before using this method again to initialize another one. Attempts to use a client with
|
||||
// more than one consumer at a time will return errors.
|
||||
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
|
||||
if !client.claim() {
|
||||
return nil, errClientInUse
|
||||
}
|
||||
|
||||
consumer, err := sarama.NewConsumerFromClient(client.Client)
|
||||
if err != nil {
|
||||
client.release()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Strings(topics)
|
||||
c := &Consumer{
|
||||
client: client,
|
||||
consumer: consumer,
|
||||
subs: newPartitionMap(),
|
||||
groupID: groupID,
|
||||
|
||||
coreTopics: topics,
|
||||
|
||||
dying: make(chan none),
|
||||
dead: make(chan none),
|
||||
|
||||
messages: make(chan *sarama.ConsumerMessage),
|
||||
errors: make(chan error, client.config.ChannelBufferSize),
|
||||
partitions: make(chan PartitionConsumer, 1),
|
||||
notifications: make(chan *Notification),
|
||||
|
||||
customOffsets: make(map[string]map[int32]offsetInfo),
|
||||
}
|
||||
if err := c.client.RefreshCoordinator(groupID); err != nil {
|
||||
client.release()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go c.mainLoop()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Messages returns the read channel for the messages that are returned by
|
||||
// the broker.
|
||||
//
|
||||
// This channel will only return if Config.Group.Mode option is set to
|
||||
// ConsumerModeMultiplex (default).
|
||||
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
|
||||
|
||||
// Partitions returns the read channels for individual partitions of this broker.
|
||||
//
|
||||
// This will channel will only return if Config.Group.Mode option is set to
|
||||
// ConsumerModePartitions.
|
||||
//
|
||||
// The Partitions() channel must be listened to for the life of this consumer;
|
||||
// when a rebalance happens old partitions will be closed (naturally come to
|
||||
// completion) and new ones will be emitted. The returned channel will only close
|
||||
// when the consumer is completely shut down.
|
||||
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
|
||||
|
||||
// Errors returns a read channel of errors that occur during offset management, if
|
||||
// enabled. By default, errors are logged and not returned over this channel. If
|
||||
// you want to implement any custom error handling, set your config's
|
||||
// Consumer.Return.Errors setting to true, and read from this channel.
|
||||
func (c *Consumer) Errors() <-chan error { return c.errors }
|
||||
|
||||
// Notifications returns a channel of Notifications that occur during consumer
|
||||
// rebalancing. Notifications will only be emitted over this channel, if your config's
|
||||
// Group.Return.Notifications setting to true.
|
||||
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
|
||||
|
||||
// HighWaterMarks returns the current high water marks for each topic and partition
|
||||
// Consistency between partitions is not guaranteed since high water marks are updated separately.
|
||||
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
|
||||
|
||||
// MarkOffset marks the provided message as processed, alongside a metadata string
|
||||
// that represents the state of the partition consumer at that point in time. The
|
||||
// metadata string can be used by another consumer to restore that state, so it
|
||||
// can resume consumption.
|
||||
//
|
||||
// Note: calling MarkOffset does not necessarily commit the offset to the backend
|
||||
// store immediately for efficiency reasons, and it may never be committed if
|
||||
// your application crashes. This means that you may end up processing the same
|
||||
// message twice, and your processing should ideally be idempotent.
|
||||
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
||||
sub.MarkOffset(msg.Offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
|
||||
// See MarkOffset for additional explanation.
|
||||
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
||||
sub.MarkOffset(offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(topic, partition, offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) markCustomOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
if _, ok := c.customOffsets[topic]; !ok {
|
||||
c.customOffsets[topic] = make(map[int32]offsetInfo)
|
||||
}
|
||||
c.customOffsets[topic][partition] = offsetInfo {
|
||||
Offset : offset,
|
||||
Metadata : metadata,
|
||||
}
|
||||
}
|
||||
|
||||
// MarkOffsets marks stashed offsets as processed.
|
||||
// See MarkOffset for additional explanation.
|
||||
func (c *Consumer) MarkOffsets(s *OffsetStash) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for tp, info := range s.offsets {
|
||||
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
||||
sub.MarkOffset(info.Offset, info.Metadata)
|
||||
} else {
|
||||
c.markCustomOffset(tp.Topic, tp.Partition, info.Offset, info.Metadata)
|
||||
}
|
||||
delete(s.offsets, tp)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffsets marks the provided message as processed, alongside a metadata string
|
||||
// that represents the state of the partition consumer at that point in time. The
|
||||
// metadata string can be used by another consumer to restore that state, so it
|
||||
// can resume consumption.
|
||||
//
|
||||
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
|
||||
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
||||
sub.ResetOffset(msg.Offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
|
||||
// See ResetOffset for additional explanation.
|
||||
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
sub := c.subs.Fetch(topic, partition)
|
||||
if sub != nil {
|
||||
sub.ResetOffset(offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(topic, partition, offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffsets marks stashed offsets as processed.
|
||||
// See ResetOffset for additional explanation.
|
||||
func (c *Consumer) ResetOffsets(s *OffsetStash) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for tp, info := range s.offsets {
|
||||
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
||||
sub.ResetOffset(info.Offset, info.Metadata)
|
||||
}
|
||||
delete(s.offsets, tp)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscriptions returns the consumed topics and partitions
|
||||
func (c *Consumer) Subscriptions() map[string][]int32 {
|
||||
return c.subs.Info()
|
||||
}
|
||||
|
||||
// CommitOffsets allows to manually commit previously marked offsets. By default there is no
|
||||
// need to call this function as the consumer will commit offsets automatically
|
||||
// using the Config.Consumer.Offsets.CommitInterval setting.
|
||||
//
|
||||
// Please be aware that calling this function during an internal rebalance cycle may return
|
||||
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
|
||||
func (c *Consumer) CommitOffsets() error {
|
||||
c.commitMu.Lock()
|
||||
defer c.commitMu.Unlock()
|
||||
|
||||
memberID, generationID := c.membership()
|
||||
req := &sarama.OffsetCommitRequest{
|
||||
Version: 2,
|
||||
ConsumerGroup: c.groupID,
|
||||
ConsumerGroupGeneration: generationID,
|
||||
ConsumerID: memberID,
|
||||
RetentionTime: -1,
|
||||
}
|
||||
|
||||
if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
|
||||
req.RetentionTime = int64(ns / time.Millisecond)
|
||||
}
|
||||
|
||||
snap := c.subs.Snapshot()
|
||||
dirty := false
|
||||
for tp, state := range snap {
|
||||
if state.Dirty {
|
||||
dirty = true
|
||||
req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
|
||||
}
|
||||
}
|
||||
if !dirty {
|
||||
return nil
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := broker.CommitOffset(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for topic, errs := range resp.Errors {
|
||||
for partition, kerr := range errs {
|
||||
if kerr != sarama.ErrNoError {
|
||||
err = kerr
|
||||
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
|
||||
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
||||
sub.markCommitted(state.Info.Offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Close safely closes the consumer and releases all resources
|
||||
func (c *Consumer) Close() (err error) {
|
||||
c.closeOnce.Do(func() {
|
||||
close(c.dying)
|
||||
<-c.dead
|
||||
|
||||
if e := c.release(); e != nil {
|
||||
err = e
|
||||
}
|
||||
if e := c.consumer.Close(); e != nil {
|
||||
err = e
|
||||
}
|
||||
close(c.messages)
|
||||
close(c.errors)
|
||||
|
||||
if e := c.leaveGroup(); e != nil {
|
||||
err = e
|
||||
}
|
||||
close(c.partitions)
|
||||
close(c.notifications)
|
||||
|
||||
// drain
|
||||
for range c.messages {
|
||||
}
|
||||
for range c.errors {
|
||||
}
|
||||
for p := range c.partitions {
|
||||
_ = p.Close()
|
||||
}
|
||||
for range c.notifications {
|
||||
}
|
||||
|
||||
c.client.release()
|
||||
if c.ownClient {
|
||||
if e := c.client.Close(); e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Consumer) mainLoop() {
|
||||
defer close(c.dead)
|
||||
defer atomic.StoreInt32(&c.consuming, 0)
|
||||
|
||||
for {
|
||||
atomic.StoreInt32(&c.consuming, 0)
|
||||
|
||||
// Check if close was requested
|
||||
select {
|
||||
case <-c.dying:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Start next consume cycle
|
||||
c.nextTick()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) nextTick() {
|
||||
// Remember previous subscriptions
|
||||
var notification *Notification
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
notification = newNotification(c.subs.Info())
|
||||
}
|
||||
|
||||
// Refresh coordinator
|
||||
if err := c.refreshCoordinator(); err != nil {
|
||||
c.rebalanceError(err, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Release subscriptions
|
||||
if err := c.release(); err != nil {
|
||||
c.rebalanceError(err, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Issue rebalance start notification
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
c.handleNotification(notification)
|
||||
}
|
||||
|
||||
// Rebalance, fetch new subscriptions
|
||||
subs, err := c.rebalance()
|
||||
if err != nil {
|
||||
c.rebalanceError(err, notification)
|
||||
return
|
||||
}
|
||||
|
||||
// Coordinate loops, make sure everything is
|
||||
// stopped on exit
|
||||
tomb := newLoopTomb()
|
||||
defer tomb.Close()
|
||||
|
||||
// Start the heartbeat
|
||||
tomb.Go(c.hbLoop)
|
||||
|
||||
// Subscribe to topic/partitions
|
||||
if err := c.subscribe(tomb, subs); err != nil {
|
||||
c.rebalanceError(err, notification)
|
||||
return
|
||||
}
|
||||
|
||||
// Update/issue notification with new claims
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
notification = notification.success(subs)
|
||||
c.handleNotification(notification)
|
||||
}
|
||||
|
||||
// Start topic watcher loop
|
||||
tomb.Go(c.twLoop)
|
||||
|
||||
// Start consuming and committing offsets
|
||||
tomb.Go(c.cmLoop)
|
||||
atomic.StoreInt32(&c.consuming, 1)
|
||||
|
||||
// Wait for signals
|
||||
select {
|
||||
case <-tomb.Dying():
|
||||
case <-c.dying:
|
||||
}
|
||||
}
|
||||
|
||||
// heartbeat loop, triggered by the mainLoop
|
||||
func (c *Consumer) hbLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
switch err := c.heartbeat(); err {
|
||||
case nil, sarama.ErrNoError:
|
||||
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
|
||||
return
|
||||
default:
|
||||
c.handleError(&Error{Ctx: "heartbeat", error: err})
|
||||
return
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// topic watcher loop, triggered by the mainLoop
|
||||
func (c *Consumer) twLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
topics, err := c.client.Topics()
|
||||
if err != nil {
|
||||
c.handleError(&Error{Ctx: "topics", error: err})
|
||||
return
|
||||
}
|
||||
|
||||
for _, topic := range topics {
|
||||
if !c.isKnownCoreTopic(topic) &&
|
||||
!c.isKnownExtraTopic(topic) &&
|
||||
c.isPotentialExtraTopic(topic) {
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// commit loop, triggered by the mainLoop
|
||||
func (c *Consumer) cmLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
|
||||
c.handleError(&Error{Ctx: "commit", error: err})
|
||||
return
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) rebalanceError(err error, n *Notification) {
|
||||
if n != nil {
|
||||
n.Type = RebalanceError
|
||||
c.handleNotification(n)
|
||||
}
|
||||
|
||||
switch err {
|
||||
case sarama.ErrRebalanceInProgress:
|
||||
default:
|
||||
c.handleError(&Error{Ctx: "rebalance", error: err})
|
||||
}
|
||||
|
||||
select {
|
||||
case <-c.dying:
|
||||
case <-time.After(c.client.config.Metadata.Retry.Backoff):
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) handleNotification(n *Notification) {
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
select {
|
||||
case c.notifications <- n:
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) handleError(e *Error) {
|
||||
if c.client.config.Consumer.Return.Errors {
|
||||
select {
|
||||
case c.errors <- e:
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
} else {
|
||||
sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Releases the consumer and commits offsets, called from rebalance() and Close()
|
||||
func (c *Consumer) release() (err error) {
|
||||
// Stop all consumers
|
||||
c.subs.Stop()
|
||||
|
||||
// Clear subscriptions on exit
|
||||
defer c.subs.Clear()
|
||||
|
||||
// Wait for messages to be processed
|
||||
timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
|
||||
defer timeout.Stop()
|
||||
|
||||
select {
|
||||
case <-c.dying:
|
||||
case <-timeout.C:
|
||||
}
|
||||
|
||||
// Commit offsets, continue on errors
|
||||
if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
// Performs a heartbeat, part of the mainLoop()
|
||||
func (c *Consumer) heartbeat() error {
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
memberID, generationID := c.membership()
|
||||
resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
GenerationId: generationID,
|
||||
})
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
return resp.Err
|
||||
}
|
||||
|
||||
// Performs a rebalance, part of the mainLoop()
|
||||
func (c *Consumer) rebalance() (map[string][]int32, error) {
|
||||
memberID, _ := c.membership()
|
||||
sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
|
||||
|
||||
allTopics, err := c.client.Topics()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.extraTopics = c.selectExtraTopics(allTopics)
|
||||
sort.Strings(c.extraTopics)
|
||||
|
||||
// Re-join consumer group
|
||||
strategy, err := c.joinGroup()
|
||||
switch {
|
||||
case err == sarama.ErrUnknownMemberId:
|
||||
c.membershipMu.Lock()
|
||||
c.memberID = ""
|
||||
c.membershipMu.Unlock()
|
||||
return nil, err
|
||||
case err != nil:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sync consumer group state, fetch subscriptions
|
||||
subs, err := c.syncGroup(strategy)
|
||||
switch {
|
||||
case err == sarama.ErrRebalanceInProgress:
|
||||
return nil, err
|
||||
case err != nil:
|
||||
_ = c.leaveGroup()
|
||||
return nil, err
|
||||
}
|
||||
return subs, nil
|
||||
}
|
||||
|
||||
// Performs the subscription, part of the mainLoop()
|
||||
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
|
||||
// fetch offsets
|
||||
offsets, err := c.fetchOffsets(subs)
|
||||
if err != nil {
|
||||
_ = c.leaveGroup()
|
||||
return err
|
||||
}
|
||||
|
||||
// create consumers in parallel
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for topic, partitions := range subs {
|
||||
for _, partition := range partitions {
|
||||
wg.Add(1)
|
||||
|
||||
info := offsets[topic][partition]
|
||||
if item, ok := c.customOffsets[topic]; ok {
|
||||
if i, ok := item[partition]; ok {
|
||||
info = i
|
||||
}
|
||||
}
|
||||
go func(topic string, partition int32) {
|
||||
if e := c.createConsumer(tomb, topic, partition, info); e != nil {
|
||||
mu.Lock()
|
||||
err = e
|
||||
mu.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}(topic, partition)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if err != nil {
|
||||
_ = c.release()
|
||||
_ = c.leaveGroup()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
// Send a request to the broker to join group on rebalance()
|
||||
func (c *Consumer) joinGroup() (*balancer, error) {
|
||||
memberID, _ := c.membership()
|
||||
req := &sarama.JoinGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
|
||||
ProtocolType: "consumer",
|
||||
}
|
||||
|
||||
meta := &sarama.ConsumerGroupMemberMetadata{
|
||||
Version: 1,
|
||||
Topics: append(c.coreTopics, c.extraTopics...),
|
||||
UserData: c.client.config.Group.Member.UserData,
|
||||
}
|
||||
err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.JoinGroup(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
} else if resp.Err != sarama.ErrNoError {
|
||||
c.closeCoordinator(broker, resp.Err)
|
||||
return nil, resp.Err
|
||||
}
|
||||
|
||||
var strategy *balancer
|
||||
if resp.LeaderId == resp.MemberId {
|
||||
members, err := resp.GetMembers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
strategy, err = newBalancerFromMeta(c.client, members)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.membershipMu.Lock()
|
||||
c.memberID = resp.MemberId
|
||||
c.generationID = resp.GenerationId
|
||||
c.membershipMu.Unlock()
|
||||
|
||||
return strategy, nil
|
||||
}
|
||||
|
||||
// Send a request to the broker to sync the group on rebalance().
|
||||
// Returns a list of topics and partitions to consume.
|
||||
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
|
||||
memberID, generationID := c.membership()
|
||||
req := &sarama.SyncGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
GenerationId: generationID,
|
||||
}
|
||||
|
||||
if strategy != nil {
|
||||
for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
|
||||
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
|
||||
Topics: topics,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.SyncGroup(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
} else if resp.Err != sarama.ErrNoError {
|
||||
c.closeCoordinator(broker, resp.Err)
|
||||
return nil, resp.Err
|
||||
}
|
||||
|
||||
// Return if there is nothing to subscribe to
|
||||
if len(resp.MemberAssignment) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Get assigned subscriptions
|
||||
members, err := resp.GetMemberAssignment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sort partitions, for each topic
|
||||
for topic := range members.Topics {
|
||||
sort.Sort(int32Slice(members.Topics[topic]))
|
||||
}
|
||||
return members.Topics, nil
|
||||
}
|
||||
|
||||
// Fetches latest committed offsets for all subscriptions
|
||||
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
|
||||
offsets := make(map[string]map[int32]offsetInfo, len(subs))
|
||||
req := &sarama.OffsetFetchRequest{
|
||||
Version: 1,
|
||||
ConsumerGroup: c.groupID,
|
||||
}
|
||||
|
||||
for topic, partitions := range subs {
|
||||
offsets[topic] = make(map[int32]offsetInfo, len(partitions))
|
||||
for _, partition := range partitions {
|
||||
offsets[topic][partition] = offsetInfo{Offset: -1}
|
||||
req.AddPartition(topic, partition)
|
||||
}
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.FetchOffset(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for topic, partitions := range subs {
|
||||
for _, partition := range partitions {
|
||||
block := resp.GetBlock(topic, partition)
|
||||
if block == nil {
|
||||
return nil, sarama.ErrIncompleteResponse
|
||||
}
|
||||
|
||||
if block.Err == sarama.ErrNoError {
|
||||
offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
|
||||
} else {
|
||||
return nil, block.Err
|
||||
}
|
||||
}
|
||||
}
|
||||
return offsets, nil
|
||||
}
|
||||
|
||||
// Send a request to the broker to leave the group on failes rebalance() and on Close()
|
||||
func (c *Consumer) leaveGroup() error {
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
memberID, _ := c.membership()
|
||||
if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
}); err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
|
||||
memberID, _ := c.membership()
|
||||
sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
|
||||
|
||||
// Create partitionConsumer
|
||||
pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store in subscriptions
|
||||
c.subs.Store(topic, partition, pc)
|
||||
|
||||
// Start partition consumer goroutine
|
||||
tomb.Go(func(stopper <-chan none) {
|
||||
if c.client.config.Group.Mode == ConsumerModePartitions {
|
||||
pc.waitFor(stopper)
|
||||
} else {
|
||||
pc.multiplex(stopper, c.messages, c.errors)
|
||||
}
|
||||
})
|
||||
|
||||
if c.client.config.Group.Mode == ConsumerModePartitions {
|
||||
c.partitions <- pc
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
|
||||
err := c.CommitOffsets()
|
||||
if err != nil && retries > 0 {
|
||||
return c.commitOffsetsWithRetry(retries - 1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
|
||||
if broker != nil {
|
||||
_ = broker.Close()
|
||||
}
|
||||
|
||||
switch err {
|
||||
case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
|
||||
_ = c.client.RefreshCoordinator(c.groupID)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
|
||||
extra := allTopics[:0]
|
||||
for _, topic := range allTopics {
|
||||
if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
|
||||
extra = append(extra, topic)
|
||||
}
|
||||
}
|
||||
return extra
|
||||
}
|
||||
|
||||
func (c *Consumer) isKnownCoreTopic(topic string) bool {
|
||||
pos := sort.SearchStrings(c.coreTopics, topic)
|
||||
return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
|
||||
}
|
||||
|
||||
func (c *Consumer) isKnownExtraTopic(topic string) bool {
|
||||
pos := sort.SearchStrings(c.extraTopics, topic)
|
||||
return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
|
||||
}
|
||||
|
||||
func (c *Consumer) isPotentialExtraTopic(topic string) bool {
|
||||
rx := c.client.config.Group.Topics
|
||||
if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
|
||||
return false
|
||||
}
|
||||
if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *Consumer) refreshCoordinator() error {
|
||||
if err := c.refreshMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.client.RefreshCoordinator(c.groupID)
|
||||
}
|
||||
|
||||
func (c *Consumer) refreshMetadata() (err error) {
|
||||
if c.client.config.Metadata.Full {
|
||||
err = c.client.RefreshMetadata()
|
||||
} else {
|
||||
var topics []string
|
||||
if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
|
||||
err = c.client.RefreshMetadata(topics...)
|
||||
}
|
||||
}
|
||||
|
||||
// maybe we didn't have authorization to describe all topics
|
||||
switch err {
|
||||
case sarama.ErrTopicAuthorizationFailed:
|
||||
err = c.client.RefreshMetadata(c.coreTopics...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Consumer) membership() (memberID string, generationID int32) {
|
||||
c.membershipMu.RLock()
|
||||
memberID, generationID = c.memberID, c.generationID
|
||||
c.membershipMu.RUnlock()
|
||||
return
|
||||
}
|
||||
350
third/github.com/johng-cn/sarama-cluster/consumer_test.go
Normal file
350
third/github.com/johng-cn/sarama-cluster/consumer_test.go
Normal file
@ -0,0 +1,350 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Consumer", func() {
|
||||
|
||||
var newConsumerOf = func(group string, topics ...string) (*Consumer, error) {
|
||||
config := NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
return NewConsumer(testKafkaAddrs, group, topics, config)
|
||||
}
|
||||
|
||||
var subscriptionsOf = func(c *Consumer) GomegaAsyncAssertion {
|
||||
return Eventually(func() map[string][]int32 {
|
||||
return c.Subscriptions()
|
||||
}, "10s", "100ms")
|
||||
}
|
||||
|
||||
It("should init and share", func() {
|
||||
// start CS1
|
||||
cs1, err := newConsumerOf(testGroup, testTopics...)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// CS1 should consume all 8 partitions
|
||||
subscriptionsOf(cs1).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
"topic-b": {0, 1, 2, 3},
|
||||
}))
|
||||
|
||||
// start CS2
|
||||
cs2, err := newConsumerOf(testGroup, testTopics...)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs2.Close()
|
||||
|
||||
// CS1 and CS2 should consume 4 partitions each
|
||||
subscriptionsOf(cs1).Should(HaveLen(2))
|
||||
subscriptionsOf(cs1).Should(HaveKeyWithValue("topic-a", HaveLen(2)))
|
||||
subscriptionsOf(cs1).Should(HaveKeyWithValue("topic-b", HaveLen(2)))
|
||||
|
||||
subscriptionsOf(cs2).Should(HaveLen(2))
|
||||
subscriptionsOf(cs2).Should(HaveKeyWithValue("topic-a", HaveLen(2)))
|
||||
subscriptionsOf(cs2).Should(HaveKeyWithValue("topic-b", HaveLen(2)))
|
||||
|
||||
// shutdown CS1, now CS2 should consume all 8 partitions
|
||||
Expect(cs1.Close()).NotTo(HaveOccurred())
|
||||
subscriptionsOf(cs2).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
"topic-b": {0, 1, 2, 3},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should allow more consumers than partitions", func() {
|
||||
cs1, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs1.Close()
|
||||
cs2, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs2.Close()
|
||||
cs3, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs3.Close()
|
||||
cs4, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// start 4 consumers, one for each partition
|
||||
subscriptionsOf(cs1).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs2).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs3).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs4).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
|
||||
// add a 5th consumer
|
||||
cs5, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs5.Close()
|
||||
|
||||
// make sure no errors occurred
|
||||
Expect(cs1.Errors()).ShouldNot(Receive())
|
||||
Expect(cs2.Errors()).ShouldNot(Receive())
|
||||
Expect(cs3.Errors()).ShouldNot(Receive())
|
||||
Expect(cs4.Errors()).ShouldNot(Receive())
|
||||
Expect(cs5.Errors()).ShouldNot(Receive())
|
||||
|
||||
// close 4th, make sure the 5th takes over
|
||||
Expect(cs4.Close()).To(Succeed())
|
||||
subscriptionsOf(cs1).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs2).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs3).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
subscriptionsOf(cs4).Should(BeEmpty())
|
||||
subscriptionsOf(cs5).Should(HaveKeyWithValue("topic-a", HaveLen(1)))
|
||||
|
||||
// there should still be no errors
|
||||
Expect(cs1.Errors()).ShouldNot(Receive())
|
||||
Expect(cs2.Errors()).ShouldNot(Receive())
|
||||
Expect(cs3.Errors()).ShouldNot(Receive())
|
||||
Expect(cs4.Errors()).ShouldNot(Receive())
|
||||
Expect(cs5.Errors()).ShouldNot(Receive())
|
||||
})
|
||||
|
||||
It("should be allowed to subscribe to partitions via white/black-lists", func() {
|
||||
config := NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Topics.Whitelist = regexp.MustCompile(`topic-\w+`)
|
||||
config.Group.Topics.Blacklist = regexp.MustCompile(`[bcd]$`)
|
||||
|
||||
cs, err := NewConsumer(testKafkaAddrs, testGroup, nil, config)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
|
||||
subscriptionsOf(cs).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should receive rebalance notifications", func() {
|
||||
config := NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
|
||||
cs, err := NewConsumer(testKafkaAddrs, testGroup, testTopics, config)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
|
||||
select {
|
||||
case n := <-cs.Notifications():
|
||||
Expect(n).To(Equal(&Notification{
|
||||
Type: RebalanceStart,
|
||||
Current: map[string][]int32{},
|
||||
}))
|
||||
case err := <-cs.Errors():
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
case <-cs.Messages():
|
||||
Fail("expected notification to arrive before message")
|
||||
}
|
||||
|
||||
select {
|
||||
case n := <-cs.Notifications():
|
||||
Expect(n).To(Equal(&Notification{
|
||||
Type: RebalanceOK,
|
||||
Claimed: map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
"topic-b": {0, 1, 2, 3},
|
||||
},
|
||||
Released: map[string][]int32{},
|
||||
Current: map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
"topic-b": {0, 1, 2, 3},
|
||||
},
|
||||
}))
|
||||
case err := <-cs.Errors():
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
case <-cs.Messages():
|
||||
Fail("expected notification to arrive before message")
|
||||
}
|
||||
})
|
||||
|
||||
It("should support manual mark/commit", func() {
|
||||
cs, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
|
||||
subscriptionsOf(cs).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3}},
|
||||
))
|
||||
|
||||
cs.MarkPartitionOffset("topic-a", 1, 3, "")
|
||||
cs.MarkPartitionOffset("topic-a", 2, 4, "")
|
||||
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())
|
||||
|
||||
offsets, err := cs.fetchOffsets(cs.Subscriptions())
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(offsets).To(Equal(map[string]map[int32]offsetInfo{
|
||||
"topic-a": {0: {Offset: -1}, 1: {Offset: 4}, 2: {Offset: 5}, 3: {Offset: -1}},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should support manual mark/commit, reset/commit", func() {
|
||||
cs, err := newConsumerOf(testGroup, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
|
||||
subscriptionsOf(cs).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3}},
|
||||
))
|
||||
|
||||
cs.MarkPartitionOffset("topic-a", 1, 3, "")
|
||||
cs.MarkPartitionOffset("topic-a", 2, 4, "")
|
||||
cs.MarkPartitionOffset("topic-b", 1, 2, "") // should not throw NPE
|
||||
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())
|
||||
|
||||
cs.ResetPartitionOffset("topic-a", 1, 2, "")
|
||||
cs.ResetPartitionOffset("topic-a", 2, 3, "")
|
||||
cs.ResetPartitionOffset("topic-b", 1, 2, "") // should not throw NPE
|
||||
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())
|
||||
|
||||
offsets, err := cs.fetchOffsets(cs.Subscriptions())
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(offsets).To(Equal(map[string]map[int32]offsetInfo{
|
||||
"topic-a": {0: {Offset: -1}, 1: {Offset: 3}, 2: {Offset: 4}, 3: {Offset: -1}},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should not commit unprocessed offsets", func() {
|
||||
const groupID = "panicking"
|
||||
|
||||
cs, err := newConsumerOf(groupID, "topic-a")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
subscriptionsOf(cs).Should(Equal(map[string][]int32{
|
||||
"topic-a": {0, 1, 2, 3},
|
||||
}))
|
||||
|
||||
n := 0
|
||||
Expect(func() {
|
||||
for range cs.Messages() {
|
||||
n++
|
||||
panic("stop here!")
|
||||
}
|
||||
}).To(Panic())
|
||||
Expect(cs.Close()).To(Succeed())
|
||||
Expect(n).To(Equal(1))
|
||||
|
||||
bk, err := testClient.Coordinator(groupID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
req := &sarama.OffsetFetchRequest{
|
||||
Version: 1,
|
||||
ConsumerGroup: groupID,
|
||||
}
|
||||
req.AddPartition("topic-a", 0)
|
||||
req.AddPartition("topic-a", 1)
|
||||
req.AddPartition("topic-a", 2)
|
||||
req.AddPartition("topic-a", 3)
|
||||
Expect(bk.FetchOffset(req)).To(Equal(&sarama.OffsetFetchResponse{
|
||||
Blocks: map[string]map[int32]*sarama.OffsetFetchResponseBlock{
|
||||
"topic-a": {0: {Offset: -1}, 1: {Offset: -1}, 2: {Offset: -1}, 3: {Offset: -1}},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should consume partitions", func() {
|
||||
count := int32(0)
|
||||
consume := func(consumerID string) {
|
||||
defer GinkgoRecover()
|
||||
|
||||
config := NewConfig()
|
||||
config.Group.Mode = ConsumerModePartitions
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
|
||||
cs, err := NewConsumer(testKafkaAddrs, "partitions", testTopics, config)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
|
||||
for pc := range cs.Partitions() {
|
||||
go func(pc PartitionConsumer) {
|
||||
defer pc.Close()
|
||||
|
||||
for msg := range pc.Messages() {
|
||||
atomic.AddInt32(&count, 1)
|
||||
cs.MarkOffset(msg, "")
|
||||
}
|
||||
}(pc)
|
||||
}
|
||||
}
|
||||
|
||||
go consume("A")
|
||||
go consume("B")
|
||||
go consume("C")
|
||||
|
||||
Eventually(func() int32 {
|
||||
return atomic.LoadInt32(&count)
|
||||
}, "30s", "100ms").Should(BeNumerically(">=", 2000))
|
||||
})
|
||||
|
||||
It("should consume/commit/resume", func() {
|
||||
acc := make(chan *testConsumerMessage, 20000)
|
||||
consume := func(consumerID string, max int32) {
|
||||
defer GinkgoRecover()
|
||||
|
||||
cs, err := NewConsumer(testKafkaAddrs, "fuzzing", testTopics, nil)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer cs.Close()
|
||||
cs.consumerID = consumerID
|
||||
|
||||
for msg := range cs.Messages() {
|
||||
acc <- &testConsumerMessage{*msg, consumerID}
|
||||
cs.MarkOffset(msg, "")
|
||||
|
||||
if atomic.AddInt32(&max, -1) <= 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go consume("A", 1500)
|
||||
go consume("B", 2000)
|
||||
go consume("C", 1500)
|
||||
go consume("D", 200)
|
||||
go consume("E", 100)
|
||||
time.Sleep(10 * time.Second) // wait for consumers to subscribe to topics
|
||||
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
|
||||
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 5000))
|
||||
|
||||
go consume("F", 300)
|
||||
go consume("G", 400)
|
||||
go consume("H", 1000)
|
||||
go consume("I", 2000)
|
||||
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
|
||||
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 8000))
|
||||
|
||||
go consume("J", 1000)
|
||||
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
|
||||
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 9000))
|
||||
|
||||
go consume("K", 1000)
|
||||
go consume("L", 3000)
|
||||
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
|
||||
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 12000))
|
||||
|
||||
go consume("M", 1000)
|
||||
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
|
||||
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 15000))
|
||||
|
||||
close(acc)
|
||||
|
||||
uniques := make(map[string][]string)
|
||||
for msg := range acc {
|
||||
key := fmt.Sprintf("%s/%d/%d", msg.Topic, msg.Partition, msg.Offset)
|
||||
uniques[key] = append(uniques[key], msg.ConsumerID)
|
||||
}
|
||||
Expect(uniques).To(HaveLen(15000))
|
||||
})
|
||||
|
||||
It("should allow close to be called multiple times", func() {
|
||||
cs, err := newConsumerOf(testGroup, testTopics...)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(cs.Close()).NotTo(HaveOccurred())
|
||||
Expect(cs.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
})
|
||||
8
third/github.com/johng-cn/sarama-cluster/doc.go
Normal file
8
third/github.com/johng-cn/sarama-cluster/doc.go
Normal file
@ -0,0 +1,8 @@
|
||||
/*
|
||||
Package cluster provides cluster extensions for Sarama, enabing users
|
||||
to consume topics across from multiple, balanced nodes.
|
||||
|
||||
It requires Kafka v0.9+ and follows the steps guide, described in:
|
||||
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
|
||||
*/
|
||||
package cluster
|
||||
123
third/github.com/johng-cn/sarama-cluster/examples_test.go
Normal file
123
third/github.com/johng-cn/sarama-cluster/examples_test.go
Normal file
@ -0,0 +1,123 @@
|
||||
package cluster_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
|
||||
cluster "gitee.com/johng/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
// This example shows how to use the consumer to read messages
|
||||
// from a multiple topics through a multiplexed channel.
|
||||
func ExampleConsumer() {
|
||||
|
||||
// init (custom) config, enable errors and notifications
|
||||
config := cluster.NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume errors
|
||||
go func() {
|
||||
for err := range consumer.Errors() {
|
||||
log.Printf("Error: %s\n", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// consume notifications
|
||||
go func() {
|
||||
for ntf := range consumer.Notifications() {
|
||||
log.Printf("Rebalanced: %+v\n", ntf)
|
||||
}
|
||||
}()
|
||||
|
||||
// consume messages, watch signals
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-consumer.Messages():
|
||||
if ok {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This example shows how to use the consumer to read messages
|
||||
// through individual partitions.
|
||||
func ExampleConsumer_Partitions() {
|
||||
|
||||
// init (custom) config, set mode to ConsumerModePartitions
|
||||
config := cluster.NewConfig()
|
||||
config.Group.Mode = cluster.ConsumerModePartitions
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume partitions
|
||||
for {
|
||||
select {
|
||||
case part, ok := <-consumer.Partitions():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// start a separate goroutine to consume messages
|
||||
go func(pc cluster.PartitionConsumer) {
|
||||
for msg := range pc.Messages() {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
}(part)
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This example shows how to use the consumer with
|
||||
// topic whitelists.
|
||||
func ExampleConfig_whitelist() {
|
||||
|
||||
// init (custom) config, enable errors and notifications
|
||||
config := cluster.NewConfig()
|
||||
config.Group.Topics.Whitelist = regexp.MustCompile(`myservice.*`)
|
||||
|
||||
// init consumer
|
||||
consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", nil, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// consume messages
|
||||
msg := <-consumer.Messages()
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
}
|
||||
69
third/github.com/johng-cn/sarama-cluster/offsets.go
Normal file
69
third/github.com/johng-cn/sarama-cluster/offsets.go
Normal file
@ -0,0 +1,69 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// OffsetStash allows to accumulate offsets and
|
||||
// mark them as processed in a bulk
|
||||
type OffsetStash struct {
|
||||
offsets map[topicPartition]offsetInfo
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewOffsetStash inits a blank stash
|
||||
func NewOffsetStash() *OffsetStash {
|
||||
return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
|
||||
}
|
||||
|
||||
// MarkOffset stashes the provided message offset
|
||||
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
|
||||
// MarkPartitionOffset stashes the offset for the provided topic/partition combination
|
||||
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
key := topicPartition{Topic: topic, Partition: partition}
|
||||
if info := s.offsets[key]; offset >= info.Offset {
|
||||
info.Offset = offset
|
||||
info.Metadata = metadata
|
||||
s.offsets[key] = info
|
||||
}
|
||||
}
|
||||
|
||||
// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
|
||||
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
|
||||
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
key := topicPartition{Topic: topic, Partition: partition}
|
||||
if info := s.offsets[key]; offset <= info.Offset {
|
||||
info.Offset = offset
|
||||
info.Metadata = metadata
|
||||
s.offsets[key] = info
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffset stashes the provided message offset
|
||||
// See ResetPartitionOffset for explanation
|
||||
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
|
||||
// Offsets returns the latest stashed offsets by topic-partition
|
||||
func (s *OffsetStash) Offsets() map[string]int64 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
res := make(map[string]int64, len(s.offsets))
|
||||
for tp, info := range s.offsets {
|
||||
res[tp.String()] = info.Offset
|
||||
}
|
||||
return res
|
||||
}
|
||||
87
third/github.com/johng-cn/sarama-cluster/offsets_test.go
Normal file
87
third/github.com/johng-cn/sarama-cluster/offsets_test.go
Normal file
@ -0,0 +1,87 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("OffsetStash", func() {
|
||||
var subject *OffsetStash
|
||||
|
||||
BeforeEach(func() {
|
||||
subject = NewOffsetStash()
|
||||
})
|
||||
|
||||
It("should update", func() {
|
||||
Expect(subject.offsets).To(HaveLen(0))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 0, 0, "m3ta")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 0, Metadata: "m3ta"},
|
||||
))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 0, 200, "m3ta")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 200, Metadata: "m3ta"},
|
||||
))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 0, 199, "m3t@")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 200, Metadata: "m3ta"},
|
||||
))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 1, 300, "")
|
||||
Expect(subject.offsets).To(HaveLen(2))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 1},
|
||||
offsetInfo{Offset: 300, Metadata: ""},
|
||||
))
|
||||
})
|
||||
|
||||
It("should reset", func() {
|
||||
Expect(subject.offsets).To(HaveLen(0))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 0, 0, "m3ta")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 0, Metadata: "m3ta"},
|
||||
))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 0, 200, "m3ta")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 200, Metadata: "m3ta"},
|
||||
))
|
||||
|
||||
subject.ResetPartitionOffset("topic", 0, 199, "m3t@")
|
||||
Expect(subject.offsets).To(HaveLen(1))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 0},
|
||||
offsetInfo{Offset: 199, Metadata: "m3t@"},
|
||||
))
|
||||
|
||||
subject.MarkPartitionOffset("topic", 1, 300, "")
|
||||
Expect(subject.offsets).To(HaveLen(2))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 1},
|
||||
offsetInfo{Offset: 300, Metadata: ""},
|
||||
))
|
||||
|
||||
subject.ResetPartitionOffset("topic", 1, 200, "m3t@")
|
||||
Expect(subject.offsets).To(HaveLen(2))
|
||||
Expect(subject.offsets).To(HaveKeyWithValue(
|
||||
topicPartition{Topic: "topic", Partition: 1},
|
||||
offsetInfo{Offset: 200, Metadata: "m3t@"},
|
||||
))
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
274
third/github.com/johng-cn/sarama-cluster/partitions.go
Normal file
274
third/github.com/johng-cn/sarama-cluster/partitions.go
Normal file
@ -0,0 +1,274 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// PartitionConsumer allows code to consume individual partitions from the cluster.
|
||||
//
|
||||
// See docs for Consumer.Partitions() for more on how to implement this.
|
||||
type PartitionConsumer interface {
|
||||
sarama.PartitionConsumer
|
||||
|
||||
// Topic returns the consumed topic name
|
||||
Topic() string
|
||||
|
||||
// Partition returns the consumed partition
|
||||
Partition() int32
|
||||
|
||||
// InitialOffset returns the offset used for creating the PartitionConsumer instance.
|
||||
// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
|
||||
InitialOffset() int64
|
||||
|
||||
// MarkOffset marks the offset of a message as preocessed.
|
||||
MarkOffset(offset int64, metadata string)
|
||||
|
||||
// ResetOffset resets the offset to a previously processed message.
|
||||
ResetOffset(offset int64, metadata string)
|
||||
}
|
||||
|
||||
type partitionConsumer struct {
|
||||
sarama.PartitionConsumer
|
||||
|
||||
state partitionState
|
||||
mu sync.Mutex
|
||||
|
||||
topic string
|
||||
partition int32
|
||||
initialOffset int64
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
|
||||
dying, dead chan none
|
||||
}
|
||||
|
||||
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
|
||||
offset := info.NextOffset(defaultOffset)
|
||||
pcm, err := manager.ConsumePartition(topic, partition, offset)
|
||||
|
||||
// Resume from default offset, if requested offset is out-of-range
|
||||
if err == sarama.ErrOffsetOutOfRange {
|
||||
info.Offset = -1
|
||||
offset = defaultOffset
|
||||
pcm, err = manager.ConsumePartition(topic, partition, offset)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &partitionConsumer{
|
||||
PartitionConsumer: pcm,
|
||||
state: partitionState{Info: info},
|
||||
|
||||
topic: topic,
|
||||
partition: partition,
|
||||
initialOffset: offset,
|
||||
|
||||
dying: make(chan none),
|
||||
dead: make(chan none),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Topic implements PartitionConsumer
|
||||
func (c *partitionConsumer) Topic() string { return c.topic }
|
||||
|
||||
// Partition implements PartitionConsumer
|
||||
func (c *partitionConsumer) Partition() int32 { return c.partition }
|
||||
|
||||
// InitialOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
|
||||
|
||||
// AsyncClose implements PartitionConsumer
|
||||
func (c *partitionConsumer) AsyncClose() {
|
||||
c.closeOnce.Do(func() {
|
||||
c.closeErr = c.PartitionConsumer.Close()
|
||||
close(c.dying)
|
||||
})
|
||||
}
|
||||
|
||||
// Close implements PartitionConsumer
|
||||
func (c *partitionConsumer) Close() error {
|
||||
c.AsyncClose()
|
||||
<-c.dead
|
||||
return c.closeErr
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) waitFor(stopper <-chan none) {
|
||||
select {
|
||||
case <-stopper:
|
||||
case <-c.dying:
|
||||
}
|
||||
close(c.dead)
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
|
||||
defer close(c.dead)
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-c.Messages():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case messages <- msg:
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
case err, ok := <-c.Errors():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case errors <- err:
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) getState() partitionState {
|
||||
c.mu.Lock()
|
||||
state := c.state
|
||||
c.mu.Unlock()
|
||||
|
||||
return state
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) markCommitted(offset int64) {
|
||||
c.mu.Lock()
|
||||
if offset == c.state.Info.Offset {
|
||||
c.state.Dirty = false
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// MarkOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
|
||||
c.mu.Lock()
|
||||
if next := offset + 1; next > c.state.Info.Offset {
|
||||
c.state.Info.Offset = next
|
||||
c.state.Info.Metadata = metadata
|
||||
c.state.Dirty = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// ResetOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
|
||||
c.mu.Lock()
|
||||
if next := offset + 1; next <= c.state.Info.Offset {
|
||||
c.state.Info.Offset = next
|
||||
c.state.Info.Metadata = metadata
|
||||
c.state.Dirty = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type partitionState struct {
|
||||
Info offsetInfo
|
||||
Dirty bool
|
||||
LastCommit time.Time
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type partitionMap struct {
|
||||
data map[topicPartition]*partitionConsumer
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func newPartitionMap() *partitionMap {
|
||||
return &partitionMap{
|
||||
data: make(map[topicPartition]*partitionConsumer),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *partitionMap) IsSubscribedTo(topic string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for tp := range m.data {
|
||||
if tp.Topic == topic {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
|
||||
m.mu.RLock()
|
||||
pc, _ := m.data[topicPartition{topic, partition}]
|
||||
m.mu.RUnlock()
|
||||
return pc
|
||||
}
|
||||
|
||||
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
|
||||
m.mu.Lock()
|
||||
m.data[topicPartition{topic, partition}] = pc
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
snap := make(map[topicPartition]partitionState, len(m.data))
|
||||
for tp, pc := range m.data {
|
||||
snap[tp] = pc.getState()
|
||||
}
|
||||
return snap
|
||||
}
|
||||
|
||||
func (m *partitionMap) Stop() {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for tp := range m.data {
|
||||
wg.Add(1)
|
||||
go func(p *partitionConsumer) {
|
||||
_ = p.Close()
|
||||
wg.Done()
|
||||
}(m.data[tp])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Clear() {
|
||||
m.mu.Lock()
|
||||
for tp := range m.data {
|
||||
delete(m.data, tp)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Info() map[string][]int32 {
|
||||
info := make(map[string][]int32)
|
||||
m.mu.RLock()
|
||||
for tp := range m.data {
|
||||
info[tp.Topic] = append(info[tp.Topic], tp.Partition)
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
for topic := range info {
|
||||
sort.Sort(int32Slice(info[topic]))
|
||||
}
|
||||
return info
|
||||
}
|
||||
137
third/github.com/johng-cn/sarama-cluster/partitions_test.go
Normal file
137
third/github.com/johng-cn/sarama-cluster/partitions_test.go
Normal file
@ -0,0 +1,137 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/ginkgo"
|
||||
. "gitee.com/johng/gf/third/github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("partitionConsumer", func() {
|
||||
var subject *partitionConsumer
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
subject, err = newPartitionConsumer(&mockConsumer{}, "topic", 0, offsetInfo{2000, "m3ta"}, sarama.OffsetOldest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
close(subject.dead)
|
||||
Expect(subject.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should set state", func() {
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2000, "m3ta"},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should recover from default offset if requested offset is out of bounds", func() {
|
||||
pc, err := newPartitionConsumer(&mockConsumer{}, "topic", 0, offsetInfo{200, "m3ta"}, sarama.OffsetOldest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer pc.Close()
|
||||
close(pc.dead)
|
||||
|
||||
state := pc.getState()
|
||||
Expect(state.Info.Offset).To(Equal(int64(-1)))
|
||||
Expect(state.Info.Metadata).To(Equal("m3ta"))
|
||||
})
|
||||
|
||||
It("should update state", func() {
|
||||
subject.MarkOffset(2001, "met@") // should set state
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2002, "met@"},
|
||||
Dirty: true,
|
||||
}))
|
||||
|
||||
subject.markCommitted(2002) // should reset dirty status
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2002, "met@"},
|
||||
}))
|
||||
|
||||
subject.MarkOffset(2001, "me7a") // should not update state
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2002, "met@"},
|
||||
}))
|
||||
|
||||
subject.MarkOffset(2002, "me7a") // should bump state
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2003, "me7a"},
|
||||
Dirty: true,
|
||||
}))
|
||||
|
||||
// After committing a later offset, try rewinding back to earlier offset with new metadata.
|
||||
subject.ResetOffset(2001, "met@")
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2002, "met@"},
|
||||
Dirty: true,
|
||||
}))
|
||||
|
||||
subject.markCommitted(2002) // should not unset state
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2002, "met@"},
|
||||
}))
|
||||
|
||||
subject.MarkOffset(2002, "me7a") // should bump state
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2003, "me7a"},
|
||||
Dirty: true,
|
||||
}))
|
||||
|
||||
subject.markCommitted(2003)
|
||||
Expect(subject.getState()).To(Equal(partitionState{
|
||||
Info: offsetInfo{2003, "me7a"},
|
||||
}))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
var _ = Describe("partitionMap", func() {
|
||||
var subject *partitionMap
|
||||
|
||||
BeforeEach(func() {
|
||||
subject = newPartitionMap()
|
||||
})
|
||||
|
||||
It("should fetch/store", func() {
|
||||
Expect(subject.Fetch("topic", 0)).To(BeNil())
|
||||
|
||||
pc, err := newPartitionConsumer(&mockConsumer{}, "topic", 0, offsetInfo{2000, "m3ta"}, sarama.OffsetNewest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
subject.Store("topic", 0, pc)
|
||||
Expect(subject.Fetch("topic", 0)).To(Equal(pc))
|
||||
Expect(subject.Fetch("topic", 1)).To(BeNil())
|
||||
Expect(subject.Fetch("other", 0)).To(BeNil())
|
||||
})
|
||||
|
||||
It("should return info", func() {
|
||||
pc0, err := newPartitionConsumer(&mockConsumer{}, "topic", 0, offsetInfo{2000, "m3ta"}, sarama.OffsetNewest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
pc1, err := newPartitionConsumer(&mockConsumer{}, "topic", 1, offsetInfo{2000, "m3ta"}, sarama.OffsetNewest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
subject.Store("topic", 0, pc0)
|
||||
subject.Store("topic", 1, pc1)
|
||||
|
||||
info := subject.Info()
|
||||
Expect(info).To(HaveLen(1))
|
||||
Expect(info).To(HaveKeyWithValue("topic", []int32{0, 1}))
|
||||
})
|
||||
|
||||
It("should create snapshots", func() {
|
||||
pc0, err := newPartitionConsumer(&mockConsumer{}, "topic", 0, offsetInfo{2000, "m3ta"}, sarama.OffsetNewest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
pc1, err := newPartitionConsumer(&mockConsumer{}, "topic", 1, offsetInfo{2000, "m3ta"}, sarama.OffsetNewest)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
subject.Store("topic", 0, pc0)
|
||||
subject.Store("topic", 1, pc1)
|
||||
subject.Fetch("topic", 1).MarkOffset(2000, "met@")
|
||||
|
||||
Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{
|
||||
{"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false},
|
||||
{"topic", 1}: {Info: offsetInfo{2001, "met@"}, Dirty: true},
|
||||
}))
|
||||
})
|
||||
|
||||
})
|
||||
75
third/github.com/johng-cn/sarama-cluster/util.go
Normal file
75
third/github.com/johng-cn/sarama-cluster/util.go
Normal file
@ -0,0 +1,75 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type none struct{}
|
||||
|
||||
type topicPartition struct {
|
||||
Topic string
|
||||
Partition int32
|
||||
}
|
||||
|
||||
func (tp *topicPartition) String() string {
|
||||
return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
|
||||
}
|
||||
|
||||
type offsetInfo struct {
|
||||
Offset int64
|
||||
Metadata string
|
||||
}
|
||||
|
||||
func (i offsetInfo) NextOffset(fallback int64) int64 {
|
||||
if i.Offset > -1 {
|
||||
return i.Offset
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
type int32Slice []int32
|
||||
|
||||
func (p int32Slice) Len() int { return len(p) }
|
||||
func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
func (p int32Slice) Diff(o int32Slice) (res []int32) {
|
||||
on := len(o)
|
||||
for _, x := range p {
|
||||
n := sort.Search(on, func(i int) bool { return o[i] >= x })
|
||||
if n < on && o[n] == x {
|
||||
continue
|
||||
}
|
||||
res = append(res, x)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type loopTomb struct {
|
||||
c chan none
|
||||
o sync.Once
|
||||
w sync.WaitGroup
|
||||
}
|
||||
|
||||
func newLoopTomb() *loopTomb {
|
||||
return &loopTomb{c: make(chan none)}
|
||||
}
|
||||
|
||||
func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) }
|
||||
func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
|
||||
|
||||
func (t *loopTomb) Dying() <-chan none { return t.c }
|
||||
func (t *loopTomb) Go(f func(<-chan none)) {
|
||||
t.w.Add(1)
|
||||
|
||||
go func() {
|
||||
defer t.stop()
|
||||
defer t.w.Done()
|
||||
|
||||
f(t.c)
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user