mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
remove sarama-cluster
This commit is contained in:
@ -1,4 +0,0 @@
|
||||
*.log
|
||||
*.pid
|
||||
kafka*/
|
||||
vendor/
|
||||
@ -1,18 +0,0 @@
|
||||
sudo: false
|
||||
language: go
|
||||
go:
|
||||
- 1.10.x
|
||||
- 1.9.x
|
||||
install:
|
||||
- go get -u github.com/golang/dep/cmd/dep
|
||||
- dep ensure
|
||||
env:
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1
|
||||
- SCALA_VERSION=2.12 KAFKA_VERSION=1.1.0
|
||||
script:
|
||||
- make default test-race
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- oracle-java8-set-default
|
||||
@ -1,22 +0,0 @@
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2017 Black Square Media Ltd
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
'Software'), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@ -1,35 +0,0 @@
|
||||
SCALA_VERSION?= 2.12
|
||||
KAFKA_VERSION?= 1.1.0
|
||||
KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION)
|
||||
KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz
|
||||
KAFKA_ROOT= testdata/$(KAFKA_DIR)
|
||||
PKG=$(shell go list ./... | grep -v vendor)
|
||||
|
||||
default: vet test
|
||||
|
||||
vet:
|
||||
go vet $(PKG)
|
||||
|
||||
test: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60
|
||||
|
||||
test-verbose: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v
|
||||
|
||||
test-race: testdeps
|
||||
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race
|
||||
|
||||
testdeps: $(KAFKA_ROOT)
|
||||
|
||||
doc: README.md
|
||||
|
||||
.PHONY: test testdeps vet doc
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
$(KAFKA_ROOT):
|
||||
@mkdir -p $(dir $@)
|
||||
cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz
|
||||
|
||||
README.md: README.md.tpl $(wildcard *.go)
|
||||
becca -package $(subst $(GOPATH)/src/,,$(PWD))
|
||||
@ -1,151 +0,0 @@
|
||||
# Sarama Cluster
|
||||
|
||||
[](https://godoc.org/github.com/bsm/sarama-cluster)
|
||||
[](https://travis-ci.org/bsm/sarama-cluster)
|
||||
[](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
|
||||
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
|
||||
|
||||
## Examples
|
||||
|
||||
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
|
||||
topics and partitions are all passed to the single channel:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// init (custom) config, enable errors and notifications
|
||||
config := cluster.NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume errors
|
||||
go func() {
|
||||
for err := range consumer.Errors() {
|
||||
log.Printf("Error: %s\n", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// consume notifications
|
||||
go func() {
|
||||
for ntf := range consumer.Notifications() {
|
||||
log.Printf("Rebalanced: %+v\n", ntf)
|
||||
}
|
||||
}()
|
||||
|
||||
// consume messages, watch signals
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-consumer.Messages():
|
||||
if ok {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
|
||||
consumers:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// init (custom) config, set mode to ConsumerModePartitions
|
||||
config := cluster.NewConfig()
|
||||
config.Group.Mode = cluster.ConsumerModePartitions
|
||||
|
||||
// init consumer
|
||||
brokers := []string{"127.0.0.1:9092"}
|
||||
topics := []string{"my_topic", "other_topic"}
|
||||
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
|
||||
// trap SIGINT to trigger a shutdown.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
|
||||
// consume partitions
|
||||
for {
|
||||
select {
|
||||
case part, ok := <-consumer.Partitions():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// start a separate goroutine to consume messages
|
||||
go func(pc cluster.PartitionConsumer) {
|
||||
for msg := range pc.Messages() {
|
||||
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||||
consumer.MarkOffset(msg, "") // mark message as processed
|
||||
}
|
||||
}(part)
|
||||
case <-signals:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Running tests
|
||||
|
||||
You need to install Ginkgo & Gomega to run tests. Please see
|
||||
http://onsi.github.io/ginkgo for more details.
|
||||
|
||||
To run tests, call:
|
||||
|
||||
$ make test
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Consumer not receiving any messages?
|
||||
|
||||
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
|
||||
|
||||
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
|
||||
@ -1,67 +0,0 @@
|
||||
# Sarama Cluster
|
||||
|
||||
[](https://godoc.org/github.com/bsm/sarama-cluster)
|
||||
[](https://travis-ci.org/bsm/sarama-cluster)
|
||||
[](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
|
||||
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
|
||||
|
||||
## Examples
|
||||
|
||||
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
|
||||
topics and partitions are all passed to the single channel:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {{ "ExampleConsumer" | code }}
|
||||
```
|
||||
|
||||
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
|
||||
consumers:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
func main() {{ "ExampleConsumer_Partitions" | code }}
|
||||
```
|
||||
|
||||
## Running tests
|
||||
|
||||
You need to install Ginkgo & Gomega to run tests. Please see
|
||||
http://onsi.github.io/ginkgo for more details.
|
||||
|
||||
To run tests, call:
|
||||
|
||||
$ make test
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Consumer not receiving any messages?
|
||||
|
||||
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
|
||||
|
||||
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
|
||||
@ -1,172 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// NotificationType defines the type of notification
|
||||
type NotificationType uint8
|
||||
|
||||
// String describes the notification type
|
||||
func (t NotificationType) String() string {
|
||||
switch t {
|
||||
case RebalanceStart:
|
||||
return "rebalance start"
|
||||
case RebalanceOK:
|
||||
return "rebalance OK"
|
||||
case RebalanceError:
|
||||
return "rebalance error"
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
const (
|
||||
UnknownNotification NotificationType = iota
|
||||
RebalanceStart
|
||||
RebalanceOK
|
||||
RebalanceError
|
||||
)
|
||||
|
||||
// Notification are state events emitted by the consumers on rebalance
|
||||
type Notification struct {
|
||||
// Type exposes the notification type
|
||||
Type NotificationType
|
||||
|
||||
// Claimed contains topic/partitions that were claimed by this rebalance cycle
|
||||
Claimed map[string][]int32
|
||||
|
||||
// Released contains topic/partitions that were released as part of this rebalance cycle
|
||||
Released map[string][]int32
|
||||
|
||||
// Current are topic/partitions that are currently claimed to the consumer
|
||||
Current map[string][]int32
|
||||
}
|
||||
|
||||
func newNotification(current map[string][]int32) *Notification {
|
||||
return &Notification{
|
||||
Type: RebalanceStart,
|
||||
Current: current,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notification) success(current map[string][]int32) *Notification {
|
||||
o := &Notification{
|
||||
Type: RebalanceOK,
|
||||
Claimed: make(map[string][]int32),
|
||||
Released: make(map[string][]int32),
|
||||
Current: current,
|
||||
}
|
||||
for topic, partitions := range current {
|
||||
o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
|
||||
}
|
||||
for topic, partitions := range n.Current {
|
||||
o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type topicInfo struct {
|
||||
Partitions []int32
|
||||
MemberIDs []string
|
||||
}
|
||||
|
||||
func (info topicInfo) Perform(s Strategy) map[string][]int32 {
|
||||
if s == StrategyRoundRobin {
|
||||
return info.RoundRobin()
|
||||
}
|
||||
return info.Ranges()
|
||||
}
|
||||
|
||||
func (info topicInfo) Ranges() map[string][]int32 {
|
||||
sort.Strings(info.MemberIDs)
|
||||
|
||||
mlen := len(info.MemberIDs)
|
||||
plen := len(info.Partitions)
|
||||
res := make(map[string][]int32, mlen)
|
||||
|
||||
for pos, memberID := range info.MemberIDs {
|
||||
n, i := float64(plen)/float64(mlen), float64(pos)
|
||||
min := int(math.Floor(i*n + 0.5))
|
||||
max := int(math.Floor((i+1)*n + 0.5))
|
||||
sub := info.Partitions[min:max]
|
||||
if len(sub) > 0 {
|
||||
res[memberID] = sub
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (info topicInfo) RoundRobin() map[string][]int32 {
|
||||
sort.Strings(info.MemberIDs)
|
||||
|
||||
mlen := len(info.MemberIDs)
|
||||
res := make(map[string][]int32, mlen)
|
||||
for i, pnum := range info.Partitions {
|
||||
memberID := info.MemberIDs[i%mlen]
|
||||
res[memberID] = append(res[memberID], pnum)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type balancer struct {
|
||||
client sarama.Client
|
||||
topics map[string]topicInfo
|
||||
strategy Strategy
|
||||
}
|
||||
|
||||
func newBalancerFromMeta(client sarama.Client, strategy Strategy, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
|
||||
balancer := newBalancer(client, strategy)
|
||||
for memberID, meta := range members {
|
||||
for _, topic := range meta.Topics {
|
||||
if err := balancer.Topic(topic, memberID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return balancer, nil
|
||||
}
|
||||
|
||||
func newBalancer(client sarama.Client, strategy Strategy) *balancer {
|
||||
return &balancer{
|
||||
client: client,
|
||||
topics: make(map[string]topicInfo),
|
||||
strategy: strategy,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *balancer) Topic(name string, memberID string) error {
|
||||
topic, ok := r.topics[name]
|
||||
if !ok {
|
||||
nums, err := r.client.Partitions(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topic = topicInfo{
|
||||
Partitions: nums,
|
||||
MemberIDs: make([]string, 0, 1),
|
||||
}
|
||||
}
|
||||
topic.MemberIDs = append(topic.MemberIDs, memberID)
|
||||
r.topics[name] = topic
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *balancer) Perform() map[string]map[string][]int32 {
|
||||
res := make(map[string]map[string][]int32, 1)
|
||||
for topic, info := range r.topics {
|
||||
for memberID, partitions := range info.Perform(r.strategy) {
|
||||
if _, ok := res[memberID]; !ok {
|
||||
res[memberID] = make(map[string][]int32, 1)
|
||||
}
|
||||
res[memberID][topic] = partitions
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
@ -1,50 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
var errClientInUse = errors.New("cluster: client is already used by another consumer")
|
||||
|
||||
// Client is a group client
|
||||
type Client struct {
|
||||
sarama.Client
|
||||
config Config
|
||||
|
||||
inUse uint32
|
||||
}
|
||||
|
||||
// NewClient creates a new client instance
|
||||
func NewClient(addrs []string, config *Config) (*Client, error) {
|
||||
if config == nil {
|
||||
config = NewConfig()
|
||||
}
|
||||
|
||||
if err := config.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := sarama.NewClient(addrs, &config.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Client{Client: client, config: *config}, nil
|
||||
}
|
||||
|
||||
// ClusterConfig returns the cluster configuration.
|
||||
func (c *Client) ClusterConfig() *Config {
|
||||
cfg := c.config
|
||||
return &cfg
|
||||
}
|
||||
|
||||
func (c *Client) claim() bool {
|
||||
return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
|
||||
}
|
||||
|
||||
func (c *Client) release() {
|
||||
atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package cluster
|
||||
|
||||
// Strategy for partition to consumer assignement
|
||||
type Strategy string
|
||||
|
||||
const (
|
||||
// StrategyRange is the default and assigns partition ranges to consumers.
|
||||
// Example with six partitions and two consumers:
|
||||
// C1: [0, 1, 2]
|
||||
// C2: [3, 4, 5]
|
||||
StrategyRange Strategy = "range"
|
||||
|
||||
// StrategyRoundRobin assigns partitions by alternating over consumers.
|
||||
// Example with six partitions and two consumers:
|
||||
// C1: [0, 2, 4]
|
||||
// C2: [1, 3, 5]
|
||||
StrategyRoundRobin Strategy = "roundrobin"
|
||||
)
|
||||
|
||||
// Error instances are wrappers for internal errors with a context and
|
||||
// may be returned through the consumer's Errors() channel
|
||||
type Error struct {
|
||||
Ctx string
|
||||
error
|
||||
}
|
||||
@ -1,146 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
var minVersion = sarama.V0_9_0_0
|
||||
|
||||
type ConsumerMode uint8
|
||||
|
||||
const (
|
||||
ConsumerModeMultiplex ConsumerMode = iota
|
||||
ConsumerModePartitions
|
||||
)
|
||||
|
||||
// Config extends sarama.Config with Group specific namespace
|
||||
type Config struct {
|
||||
sarama.Config
|
||||
|
||||
// Group is the namespace for group management properties
|
||||
Group struct {
|
||||
|
||||
// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
|
||||
PartitionStrategy Strategy
|
||||
|
||||
// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
|
||||
// made available through the consumer's Messages() and Errors() channels.
|
||||
//
|
||||
// Users who require low-level access can enable ConsumerModePartitions where individual partitions
|
||||
// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
|
||||
// themselves.
|
||||
Mode ConsumerMode
|
||||
|
||||
Offsets struct {
|
||||
Retry struct {
|
||||
// The number retries when committing offsets (defaults to 3).
|
||||
Max int
|
||||
}
|
||||
Synchronization struct {
|
||||
// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
|
||||
// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
|
||||
DwellTime time.Duration
|
||||
}
|
||||
}
|
||||
|
||||
Session struct {
|
||||
// The allowed session timeout for registered consumers (defaults to 30s).
|
||||
// Must be within the allowed server range.
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
Heartbeat struct {
|
||||
// Interval between each heartbeat (defaults to 3s). It should be no more
|
||||
// than 1/3rd of the Group.Session.Timeout setting
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
// Return specifies which group channels will be populated. If they are set to true,
|
||||
// you must read from the respective channels to prevent deadlock.
|
||||
Return struct {
|
||||
// If enabled, rebalance notification will be returned on the
|
||||
// Notifications channel (default disabled).
|
||||
Notifications bool
|
||||
}
|
||||
|
||||
Topics struct {
|
||||
// An additional whitelist of topics to subscribe to.
|
||||
Whitelist *regexp.Regexp
|
||||
// An additional blacklist of topics to avoid. If set, this will precede over
|
||||
// the Whitelist setting.
|
||||
Blacklist *regexp.Regexp
|
||||
}
|
||||
|
||||
Member struct {
|
||||
// Custom metadata to include when joining the group. The user data for all joined members
|
||||
// can be retrieved by sending a DescribeGroupRequest to the broker that is the
|
||||
// coordinator for the group.
|
||||
UserData []byte
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewConfig returns a new configuration instance with sane defaults.
|
||||
func NewConfig() *Config {
|
||||
c := &Config{
|
||||
Config: *sarama.NewConfig(),
|
||||
}
|
||||
c.Group.PartitionStrategy = StrategyRange
|
||||
c.Group.Offsets.Retry.Max = 3
|
||||
c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime
|
||||
c.Group.Session.Timeout = 30 * time.Second
|
||||
c.Group.Heartbeat.Interval = 3 * time.Second
|
||||
c.Config.Version = minVersion
|
||||
return c
|
||||
}
|
||||
|
||||
// Validate checks a Config instance. It will return a
|
||||
// sarama.ConfigurationError if the specified values don't make sense.
|
||||
func (c *Config) Validate() error {
|
||||
if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
|
||||
sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
|
||||
}
|
||||
if c.Group.Session.Timeout%time.Millisecond != 0 {
|
||||
sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
|
||||
}
|
||||
if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
|
||||
sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
|
||||
}
|
||||
if !c.Version.IsAtLeast(minVersion) {
|
||||
sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
|
||||
c.Version = minVersion
|
||||
}
|
||||
if err := c.Config.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// validate the Group values
|
||||
switch {
|
||||
case c.Group.Offsets.Retry.Max < 0:
|
||||
return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
|
||||
case c.Group.Offsets.Synchronization.DwellTime <= 0:
|
||||
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
|
||||
case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
|
||||
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
|
||||
case c.Group.Heartbeat.Interval <= 0:
|
||||
return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
|
||||
case c.Group.Session.Timeout <= 0:
|
||||
return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
|
||||
case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
|
||||
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
|
||||
case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
|
||||
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
|
||||
}
|
||||
|
||||
// ensure offset is correct
|
||||
switch c.Consumer.Offsets.Initial {
|
||||
case sarama.OffsetOldest, sarama.OffsetNewest:
|
||||
default:
|
||||
return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1,952 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// Consumer is a cluster group consumer
|
||||
type Consumer struct {
|
||||
client *Client
|
||||
ownClient bool
|
||||
|
||||
consumer sarama.Consumer
|
||||
subs *partitionMap
|
||||
|
||||
consumerID string
|
||||
groupID string
|
||||
|
||||
memberID string
|
||||
generationID int32
|
||||
membershipMu sync.RWMutex
|
||||
|
||||
coreTopics []string
|
||||
extraTopics []string
|
||||
|
||||
dying, dead chan none
|
||||
closeOnce sync.Once
|
||||
|
||||
consuming int32
|
||||
messages chan *sarama.ConsumerMessage
|
||||
errors chan error
|
||||
partitions chan PartitionConsumer
|
||||
notifications chan *Notification
|
||||
|
||||
customOffsets map[string]map[int32]offsetInfo
|
||||
|
||||
commitMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewConsumer initializes a new consumer
|
||||
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
|
||||
client, err := NewClient(addrs, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consumer, err := NewConsumerFromClient(client, groupID, topics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consumer.ownClient = true
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
// NewConsumerFromClient initializes a new consumer from an existing client.
|
||||
//
|
||||
// Please note that clients cannot be shared between consumers (due to Kafka internals),
|
||||
// they can only be re-used which requires the user to call Close() on the first consumer
|
||||
// before using this method again to initialize another one. Attempts to use a client with
|
||||
// more than one consumer at a time will return errors.
|
||||
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
|
||||
if !client.claim() {
|
||||
return nil, errClientInUse
|
||||
}
|
||||
|
||||
consumer, err := sarama.NewConsumerFromClient(client.Client)
|
||||
if err != nil {
|
||||
client.release()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Strings(topics)
|
||||
c := &Consumer{
|
||||
client: client,
|
||||
consumer: consumer,
|
||||
subs: newPartitionMap(),
|
||||
groupID: groupID,
|
||||
|
||||
coreTopics: topics,
|
||||
|
||||
dying: make(chan none),
|
||||
dead: make(chan none),
|
||||
|
||||
messages: make(chan *sarama.ConsumerMessage),
|
||||
errors: make(chan error, client.config.ChannelBufferSize),
|
||||
partitions: make(chan PartitionConsumer, 1),
|
||||
notifications: make(chan *Notification),
|
||||
|
||||
customOffsets: make(map[string]map[int32]offsetInfo),
|
||||
}
|
||||
if err := c.client.RefreshCoordinator(groupID); err != nil {
|
||||
client.release()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go c.mainLoop()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Messages returns the read channel for the messages that are returned by
|
||||
// the broker.
|
||||
//
|
||||
// This channel will only return if Config.Group.Mode option is set to
|
||||
// ConsumerModeMultiplex (default).
|
||||
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
|
||||
|
||||
// Partitions returns the read channels for individual partitions of this broker.
|
||||
//
|
||||
// This will channel will only return if Config.Group.Mode option is set to
|
||||
// ConsumerModePartitions.
|
||||
//
|
||||
// The Partitions() channel must be listened to for the life of this consumer;
|
||||
// when a rebalance happens old partitions will be closed (naturally come to
|
||||
// completion) and new ones will be emitted. The returned channel will only close
|
||||
// when the consumer is completely shut down.
|
||||
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
|
||||
|
||||
// Errors returns a read channel of errors that occur during offset management, if
|
||||
// enabled. By default, errors are logged and not returned over this channel. If
|
||||
// you want to implement any custom error handling, set your config's
|
||||
// Consumer.Return.Errors setting to true, and read from this channel.
|
||||
func (c *Consumer) Errors() <-chan error { return c.errors }
|
||||
|
||||
// Notifications returns a channel of Notifications that occur during consumer
|
||||
// rebalancing. Notifications will only be emitted over this channel, if your config's
|
||||
// Group.Return.Notifications setting to true.
|
||||
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
|
||||
|
||||
// HighWaterMarks returns the current high water marks for each topic and partition
|
||||
// Consistency between partitions is not guaranteed since high water marks are updated separately.
|
||||
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
|
||||
|
||||
// MarkOffset marks the provided message as processed, alongside a metadata string
|
||||
// that represents the state of the partition consumer at that point in time. The
|
||||
// metadata string can be used by another consumer to restore that state, so it
|
||||
// can resume consumption.
|
||||
//
|
||||
// Note: calling MarkOffset does not necessarily commit the offset to the backend
|
||||
// store immediately for efficiency reasons, and it may never be committed if
|
||||
// your application crashes. This means that you may end up processing the same
|
||||
// message twice, and your processing should ideally be idempotent.
|
||||
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
||||
sub.MarkOffset(msg.Offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
|
||||
// See MarkOffset for additional explanation.
|
||||
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
||||
sub.MarkOffset(offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(topic, partition, offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) markCustomOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
if _, ok := c.customOffsets[topic]; !ok {
|
||||
c.customOffsets[topic] = make(map[int32]offsetInfo)
|
||||
}
|
||||
c.customOffsets[topic][partition] = offsetInfo {
|
||||
Offset : offset,
|
||||
Metadata : metadata,
|
||||
}
|
||||
}
|
||||
|
||||
// MarkOffsets marks stashed offsets as processed.
|
||||
// See MarkOffset for additional explanation.
|
||||
func (c *Consumer) MarkOffsets(s *OffsetStash) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for tp, info := range s.offsets {
|
||||
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
||||
sub.MarkOffset(info.Offset, info.Metadata)
|
||||
} else {
|
||||
c.markCustomOffset(tp.Topic, tp.Partition, info.Offset, info.Metadata)
|
||||
}
|
||||
delete(s.offsets, tp)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffsets marks the provided message as processed, alongside a metadata string
|
||||
// that represents the state of the partition consumer at that point in time. The
|
||||
// metadata string can be used by another consumer to restore that state, so it
|
||||
// can resume consumption.
|
||||
//
|
||||
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
|
||||
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
||||
sub.ResetOffset(msg.Offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
|
||||
// See ResetOffset for additional explanation.
|
||||
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
sub := c.subs.Fetch(topic, partition)
|
||||
if sub != nil {
|
||||
sub.ResetOffset(offset, metadata)
|
||||
} else {
|
||||
c.markCustomOffset(topic, partition, offset, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffsets marks stashed offsets as processed.
|
||||
// See ResetOffset for additional explanation.
|
||||
func (c *Consumer) ResetOffsets(s *OffsetStash) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for tp, info := range s.offsets {
|
||||
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
||||
sub.ResetOffset(info.Offset, info.Metadata)
|
||||
}
|
||||
delete(s.offsets, tp)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscriptions returns the consumed topics and partitions
|
||||
func (c *Consumer) Subscriptions() map[string][]int32 {
|
||||
return c.subs.Info()
|
||||
}
|
||||
|
||||
// CommitOffsets allows to manually commit previously marked offsets. By default there is no
|
||||
// need to call this function as the consumer will commit offsets automatically
|
||||
// using the Config.Consumer.Offsets.CommitInterval setting.
|
||||
//
|
||||
// Please be aware that calling this function during an internal rebalance cycle may return
|
||||
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
|
||||
func (c *Consumer) CommitOffsets() error {
|
||||
c.commitMu.Lock()
|
||||
defer c.commitMu.Unlock()
|
||||
|
||||
memberID, generationID := c.membership()
|
||||
req := &sarama.OffsetCommitRequest{
|
||||
Version: 2,
|
||||
ConsumerGroup: c.groupID,
|
||||
ConsumerGroupGeneration: generationID,
|
||||
ConsumerID: memberID,
|
||||
RetentionTime: -1,
|
||||
}
|
||||
|
||||
if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
|
||||
req.RetentionTime = int64(ns / time.Millisecond)
|
||||
}
|
||||
|
||||
snap := c.subs.Snapshot()
|
||||
dirty := false
|
||||
for tp, state := range snap {
|
||||
if state.Dirty {
|
||||
dirty = true
|
||||
req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
|
||||
}
|
||||
}
|
||||
if !dirty {
|
||||
return nil
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := broker.CommitOffset(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for topic, errs := range resp.Errors {
|
||||
for partition, kerr := range errs {
|
||||
if kerr != sarama.ErrNoError {
|
||||
err = kerr
|
||||
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
|
||||
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
||||
sub.markCommitted(state.Info.Offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Close safely closes the consumer and releases all resources
|
||||
func (c *Consumer) Close() (err error) {
|
||||
c.closeOnce.Do(func() {
|
||||
close(c.dying)
|
||||
<-c.dead
|
||||
|
||||
if e := c.release(); e != nil {
|
||||
err = e
|
||||
}
|
||||
if e := c.consumer.Close(); e != nil {
|
||||
err = e
|
||||
}
|
||||
close(c.messages)
|
||||
close(c.errors)
|
||||
|
||||
if e := c.leaveGroup(); e != nil {
|
||||
err = e
|
||||
}
|
||||
close(c.partitions)
|
||||
close(c.notifications)
|
||||
|
||||
// drain
|
||||
for range c.messages {
|
||||
}
|
||||
for range c.errors {
|
||||
}
|
||||
for p := range c.partitions {
|
||||
_ = p.Close()
|
||||
}
|
||||
for range c.notifications {
|
||||
}
|
||||
|
||||
c.client.release()
|
||||
if c.ownClient {
|
||||
if e := c.client.Close(); e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Consumer) mainLoop() {
|
||||
defer close(c.dead)
|
||||
defer atomic.StoreInt32(&c.consuming, 0)
|
||||
|
||||
for {
|
||||
atomic.StoreInt32(&c.consuming, 0)
|
||||
|
||||
// Check if close was requested
|
||||
select {
|
||||
case <-c.dying:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Start next consume cycle
|
||||
c.nextTick()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) nextTick() {
|
||||
// Remember previous subscriptions
|
||||
var notification *Notification
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
notification = newNotification(c.subs.Info())
|
||||
}
|
||||
|
||||
// Refresh coordinator
|
||||
if err := c.refreshCoordinator(); err != nil {
|
||||
c.rebalanceError(err, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Release subscriptions
|
||||
if err := c.release(); err != nil {
|
||||
c.rebalanceError(err, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Issue rebalance start notification
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
c.handleNotification(newNotification(c.subs.Info()))
|
||||
}
|
||||
|
||||
// Rebalance, fetch new subscriptions
|
||||
subs, err := c.rebalance()
|
||||
if err != nil {
|
||||
c.rebalanceError(err, notification)
|
||||
return
|
||||
}
|
||||
|
||||
// Coordinate loops, make sure everything is
|
||||
// stopped on exit
|
||||
tomb := newLoopTomb()
|
||||
defer tomb.Close()
|
||||
|
||||
// Start the heartbeat
|
||||
tomb.Go(c.hbLoop)
|
||||
|
||||
// Subscribe to topic/partitions
|
||||
if err := c.subscribe(tomb, subs); err != nil {
|
||||
c.rebalanceError(err, notification)
|
||||
return
|
||||
}
|
||||
|
||||
// Update/issue notification with new claims
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
notification = notification.success(subs)
|
||||
c.handleNotification(notification)
|
||||
}
|
||||
|
||||
// Start topic watcher loop
|
||||
tomb.Go(c.twLoop)
|
||||
|
||||
// Start consuming and committing offsets
|
||||
tomb.Go(c.cmLoop)
|
||||
atomic.StoreInt32(&c.consuming, 1)
|
||||
|
||||
// Wait for signals
|
||||
select {
|
||||
case <-tomb.Dying():
|
||||
case <-c.dying:
|
||||
}
|
||||
}
|
||||
|
||||
// heartbeat loop, triggered by the mainLoop
|
||||
func (c *Consumer) hbLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
switch err := c.heartbeat(); err {
|
||||
case nil, sarama.ErrNoError:
|
||||
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
|
||||
return
|
||||
default:
|
||||
c.handleError(&Error{Ctx: "heartbeat", error: err})
|
||||
return
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// topic watcher loop, triggered by the mainLoop
|
||||
func (c *Consumer) twLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
topics, err := c.client.Topics()
|
||||
if err != nil {
|
||||
c.handleError(&Error{Ctx: "topics", error: err})
|
||||
return
|
||||
}
|
||||
|
||||
for _, topic := range topics {
|
||||
if !c.isKnownCoreTopic(topic) &&
|
||||
!c.isKnownExtraTopic(topic) &&
|
||||
c.isPotentialExtraTopic(topic) {
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// commit loop, triggered by the mainLoop
|
||||
func (c *Consumer) cmLoop(stopped <-chan none) {
|
||||
ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
|
||||
c.handleError(&Error{Ctx: "commit", error: err})
|
||||
return
|
||||
}
|
||||
case <-stopped:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) rebalanceError(err error, n *Notification) {
|
||||
if n != nil {
|
||||
n.Type = RebalanceError
|
||||
c.handleNotification(n)
|
||||
}
|
||||
|
||||
switch err {
|
||||
case sarama.ErrRebalanceInProgress:
|
||||
default:
|
||||
c.handleError(&Error{Ctx: "rebalance", error: err})
|
||||
}
|
||||
|
||||
select {
|
||||
case <-c.dying:
|
||||
case <-time.After(c.client.config.Metadata.Retry.Backoff):
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) handleNotification(n *Notification) {
|
||||
if c.client.config.Group.Return.Notifications {
|
||||
select {
|
||||
case c.notifications <- n:
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) handleError(e *Error) {
|
||||
if c.client.config.Consumer.Return.Errors {
|
||||
select {
|
||||
case c.errors <- e:
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
} else {
|
||||
sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Releases the consumer and commits offsets, called from rebalance() and Close()
|
||||
func (c *Consumer) release() (err error) {
|
||||
// Stop all consumers
|
||||
c.subs.Stop()
|
||||
|
||||
// Clear subscriptions on exit
|
||||
defer c.subs.Clear()
|
||||
|
||||
// Wait for messages to be processed
|
||||
timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
|
||||
defer timeout.Stop()
|
||||
|
||||
select {
|
||||
case <-c.dying:
|
||||
case <-timeout.C:
|
||||
}
|
||||
|
||||
// Commit offsets, continue on errors
|
||||
if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
// Performs a heartbeat, part of the mainLoop()
|
||||
func (c *Consumer) heartbeat() error {
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
memberID, generationID := c.membership()
|
||||
resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
GenerationId: generationID,
|
||||
})
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
return resp.Err
|
||||
}
|
||||
|
||||
// Performs a rebalance, part of the mainLoop()
|
||||
func (c *Consumer) rebalance() (map[string][]int32, error) {
|
||||
memberID, _ := c.membership()
|
||||
sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
|
||||
|
||||
allTopics, err := c.client.Topics()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.extraTopics = c.selectExtraTopics(allTopics)
|
||||
sort.Strings(c.extraTopics)
|
||||
|
||||
// Re-join consumer group
|
||||
strategy, err := c.joinGroup()
|
||||
switch {
|
||||
case err == sarama.ErrUnknownMemberId:
|
||||
c.membershipMu.Lock()
|
||||
c.memberID = ""
|
||||
c.membershipMu.Unlock()
|
||||
return nil, err
|
||||
case err != nil:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sync consumer group state, fetch subscriptions
|
||||
subs, err := c.syncGroup(strategy)
|
||||
switch {
|
||||
case err == sarama.ErrRebalanceInProgress:
|
||||
return nil, err
|
||||
case err != nil:
|
||||
_ = c.leaveGroup()
|
||||
return nil, err
|
||||
}
|
||||
return subs, nil
|
||||
}
|
||||
|
||||
// Performs the subscription, part of the mainLoop()
|
||||
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
|
||||
// fetch offsets
|
||||
offsets, err := c.fetchOffsets(subs)
|
||||
if err != nil {
|
||||
_ = c.leaveGroup()
|
||||
return err
|
||||
}
|
||||
|
||||
// create consumers in parallel
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for topic, partitions := range subs {
|
||||
for _, partition := range partitions {
|
||||
wg.Add(1)
|
||||
|
||||
info := offsets[topic][partition]
|
||||
if item, ok := c.customOffsets[topic]; ok {
|
||||
if i, ok := item[partition]; ok {
|
||||
info = i
|
||||
}
|
||||
}
|
||||
go func(topic string, partition int32) {
|
||||
if e := c.createConsumer(tomb, topic, partition, info); e != nil {
|
||||
mu.Lock()
|
||||
err = e
|
||||
mu.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}(topic, partition)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if err != nil {
|
||||
_ = c.release()
|
||||
_ = c.leaveGroup()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
// Send a request to the broker to join group on rebalance()
|
||||
func (c *Consumer) joinGroup() (*balancer, error) {
|
||||
memberID, _ := c.membership()
|
||||
req := &sarama.JoinGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
|
||||
ProtocolType: "consumer",
|
||||
}
|
||||
|
||||
meta := &sarama.ConsumerGroupMemberMetadata{
|
||||
Version: 1,
|
||||
Topics: append(c.coreTopics, c.extraTopics...),
|
||||
UserData: c.client.config.Group.Member.UserData,
|
||||
}
|
||||
err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.JoinGroup(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
} else if resp.Err != sarama.ErrNoError {
|
||||
c.closeCoordinator(broker, resp.Err)
|
||||
return nil, resp.Err
|
||||
}
|
||||
|
||||
var strategy *balancer
|
||||
if resp.LeaderId == resp.MemberId {
|
||||
members, err := resp.GetMembers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
strategy, err = newBalancerFromMeta(c.client, Strategy(resp.GroupProtocol), members)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.membershipMu.Lock()
|
||||
c.memberID = resp.MemberId
|
||||
c.generationID = resp.GenerationId
|
||||
c.membershipMu.Unlock()
|
||||
|
||||
return strategy, nil
|
||||
}
|
||||
|
||||
// Send a request to the broker to sync the group on rebalance().
|
||||
// Returns a list of topics and partitions to consume.
|
||||
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
|
||||
memberID, generationID := c.membership()
|
||||
req := &sarama.SyncGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
GenerationId: generationID,
|
||||
}
|
||||
|
||||
if strategy != nil {
|
||||
for memberID, topics := range strategy.Perform() {
|
||||
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
|
||||
Topics: topics,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.SyncGroup(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
} else if resp.Err != sarama.ErrNoError {
|
||||
c.closeCoordinator(broker, resp.Err)
|
||||
return nil, resp.Err
|
||||
}
|
||||
|
||||
// Return if there is nothing to subscribe to
|
||||
if len(resp.MemberAssignment) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Get assigned subscriptions
|
||||
members, err := resp.GetMemberAssignment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sort partitions, for each topic
|
||||
for topic := range members.Topics {
|
||||
sort.Sort(int32Slice(members.Topics[topic]))
|
||||
}
|
||||
return members.Topics, nil
|
||||
}
|
||||
|
||||
// Fetches latest committed offsets for all subscriptions
|
||||
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
|
||||
offsets := make(map[string]map[int32]offsetInfo, len(subs))
|
||||
req := &sarama.OffsetFetchRequest{
|
||||
Version: 1,
|
||||
ConsumerGroup: c.groupID,
|
||||
}
|
||||
|
||||
for topic, partitions := range subs {
|
||||
offsets[topic] = make(map[int32]offsetInfo, len(partitions))
|
||||
for _, partition := range partitions {
|
||||
offsets[topic][partition] = offsetInfo{Offset: -1}
|
||||
req.AddPartition(topic, partition)
|
||||
}
|
||||
}
|
||||
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := broker.FetchOffset(req)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for topic, partitions := range subs {
|
||||
for _, partition := range partitions {
|
||||
block := resp.GetBlock(topic, partition)
|
||||
if block == nil {
|
||||
return nil, sarama.ErrIncompleteResponse
|
||||
}
|
||||
|
||||
if block.Err == sarama.ErrNoError {
|
||||
offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
|
||||
} else {
|
||||
return nil, block.Err
|
||||
}
|
||||
}
|
||||
}
|
||||
return offsets, nil
|
||||
}
|
||||
|
||||
// Send a request to the broker to leave the group on failes rebalance() and on Close()
|
||||
func (c *Consumer) leaveGroup() error {
|
||||
broker, err := c.client.Coordinator(c.groupID)
|
||||
if err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
return err
|
||||
}
|
||||
|
||||
memberID, _ := c.membership()
|
||||
if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
|
||||
GroupId: c.groupID,
|
||||
MemberId: memberID,
|
||||
}); err != nil {
|
||||
c.closeCoordinator(broker, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
|
||||
memberID, _ := c.membership()
|
||||
sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
|
||||
|
||||
// Create partitionConsumer
|
||||
pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store in subscriptions
|
||||
c.subs.Store(topic, partition, pc)
|
||||
|
||||
// Start partition consumer goroutine
|
||||
tomb.Go(func(stopper <-chan none) {
|
||||
if c.client.config.Group.Mode == ConsumerModePartitions {
|
||||
pc.waitFor(stopper)
|
||||
} else {
|
||||
pc.multiplex(stopper, c.messages, c.errors)
|
||||
}
|
||||
})
|
||||
|
||||
if c.client.config.Group.Mode == ConsumerModePartitions {
|
||||
select {
|
||||
case c.partitions <- pc:
|
||||
case <-c.dying:
|
||||
pc.Close()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
|
||||
err := c.CommitOffsets()
|
||||
if err != nil && retries > 0 {
|
||||
return c.commitOffsetsWithRetry(retries - 1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
|
||||
if broker != nil {
|
||||
_ = broker.Close()
|
||||
}
|
||||
|
||||
switch err {
|
||||
case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
|
||||
_ = c.client.RefreshCoordinator(c.groupID)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
|
||||
extra := allTopics[:0]
|
||||
for _, topic := range allTopics {
|
||||
if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
|
||||
extra = append(extra, topic)
|
||||
}
|
||||
}
|
||||
return extra
|
||||
}
|
||||
|
||||
func (c *Consumer) isKnownCoreTopic(topic string) bool {
|
||||
pos := sort.SearchStrings(c.coreTopics, topic)
|
||||
return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
|
||||
}
|
||||
|
||||
func (c *Consumer) isKnownExtraTopic(topic string) bool {
|
||||
pos := sort.SearchStrings(c.extraTopics, topic)
|
||||
return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
|
||||
}
|
||||
|
||||
func (c *Consumer) isPotentialExtraTopic(topic string) bool {
|
||||
rx := c.client.config.Group.Topics
|
||||
if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
|
||||
return false
|
||||
}
|
||||
if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *Consumer) refreshCoordinator() error {
|
||||
if err := c.refreshMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.client.RefreshCoordinator(c.groupID)
|
||||
}
|
||||
|
||||
func (c *Consumer) refreshMetadata() (err error) {
|
||||
if c.client.config.Metadata.Full {
|
||||
err = c.client.RefreshMetadata()
|
||||
} else {
|
||||
var topics []string
|
||||
if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
|
||||
err = c.client.RefreshMetadata(topics...)
|
||||
}
|
||||
}
|
||||
|
||||
// maybe we didn't have authorization to describe all topics
|
||||
switch err {
|
||||
case sarama.ErrTopicAuthorizationFailed:
|
||||
err = c.client.RefreshMetadata(c.coreTopics...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Consumer) membership() (memberID string, generationID int32) {
|
||||
c.membershipMu.RLock()
|
||||
memberID, generationID = c.memberID, c.generationID
|
||||
c.membershipMu.RUnlock()
|
||||
return
|
||||
}
|
||||
@ -1,8 +0,0 @@
|
||||
/*
|
||||
Package cluster provides cluster extensions for Sarama, enabing users
|
||||
to consume topics across from multiple, balanced nodes.
|
||||
|
||||
It requires Kafka v0.9+ and follows the steps guide, described in:
|
||||
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
|
||||
*/
|
||||
package cluster
|
||||
@ -1,69 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// OffsetStash allows to accumulate offsets and
|
||||
// mark them as processed in a bulk
|
||||
type OffsetStash struct {
|
||||
offsets map[topicPartition]offsetInfo
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewOffsetStash inits a blank stash
|
||||
func NewOffsetStash() *OffsetStash {
|
||||
return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
|
||||
}
|
||||
|
||||
// MarkOffset stashes the provided message offset
|
||||
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
|
||||
// MarkPartitionOffset stashes the offset for the provided topic/partition combination
|
||||
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
key := topicPartition{Topic: topic, Partition: partition}
|
||||
if info := s.offsets[key]; offset >= info.Offset {
|
||||
info.Offset = offset
|
||||
info.Metadata = metadata
|
||||
s.offsets[key] = info
|
||||
}
|
||||
}
|
||||
|
||||
// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
|
||||
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
|
||||
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
key := topicPartition{Topic: topic, Partition: partition}
|
||||
if info := s.offsets[key]; offset <= info.Offset {
|
||||
info.Offset = offset
|
||||
info.Metadata = metadata
|
||||
s.offsets[key] = info
|
||||
}
|
||||
}
|
||||
|
||||
// ResetOffset stashes the provided message offset
|
||||
// See ResetPartitionOffset for explanation
|
||||
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||||
s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||||
}
|
||||
|
||||
// Offsets returns the latest stashed offsets by topic-partition
|
||||
func (s *OffsetStash) Offsets() map[string]int64 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
res := make(map[string]int64, len(s.offsets))
|
||||
for tp, info := range s.offsets {
|
||||
res[tp.String()] = info.Offset
|
||||
}
|
||||
return res
|
||||
}
|
||||
@ -1,274 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/third/github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// PartitionConsumer allows code to consume individual partitions from the cluster.
|
||||
//
|
||||
// See docs for Consumer.Partitions() for more on how to implement this.
|
||||
type PartitionConsumer interface {
|
||||
sarama.PartitionConsumer
|
||||
|
||||
// Topic returns the consumed topic name
|
||||
Topic() string
|
||||
|
||||
// Partition returns the consumed partition
|
||||
Partition() int32
|
||||
|
||||
// InitialOffset returns the offset used for creating the PartitionConsumer instance.
|
||||
// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
|
||||
InitialOffset() int64
|
||||
|
||||
// MarkOffset marks the offset of a message as preocessed.
|
||||
MarkOffset(offset int64, metadata string)
|
||||
|
||||
// ResetOffset resets the offset to a previously processed message.
|
||||
ResetOffset(offset int64, metadata string)
|
||||
}
|
||||
|
||||
type partitionConsumer struct {
|
||||
sarama.PartitionConsumer
|
||||
|
||||
state partitionState
|
||||
mu sync.Mutex
|
||||
|
||||
topic string
|
||||
partition int32
|
||||
initialOffset int64
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
|
||||
dying, dead chan none
|
||||
}
|
||||
|
||||
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
|
||||
offset := info.NextOffset(defaultOffset)
|
||||
pcm, err := manager.ConsumePartition(topic, partition, offset)
|
||||
|
||||
// Resume from default offset, if requested offset is out-of-range
|
||||
if err == sarama.ErrOffsetOutOfRange {
|
||||
info.Offset = -1
|
||||
offset = defaultOffset
|
||||
pcm, err = manager.ConsumePartition(topic, partition, offset)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &partitionConsumer{
|
||||
PartitionConsumer: pcm,
|
||||
state: partitionState{Info: info},
|
||||
|
||||
topic: topic,
|
||||
partition: partition,
|
||||
initialOffset: offset,
|
||||
|
||||
dying: make(chan none),
|
||||
dead: make(chan none),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Topic implements PartitionConsumer
|
||||
func (c *partitionConsumer) Topic() string { return c.topic }
|
||||
|
||||
// Partition implements PartitionConsumer
|
||||
func (c *partitionConsumer) Partition() int32 { return c.partition }
|
||||
|
||||
// InitialOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
|
||||
|
||||
// AsyncClose implements PartitionConsumer
|
||||
func (c *partitionConsumer) AsyncClose() {
|
||||
c.closeOnce.Do(func() {
|
||||
c.closeErr = c.PartitionConsumer.Close()
|
||||
close(c.dying)
|
||||
})
|
||||
}
|
||||
|
||||
// Close implements PartitionConsumer
|
||||
func (c *partitionConsumer) Close() error {
|
||||
c.AsyncClose()
|
||||
<-c.dead
|
||||
return c.closeErr
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) waitFor(stopper <-chan none) {
|
||||
select {
|
||||
case <-stopper:
|
||||
case <-c.dying:
|
||||
}
|
||||
close(c.dead)
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
|
||||
defer close(c.dead)
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-c.Messages():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case messages <- msg:
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
case err, ok := <-c.Errors():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case errors <- err:
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
case <-stopper:
|
||||
return
|
||||
case <-c.dying:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) getState() partitionState {
|
||||
c.mu.Lock()
|
||||
state := c.state
|
||||
c.mu.Unlock()
|
||||
|
||||
return state
|
||||
}
|
||||
|
||||
func (c *partitionConsumer) markCommitted(offset int64) {
|
||||
c.mu.Lock()
|
||||
if offset == c.state.Info.Offset {
|
||||
c.state.Dirty = false
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// MarkOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
|
||||
c.mu.Lock()
|
||||
if next := offset + 1; next > c.state.Info.Offset {
|
||||
c.state.Info.Offset = next
|
||||
c.state.Info.Metadata = metadata
|
||||
c.state.Dirty = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// ResetOffset implements PartitionConsumer
|
||||
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
|
||||
c.mu.Lock()
|
||||
if next := offset + 1; next <= c.state.Info.Offset {
|
||||
c.state.Info.Offset = next
|
||||
c.state.Info.Metadata = metadata
|
||||
c.state.Dirty = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type partitionState struct {
|
||||
Info offsetInfo
|
||||
Dirty bool
|
||||
LastCommit time.Time
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type partitionMap struct {
|
||||
data map[topicPartition]*partitionConsumer
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func newPartitionMap() *partitionMap {
|
||||
return &partitionMap{
|
||||
data: make(map[topicPartition]*partitionConsumer),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *partitionMap) IsSubscribedTo(topic string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for tp := range m.data {
|
||||
if tp.Topic == topic {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
|
||||
m.mu.RLock()
|
||||
pc, _ := m.data[topicPartition{topic, partition}]
|
||||
m.mu.RUnlock()
|
||||
return pc
|
||||
}
|
||||
|
||||
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
|
||||
m.mu.Lock()
|
||||
m.data[topicPartition{topic, partition}] = pc
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
snap := make(map[topicPartition]partitionState, len(m.data))
|
||||
for tp, pc := range m.data {
|
||||
snap[tp] = pc.getState()
|
||||
}
|
||||
return snap
|
||||
}
|
||||
|
||||
func (m *partitionMap) Stop() {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for tp := range m.data {
|
||||
wg.Add(1)
|
||||
go func(p *partitionConsumer) {
|
||||
_ = p.Close()
|
||||
wg.Done()
|
||||
}(m.data[tp])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Clear() {
|
||||
m.mu.Lock()
|
||||
for tp := range m.data {
|
||||
delete(m.data, tp)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *partitionMap) Info() map[string][]int32 {
|
||||
info := make(map[string][]int32)
|
||||
m.mu.RLock()
|
||||
for tp := range m.data {
|
||||
info[tp.Topic] = append(info[tp.Topic], tp.Partition)
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
for topic := range info {
|
||||
sort.Sort(int32Slice(info[topic]))
|
||||
}
|
||||
return info
|
||||
}
|
||||
@ -1,75 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type none struct{}
|
||||
|
||||
type topicPartition struct {
|
||||
Topic string
|
||||
Partition int32
|
||||
}
|
||||
|
||||
func (tp *topicPartition) String() string {
|
||||
return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
|
||||
}
|
||||
|
||||
type offsetInfo struct {
|
||||
Offset int64
|
||||
Metadata string
|
||||
}
|
||||
|
||||
func (i offsetInfo) NextOffset(fallback int64) int64 {
|
||||
if i.Offset > -1 {
|
||||
return i.Offset
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
type int32Slice []int32
|
||||
|
||||
func (p int32Slice) Len() int { return len(p) }
|
||||
func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
func (p int32Slice) Diff(o int32Slice) (res []int32) {
|
||||
on := len(o)
|
||||
for _, x := range p {
|
||||
n := sort.Search(on, func(i int) bool { return o[i] >= x })
|
||||
if n < on && o[n] == x {
|
||||
continue
|
||||
}
|
||||
res = append(res, x)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type loopTomb struct {
|
||||
c chan none
|
||||
o sync.Once
|
||||
w sync.WaitGroup
|
||||
}
|
||||
|
||||
func newLoopTomb() *loopTomb {
|
||||
return &loopTomb{c: make(chan none)}
|
||||
}
|
||||
|
||||
func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) }
|
||||
func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
|
||||
|
||||
func (t *loopTomb) Dying() <-chan none { return t.c }
|
||||
func (t *loopTomb) Go(f func(<-chan none)) {
|
||||
t.w.Add(1)
|
||||
|
||||
go func() {
|
||||
defer t.stop()
|
||||
defer t.w.Done()
|
||||
|
||||
f(t.c)
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user