diff --git a/third/github.com/johng-cn/sarama-cluster/.gitignore b/third/github.com/johng-cn/sarama-cluster/.gitignore deleted file mode 100644 index 88113c5b2..000000000 --- a/third/github.com/johng-cn/sarama-cluster/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -*.log -*.pid -kafka*/ -vendor/ diff --git a/third/github.com/johng-cn/sarama-cluster/.travis.yml b/third/github.com/johng-cn/sarama-cluster/.travis.yml deleted file mode 100644 index 07c7c10fe..000000000 --- a/third/github.com/johng-cn/sarama-cluster/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -sudo: false -language: go -go: - - 1.10.x - - 1.9.x -install: - - go get -u github.com/golang/dep/cmd/dep - - dep ensure -env: - - SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1 - - SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1 - - SCALA_VERSION=2.12 KAFKA_VERSION=1.1.0 -script: - - make default test-race -addons: - apt: - packages: - - oracle-java8-set-default diff --git a/third/github.com/johng-cn/sarama-cluster/LICENSE b/third/github.com/johng-cn/sarama-cluster/LICENSE deleted file mode 100644 index 127751c47..000000000 --- a/third/github.com/johng-cn/sarama-cluster/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -(The MIT License) - -Copyright (c) 2017 Black Square Media Ltd - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -'Software'), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, -TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE -SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/third/github.com/johng-cn/sarama-cluster/Makefile b/third/github.com/johng-cn/sarama-cluster/Makefile deleted file mode 100644 index 25c5bc207..000000000 --- a/third/github.com/johng-cn/sarama-cluster/Makefile +++ /dev/null @@ -1,35 +0,0 @@ -SCALA_VERSION?= 2.12 -KAFKA_VERSION?= 1.1.0 -KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION) -KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz -KAFKA_ROOT= testdata/$(KAFKA_DIR) -PKG=$(shell go list ./... | grep -v vendor) - -default: vet test - -vet: - go vet $(PKG) - -test: testdeps - KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 - -test-verbose: testdeps - KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v - -test-race: testdeps - KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race - -testdeps: $(KAFKA_ROOT) - -doc: README.md - -.PHONY: test testdeps vet doc - -# --------------------------------------------------------------------- - -$(KAFKA_ROOT): - @mkdir -p $(dir $@) - cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz - -README.md: README.md.tpl $(wildcard *.go) - becca -package $(subst $(GOPATH)/src/,,$(PWD)) diff --git a/third/github.com/johng-cn/sarama-cluster/README.md b/third/github.com/johng-cn/sarama-cluster/README.md deleted file mode 100644 index 327371000..000000000 --- a/third/github.com/johng-cn/sarama-cluster/README.md +++ /dev/null @@ -1,151 +0,0 @@ -# Sarama Cluster - -[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster) -[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster) -[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster) -[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) - -Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later). - -## Documentation - -Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster - -## Examples - -Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple -topics and partitions are all passed to the single channel: - -```go -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - - cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster" -) - -func main() { - - // init (custom) config, enable errors and notifications - config := cluster.NewConfig() - config.Consumer.Return.Errors = true - config.Group.Return.Notifications = true - - // init consumer - brokers := []string{"127.0.0.1:9092"} - topics := []string{"my_topic", "other_topic"} - consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) - if err != nil { - panic(err) - } - defer consumer.Close() - - // trap SIGINT to trigger a shutdown. - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - - // consume errors - go func() { - for err := range consumer.Errors() { - log.Printf("Error: %s\n", err.Error()) - } - }() - - // consume notifications - go func() { - for ntf := range consumer.Notifications() { - log.Printf("Rebalanced: %+v\n", ntf) - } - }() - - // consume messages, watch signals - for { - select { - case msg, ok := <-consumer.Messages(): - if ok { - fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) - consumer.MarkOffset(msg, "") // mark message as processed - } - case <-signals: - return - } - } -} -``` - -Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level -consumers: - -```go -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - - cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster" -) - -func main() { - - // init (custom) config, set mode to ConsumerModePartitions - config := cluster.NewConfig() - config.Group.Mode = cluster.ConsumerModePartitions - - // init consumer - brokers := []string{"127.0.0.1:9092"} - topics := []string{"my_topic", "other_topic"} - consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) - if err != nil { - panic(err) - } - defer consumer.Close() - - // trap SIGINT to trigger a shutdown. - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - - // consume partitions - for { - select { - case part, ok := <-consumer.Partitions(): - if !ok { - return - } - - // start a separate goroutine to consume messages - go func(pc cluster.PartitionConsumer) { - for msg := range pc.Messages() { - fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) - consumer.MarkOffset(msg, "") // mark message as processed - } - }(part) - case <-signals: - return - } - } -} -``` - -## Running tests - -You need to install Ginkgo & Gomega to run tests. Please see -http://onsi.github.io/ginkgo for more details. - -To run tests, call: - - $ make test - -## Troubleshooting - -### Consumer not receiving any messages? - -By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written. - -If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`. diff --git a/third/github.com/johng-cn/sarama-cluster/README.md.tpl b/third/github.com/johng-cn/sarama-cluster/README.md.tpl deleted file mode 100644 index 16b71ee57..000000000 --- a/third/github.com/johng-cn/sarama-cluster/README.md.tpl +++ /dev/null @@ -1,67 +0,0 @@ -# Sarama Cluster - -[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster) -[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster) -[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster) -[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) - -Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later). - -## Documentation - -Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster - -## Examples - -Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple -topics and partitions are all passed to the single channel: - -```go -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - - cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster" -) - -func main() {{ "ExampleConsumer" | code }} -``` - -Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level -consumers: - -```go -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - - cluster "github.com/gogf/gf/third/github.com/bsm/sarama-cluster" -) - -func main() {{ "ExampleConsumer_Partitions" | code }} -``` - -## Running tests - -You need to install Ginkgo & Gomega to run tests. Please see -http://onsi.github.io/ginkgo for more details. - -To run tests, call: - - $ make test - -## Troubleshooting - -### Consumer not receiving any messages? - -By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written. - -If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`. diff --git a/third/github.com/johng-cn/sarama-cluster/balancer.go b/third/github.com/johng-cn/sarama-cluster/balancer.go deleted file mode 100644 index 66d252f06..000000000 --- a/third/github.com/johng-cn/sarama-cluster/balancer.go +++ /dev/null @@ -1,172 +0,0 @@ -package cluster - -import ( - "math" - "sort" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -// NotificationType defines the type of notification -type NotificationType uint8 - -// String describes the notification type -func (t NotificationType) String() string { - switch t { - case RebalanceStart: - return "rebalance start" - case RebalanceOK: - return "rebalance OK" - case RebalanceError: - return "rebalance error" - } - return "unknown" -} - -const ( - UnknownNotification NotificationType = iota - RebalanceStart - RebalanceOK - RebalanceError -) - -// Notification are state events emitted by the consumers on rebalance -type Notification struct { - // Type exposes the notification type - Type NotificationType - - // Claimed contains topic/partitions that were claimed by this rebalance cycle - Claimed map[string][]int32 - - // Released contains topic/partitions that were released as part of this rebalance cycle - Released map[string][]int32 - - // Current are topic/partitions that are currently claimed to the consumer - Current map[string][]int32 -} - -func newNotification(current map[string][]int32) *Notification { - return &Notification{ - Type: RebalanceStart, - Current: current, - } -} - -func (n *Notification) success(current map[string][]int32) *Notification { - o := &Notification{ - Type: RebalanceOK, - Claimed: make(map[string][]int32), - Released: make(map[string][]int32), - Current: current, - } - for topic, partitions := range current { - o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic])) - } - for topic, partitions := range n.Current { - o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic])) - } - return o -} - -// -------------------------------------------------------------------- - -type topicInfo struct { - Partitions []int32 - MemberIDs []string -} - -func (info topicInfo) Perform(s Strategy) map[string][]int32 { - if s == StrategyRoundRobin { - return info.RoundRobin() - } - return info.Ranges() -} - -func (info topicInfo) Ranges() map[string][]int32 { - sort.Strings(info.MemberIDs) - - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - res := make(map[string][]int32, mlen) - - for pos, memberID := range info.MemberIDs { - n, i := float64(plen)/float64(mlen), float64(pos) - min := int(math.Floor(i*n + 0.5)) - max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) > 0 { - res[memberID] = sub - } - } - return res -} - -func (info topicInfo) RoundRobin() map[string][]int32 { - sort.Strings(info.MemberIDs) - - mlen := len(info.MemberIDs) - res := make(map[string][]int32, mlen) - for i, pnum := range info.Partitions { - memberID := info.MemberIDs[i%mlen] - res[memberID] = append(res[memberID], pnum) - } - return res -} - -// -------------------------------------------------------------------- - -type balancer struct { - client sarama.Client - topics map[string]topicInfo - strategy Strategy -} - -func newBalancerFromMeta(client sarama.Client, strategy Strategy, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { - balancer := newBalancer(client, strategy) - for memberID, meta := range members { - for _, topic := range meta.Topics { - if err := balancer.Topic(topic, memberID); err != nil { - return nil, err - } - } - } - return balancer, nil -} - -func newBalancer(client sarama.Client, strategy Strategy) *balancer { - return &balancer{ - client: client, - topics: make(map[string]topicInfo), - strategy: strategy, - } -} - -func (r *balancer) Topic(name string, memberID string) error { - topic, ok := r.topics[name] - if !ok { - nums, err := r.client.Partitions(name) - if err != nil { - return err - } - topic = topicInfo{ - Partitions: nums, - MemberIDs: make([]string, 0, 1), - } - } - topic.MemberIDs = append(topic.MemberIDs, memberID) - r.topics[name] = topic - return nil -} - -func (r *balancer) Perform() map[string]map[string][]int32 { - res := make(map[string]map[string][]int32, 1) - for topic, info := range r.topics { - for memberID, partitions := range info.Perform(r.strategy) { - if _, ok := res[memberID]; !ok { - res[memberID] = make(map[string][]int32, 1) - } - res[memberID][topic] = partitions - } - } - return res -} diff --git a/third/github.com/johng-cn/sarama-cluster/client.go b/third/github.com/johng-cn/sarama-cluster/client.go deleted file mode 100644 index b5350003b..000000000 --- a/third/github.com/johng-cn/sarama-cluster/client.go +++ /dev/null @@ -1,50 +0,0 @@ -package cluster - -import ( - "errors" - "sync/atomic" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -var errClientInUse = errors.New("cluster: client is already used by another consumer") - -// Client is a group client -type Client struct { - sarama.Client - config Config - - inUse uint32 -} - -// NewClient creates a new client instance -func NewClient(addrs []string, config *Config) (*Client, error) { - if config == nil { - config = NewConfig() - } - - if err := config.Validate(); err != nil { - return nil, err - } - - client, err := sarama.NewClient(addrs, &config.Config) - if err != nil { - return nil, err - } - - return &Client{Client: client, config: *config}, nil -} - -// ClusterConfig returns the cluster configuration. -func (c *Client) ClusterConfig() *Config { - cfg := c.config - return &cfg -} - -func (c *Client) claim() bool { - return atomic.CompareAndSwapUint32(&c.inUse, 0, 1) -} - -func (c *Client) release() { - atomic.CompareAndSwapUint32(&c.inUse, 1, 0) -} diff --git a/third/github.com/johng-cn/sarama-cluster/cluster.go b/third/github.com/johng-cn/sarama-cluster/cluster.go deleted file mode 100644 index adcf0e9c1..000000000 --- a/third/github.com/johng-cn/sarama-cluster/cluster.go +++ /dev/null @@ -1,25 +0,0 @@ -package cluster - -// Strategy for partition to consumer assignement -type Strategy string - -const ( - // StrategyRange is the default and assigns partition ranges to consumers. - // Example with six partitions and two consumers: - // C1: [0, 1, 2] - // C2: [3, 4, 5] - StrategyRange Strategy = "range" - - // StrategyRoundRobin assigns partitions by alternating over consumers. - // Example with six partitions and two consumers: - // C1: [0, 2, 4] - // C2: [1, 3, 5] - StrategyRoundRobin Strategy = "roundrobin" -) - -// Error instances are wrappers for internal errors with a context and -// may be returned through the consumer's Errors() channel -type Error struct { - Ctx string - error -} diff --git a/third/github.com/johng-cn/sarama-cluster/config.go b/third/github.com/johng-cn/sarama-cluster/config.go deleted file mode 100644 index 340d60121..000000000 --- a/third/github.com/johng-cn/sarama-cluster/config.go +++ /dev/null @@ -1,146 +0,0 @@ -package cluster - -import ( - "regexp" - "time" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -var minVersion = sarama.V0_9_0_0 - -type ConsumerMode uint8 - -const ( - ConsumerModeMultiplex ConsumerMode = iota - ConsumerModePartitions -) - -// Config extends sarama.Config with Group specific namespace -type Config struct { - sarama.Config - - // Group is the namespace for group management properties - Group struct { - - // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) - PartitionStrategy Strategy - - // By default, messages and errors from the subscribed topics and partitions are all multiplexed and - // made available through the consumer's Messages() and Errors() channels. - // - // Users who require low-level access can enable ConsumerModePartitions where individual partitions - // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions - // themselves. - Mode ConsumerMode - - Offsets struct { - Retry struct { - // The number retries when committing offsets (defaults to 3). - Max int - } - Synchronization struct { - // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance - // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration - DwellTime time.Duration - } - } - - Session struct { - // The allowed session timeout for registered consumers (defaults to 30s). - // Must be within the allowed server range. - Timeout time.Duration - } - - Heartbeat struct { - // Interval between each heartbeat (defaults to 3s). It should be no more - // than 1/3rd of the Group.Session.Timeout setting - Interval time.Duration - } - - // Return specifies which group channels will be populated. If they are set to true, - // you must read from the respective channels to prevent deadlock. - Return struct { - // If enabled, rebalance notification will be returned on the - // Notifications channel (default disabled). - Notifications bool - } - - Topics struct { - // An additional whitelist of topics to subscribe to. - Whitelist *regexp.Regexp - // An additional blacklist of topics to avoid. If set, this will precede over - // the Whitelist setting. - Blacklist *regexp.Regexp - } - - Member struct { - // Custom metadata to include when joining the group. The user data for all joined members - // can be retrieved by sending a DescribeGroupRequest to the broker that is the - // coordinator for the group. - UserData []byte - } - } -} - -// NewConfig returns a new configuration instance with sane defaults. -func NewConfig() *Config { - c := &Config{ - Config: *sarama.NewConfig(), - } - c.Group.PartitionStrategy = StrategyRange - c.Group.Offsets.Retry.Max = 3 - c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime - c.Group.Session.Timeout = 30 * time.Second - c.Group.Heartbeat.Interval = 3 * time.Second - c.Config.Version = minVersion - return c -} - -// Validate checks a Config instance. It will return a -// sarama.ConfigurationError if the specified values don't make sense. -func (c *Config) Validate() error { - if c.Group.Heartbeat.Interval%time.Millisecond != 0 { - sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") - } - if c.Group.Session.Timeout%time.Millisecond != 0 { - sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") - } - if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin { - sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.") - } - if !c.Version.IsAtLeast(minVersion) { - sarama.Logger.Println("Version is not supported; 0.9. will be assumed.") - c.Version = minVersion - } - if err := c.Config.Validate(); err != nil { - return err - } - - // validate the Group values - switch { - case c.Group.Offsets.Retry.Max < 0: - return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0") - case c.Group.Offsets.Synchronization.DwellTime <= 0: - return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0") - case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute: - return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m") - case c.Group.Heartbeat.Interval <= 0: - return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0") - case c.Group.Session.Timeout <= 0: - return sarama.ConfigurationError("Group.Session.Timeout must be > 0") - case !c.Metadata.Full && c.Group.Topics.Whitelist != nil: - return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used") - case !c.Metadata.Full && c.Group.Topics.Blacklist != nil: - return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used") - } - - // ensure offset is correct - switch c.Consumer.Offsets.Initial { - case sarama.OffsetOldest, sarama.OffsetNewest: - default: - return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest") - } - - return nil -} diff --git a/third/github.com/johng-cn/sarama-cluster/consumer.go b/third/github.com/johng-cn/sarama-cluster/consumer.go deleted file mode 100644 index 2350d44e2..000000000 --- a/third/github.com/johng-cn/sarama-cluster/consumer.go +++ /dev/null @@ -1,952 +0,0 @@ -package cluster - -import ( - "sort" - "sync" - "sync/atomic" - "time" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -// Consumer is a cluster group consumer -type Consumer struct { - client *Client - ownClient bool - - consumer sarama.Consumer - subs *partitionMap - - consumerID string - groupID string - - memberID string - generationID int32 - membershipMu sync.RWMutex - - coreTopics []string - extraTopics []string - - dying, dead chan none - closeOnce sync.Once - - consuming int32 - messages chan *sarama.ConsumerMessage - errors chan error - partitions chan PartitionConsumer - notifications chan *Notification - - customOffsets map[string]map[int32]offsetInfo - - commitMu sync.Mutex -} - -// NewConsumer initializes a new consumer -func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) { - client, err := NewClient(addrs, config) - if err != nil { - return nil, err - } - - consumer, err := NewConsumerFromClient(client, groupID, topics) - if err != nil { - return nil, err - } - consumer.ownClient = true - return consumer, nil -} - -// NewConsumerFromClient initializes a new consumer from an existing client. -// -// Please note that clients cannot be shared between consumers (due to Kafka internals), -// they can only be re-used which requires the user to call Close() on the first consumer -// before using this method again to initialize another one. Attempts to use a client with -// more than one consumer at a time will return errors. -func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) { - if !client.claim() { - return nil, errClientInUse - } - - consumer, err := sarama.NewConsumerFromClient(client.Client) - if err != nil { - client.release() - return nil, err - } - - sort.Strings(topics) - c := &Consumer{ - client: client, - consumer: consumer, - subs: newPartitionMap(), - groupID: groupID, - - coreTopics: topics, - - dying: make(chan none), - dead: make(chan none), - - messages: make(chan *sarama.ConsumerMessage), - errors: make(chan error, client.config.ChannelBufferSize), - partitions: make(chan PartitionConsumer, 1), - notifications: make(chan *Notification), - - customOffsets: make(map[string]map[int32]offsetInfo), - } - if err := c.client.RefreshCoordinator(groupID); err != nil { - client.release() - return nil, err - } - - go c.mainLoop() - return c, nil -} - -// Messages returns the read channel for the messages that are returned by -// the broker. -// -// This channel will only return if Config.Group.Mode option is set to -// ConsumerModeMultiplex (default). -func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages } - -// Partitions returns the read channels for individual partitions of this broker. -// -// This will channel will only return if Config.Group.Mode option is set to -// ConsumerModePartitions. -// -// The Partitions() channel must be listened to for the life of this consumer; -// when a rebalance happens old partitions will be closed (naturally come to -// completion) and new ones will be emitted. The returned channel will only close -// when the consumer is completely shut down. -func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions } - -// Errors returns a read channel of errors that occur during offset management, if -// enabled. By default, errors are logged and not returned over this channel. If -// you want to implement any custom error handling, set your config's -// Consumer.Return.Errors setting to true, and read from this channel. -func (c *Consumer) Errors() <-chan error { return c.errors } - -// Notifications returns a channel of Notifications that occur during consumer -// rebalancing. Notifications will only be emitted over this channel, if your config's -// Group.Return.Notifications setting to true. -func (c *Consumer) Notifications() <-chan *Notification { return c.notifications } - -// HighWaterMarks returns the current high water marks for each topic and partition -// Consistency between partitions is not guaranteed since high water marks are updated separately. -func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() } - -// MarkOffset marks the provided message as processed, alongside a metadata string -// that represents the state of the partition consumer at that point in time. The -// metadata string can be used by another consumer to restore that state, so it -// can resume consumption. -// -// Note: calling MarkOffset does not necessarily commit the offset to the backend -// store immediately for efficiency reasons, and it may never be committed if -// your application crashes. This means that you may end up processing the same -// message twice, and your processing should ideally be idempotent. -func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { - if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { - sub.MarkOffset(msg.Offset, metadata) - } else { - c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata) - } -} - -// MarkPartitionOffset marks an offset of the provided topic/partition as processed. -// See MarkOffset for additional explanation. -func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { - if sub := c.subs.Fetch(topic, partition); sub != nil { - sub.MarkOffset(offset, metadata) - } else { - c.markCustomOffset(topic, partition, offset, metadata) - } -} - -func (c *Consumer) markCustomOffset(topic string, partition int32, offset int64, metadata string) { - if _, ok := c.customOffsets[topic]; !ok { - c.customOffsets[topic] = make(map[int32]offsetInfo) - } - c.customOffsets[topic][partition] = offsetInfo { - Offset : offset, - Metadata : metadata, - } -} - -// MarkOffsets marks stashed offsets as processed. -// See MarkOffset for additional explanation. -func (c *Consumer) MarkOffsets(s *OffsetStash) { - s.mu.Lock() - defer s.mu.Unlock() - - for tp, info := range s.offsets { - if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { - sub.MarkOffset(info.Offset, info.Metadata) - } else { - c.markCustomOffset(tp.Topic, tp.Partition, info.Offset, info.Metadata) - } - delete(s.offsets, tp) - } -} - -// ResetOffsets marks the provided message as processed, alongside a metadata string -// that represents the state of the partition consumer at that point in time. The -// metadata string can be used by another consumer to restore that state, so it -// can resume consumption. -// -// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset -func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { - if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { - sub.ResetOffset(msg.Offset, metadata) - } else { - c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata) - } -} - -// ResetPartitionOffset marks an offset of the provided topic/partition as processed. -// See ResetOffset for additional explanation. -func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { - sub := c.subs.Fetch(topic, partition) - if sub != nil { - sub.ResetOffset(offset, metadata) - } else { - c.markCustomOffset(topic, partition, offset, metadata) - } -} - -// ResetOffsets marks stashed offsets as processed. -// See ResetOffset for additional explanation. -func (c *Consumer) ResetOffsets(s *OffsetStash) { - s.mu.Lock() - defer s.mu.Unlock() - - for tp, info := range s.offsets { - if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { - sub.ResetOffset(info.Offset, info.Metadata) - } - delete(s.offsets, tp) - } -} - -// Subscriptions returns the consumed topics and partitions -func (c *Consumer) Subscriptions() map[string][]int32 { - return c.subs.Info() -} - -// CommitOffsets allows to manually commit previously marked offsets. By default there is no -// need to call this function as the consumer will commit offsets automatically -// using the Config.Consumer.Offsets.CommitInterval setting. -// -// Please be aware that calling this function during an internal rebalance cycle may return -// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration). -func (c *Consumer) CommitOffsets() error { - c.commitMu.Lock() - defer c.commitMu.Unlock() - - memberID, generationID := c.membership() - req := &sarama.OffsetCommitRequest{ - Version: 2, - ConsumerGroup: c.groupID, - ConsumerGroupGeneration: generationID, - ConsumerID: memberID, - RetentionTime: -1, - } - - if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 { - req.RetentionTime = int64(ns / time.Millisecond) - } - - snap := c.subs.Snapshot() - dirty := false - for tp, state := range snap { - if state.Dirty { - dirty = true - req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata) - } - } - if !dirty { - return nil - } - - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return err - } - - resp, err := broker.CommitOffset(req) - if err != nil { - c.closeCoordinator(broker, err) - return err - } - - for topic, errs := range resp.Errors { - for partition, kerr := range errs { - if kerr != sarama.ErrNoError { - err = kerr - } else if state, ok := snap[topicPartition{topic, partition}]; ok { - if sub := c.subs.Fetch(topic, partition); sub != nil { - sub.markCommitted(state.Info.Offset) - } - } - } - } - return err -} - -// Close safely closes the consumer and releases all resources -func (c *Consumer) Close() (err error) { - c.closeOnce.Do(func() { - close(c.dying) - <-c.dead - - if e := c.release(); e != nil { - err = e - } - if e := c.consumer.Close(); e != nil { - err = e - } - close(c.messages) - close(c.errors) - - if e := c.leaveGroup(); e != nil { - err = e - } - close(c.partitions) - close(c.notifications) - - // drain - for range c.messages { - } - for range c.errors { - } - for p := range c.partitions { - _ = p.Close() - } - for range c.notifications { - } - - c.client.release() - if c.ownClient { - if e := c.client.Close(); e != nil { - err = e - } - } - }) - return -} - -func (c *Consumer) mainLoop() { - defer close(c.dead) - defer atomic.StoreInt32(&c.consuming, 0) - - for { - atomic.StoreInt32(&c.consuming, 0) - - // Check if close was requested - select { - case <-c.dying: - return - default: - } - - // Start next consume cycle - c.nextTick() - } -} - -func (c *Consumer) nextTick() { - // Remember previous subscriptions - var notification *Notification - if c.client.config.Group.Return.Notifications { - notification = newNotification(c.subs.Info()) - } - - // Refresh coordinator - if err := c.refreshCoordinator(); err != nil { - c.rebalanceError(err, nil) - return - } - - // Release subscriptions - if err := c.release(); err != nil { - c.rebalanceError(err, nil) - return - } - - // Issue rebalance start notification - if c.client.config.Group.Return.Notifications { - c.handleNotification(newNotification(c.subs.Info())) - } - - // Rebalance, fetch new subscriptions - subs, err := c.rebalance() - if err != nil { - c.rebalanceError(err, notification) - return - } - - // Coordinate loops, make sure everything is - // stopped on exit - tomb := newLoopTomb() - defer tomb.Close() - - // Start the heartbeat - tomb.Go(c.hbLoop) - - // Subscribe to topic/partitions - if err := c.subscribe(tomb, subs); err != nil { - c.rebalanceError(err, notification) - return - } - - // Update/issue notification with new claims - if c.client.config.Group.Return.Notifications { - notification = notification.success(subs) - c.handleNotification(notification) - } - - // Start topic watcher loop - tomb.Go(c.twLoop) - - // Start consuming and committing offsets - tomb.Go(c.cmLoop) - atomic.StoreInt32(&c.consuming, 1) - - // Wait for signals - select { - case <-tomb.Dying(): - case <-c.dying: - } -} - -// heartbeat loop, triggered by the mainLoop -func (c *Consumer) hbLoop(stopped <-chan none) { - ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - switch err := c.heartbeat(); err { - case nil, sarama.ErrNoError: - case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress: - return - default: - c.handleError(&Error{Ctx: "heartbeat", error: err}) - return - } - case <-stopped: - return - case <-c.dying: - return - } - } -} - -// topic watcher loop, triggered by the mainLoop -func (c *Consumer) twLoop(stopped <-chan none) { - ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - topics, err := c.client.Topics() - if err != nil { - c.handleError(&Error{Ctx: "topics", error: err}) - return - } - - for _, topic := range topics { - if !c.isKnownCoreTopic(topic) && - !c.isKnownExtraTopic(topic) && - c.isPotentialExtraTopic(topic) { - return - } - } - case <-stopped: - return - case <-c.dying: - return - } - } -} - -// commit loop, triggered by the mainLoop -func (c *Consumer) cmLoop(stopped <-chan none) { - ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil { - c.handleError(&Error{Ctx: "commit", error: err}) - return - } - case <-stopped: - return - case <-c.dying: - return - } - } -} - -func (c *Consumer) rebalanceError(err error, n *Notification) { - if n != nil { - n.Type = RebalanceError - c.handleNotification(n) - } - - switch err { - case sarama.ErrRebalanceInProgress: - default: - c.handleError(&Error{Ctx: "rebalance", error: err}) - } - - select { - case <-c.dying: - case <-time.After(c.client.config.Metadata.Retry.Backoff): - } -} - -func (c *Consumer) handleNotification(n *Notification) { - if c.client.config.Group.Return.Notifications { - select { - case c.notifications <- n: - case <-c.dying: - return - } - } -} - -func (c *Consumer) handleError(e *Error) { - if c.client.config.Consumer.Return.Errors { - select { - case c.errors <- e: - case <-c.dying: - return - } - } else { - sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error()) - } -} - -// Releases the consumer and commits offsets, called from rebalance() and Close() -func (c *Consumer) release() (err error) { - // Stop all consumers - c.subs.Stop() - - // Clear subscriptions on exit - defer c.subs.Clear() - - // Wait for messages to be processed - timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime) - defer timeout.Stop() - - select { - case <-c.dying: - case <-timeout.C: - } - - // Commit offsets, continue on errors - if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil { - err = e - } - - return -} - -// -------------------------------------------------------------------- - -// Performs a heartbeat, part of the mainLoop() -func (c *Consumer) heartbeat() error { - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return err - } - - memberID, generationID := c.membership() - resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{ - GroupId: c.groupID, - MemberId: memberID, - GenerationId: generationID, - }) - if err != nil { - c.closeCoordinator(broker, err) - return err - } - return resp.Err -} - -// Performs a rebalance, part of the mainLoop() -func (c *Consumer) rebalance() (map[string][]int32, error) { - memberID, _ := c.membership() - sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID) - - allTopics, err := c.client.Topics() - if err != nil { - return nil, err - } - c.extraTopics = c.selectExtraTopics(allTopics) - sort.Strings(c.extraTopics) - - // Re-join consumer group - strategy, err := c.joinGroup() - switch { - case err == sarama.ErrUnknownMemberId: - c.membershipMu.Lock() - c.memberID = "" - c.membershipMu.Unlock() - return nil, err - case err != nil: - return nil, err - } - - // Sync consumer group state, fetch subscriptions - subs, err := c.syncGroup(strategy) - switch { - case err == sarama.ErrRebalanceInProgress: - return nil, err - case err != nil: - _ = c.leaveGroup() - return nil, err - } - return subs, nil -} - -// Performs the subscription, part of the mainLoop() -func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error { - // fetch offsets - offsets, err := c.fetchOffsets(subs) - if err != nil { - _ = c.leaveGroup() - return err - } - - // create consumers in parallel - var mu sync.Mutex - var wg sync.WaitGroup - - for topic, partitions := range subs { - for _, partition := range partitions { - wg.Add(1) - - info := offsets[topic][partition] - if item, ok := c.customOffsets[topic]; ok { - if i, ok := item[partition]; ok { - info = i - } - } - go func(topic string, partition int32) { - if e := c.createConsumer(tomb, topic, partition, info); e != nil { - mu.Lock() - err = e - mu.Unlock() - } - wg.Done() - }(topic, partition) - } - } - wg.Wait() - - if err != nil { - _ = c.release() - _ = c.leaveGroup() - } - return err -} - -// -------------------------------------------------------------------- - -// Send a request to the broker to join group on rebalance() -func (c *Consumer) joinGroup() (*balancer, error) { - memberID, _ := c.membership() - req := &sarama.JoinGroupRequest{ - GroupId: c.groupID, - MemberId: memberID, - SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond), - ProtocolType: "consumer", - } - - meta := &sarama.ConsumerGroupMemberMetadata{ - Version: 1, - Topics: append(c.coreTopics, c.extraTopics...), - UserData: c.client.config.Group.Member.UserData, - } - err := req.AddGroupProtocolMetadata(string(StrategyRange), meta) - if err != nil { - return nil, err - } - err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta) - if err != nil { - return nil, err - } - - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } - - resp, err := broker.JoinGroup(req) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } else if resp.Err != sarama.ErrNoError { - c.closeCoordinator(broker, resp.Err) - return nil, resp.Err - } - - var strategy *balancer - if resp.LeaderId == resp.MemberId { - members, err := resp.GetMembers() - if err != nil { - return nil, err - } - - strategy, err = newBalancerFromMeta(c.client, Strategy(resp.GroupProtocol), members) - if err != nil { - return nil, err - } - } - - c.membershipMu.Lock() - c.memberID = resp.MemberId - c.generationID = resp.GenerationId - c.membershipMu.Unlock() - - return strategy, nil -} - -// Send a request to the broker to sync the group on rebalance(). -// Returns a list of topics and partitions to consume. -func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { - memberID, generationID := c.membership() - req := &sarama.SyncGroupRequest{ - GroupId: c.groupID, - MemberId: memberID, - GenerationId: generationID, - } - - if strategy != nil { - for memberID, topics := range strategy.Perform() { - if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ - Topics: topics, - }); err != nil { - return nil, err - } - } - } - - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } - - resp, err := broker.SyncGroup(req) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } else if resp.Err != sarama.ErrNoError { - c.closeCoordinator(broker, resp.Err) - return nil, resp.Err - } - - // Return if there is nothing to subscribe to - if len(resp.MemberAssignment) == 0 { - return nil, nil - } - - // Get assigned subscriptions - members, err := resp.GetMemberAssignment() - if err != nil { - return nil, err - } - - // Sort partitions, for each topic - for topic := range members.Topics { - sort.Sort(int32Slice(members.Topics[topic])) - } - return members.Topics, nil -} - -// Fetches latest committed offsets for all subscriptions -func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) { - offsets := make(map[string]map[int32]offsetInfo, len(subs)) - req := &sarama.OffsetFetchRequest{ - Version: 1, - ConsumerGroup: c.groupID, - } - - for topic, partitions := range subs { - offsets[topic] = make(map[int32]offsetInfo, len(partitions)) - for _, partition := range partitions { - offsets[topic][partition] = offsetInfo{Offset: -1} - req.AddPartition(topic, partition) - } - } - - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } - - resp, err := broker.FetchOffset(req) - if err != nil { - c.closeCoordinator(broker, err) - return nil, err - } - - for topic, partitions := range subs { - for _, partition := range partitions { - block := resp.GetBlock(topic, partition) - if block == nil { - return nil, sarama.ErrIncompleteResponse - } - - if block.Err == sarama.ErrNoError { - offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata} - } else { - return nil, block.Err - } - } - } - return offsets, nil -} - -// Send a request to the broker to leave the group on failes rebalance() and on Close() -func (c *Consumer) leaveGroup() error { - broker, err := c.client.Coordinator(c.groupID) - if err != nil { - c.closeCoordinator(broker, err) - return err - } - - memberID, _ := c.membership() - if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{ - GroupId: c.groupID, - MemberId: memberID, - }); err != nil { - c.closeCoordinator(broker, err) - } - return err -} - -// -------------------------------------------------------------------- - -func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error { - memberID, _ := c.membership() - sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial)) - - // Create partitionConsumer - pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial) - if err != nil { - return err - } - - // Store in subscriptions - c.subs.Store(topic, partition, pc) - - // Start partition consumer goroutine - tomb.Go(func(stopper <-chan none) { - if c.client.config.Group.Mode == ConsumerModePartitions { - pc.waitFor(stopper) - } else { - pc.multiplex(stopper, c.messages, c.errors) - } - }) - - if c.client.config.Group.Mode == ConsumerModePartitions { - select { - case c.partitions <- pc: - case <-c.dying: - pc.Close() - } - } - return nil -} - -func (c *Consumer) commitOffsetsWithRetry(retries int) error { - err := c.CommitOffsets() - if err != nil && retries > 0 { - return c.commitOffsetsWithRetry(retries - 1) - } - return err -} - -func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) { - if broker != nil { - _ = broker.Close() - } - - switch err { - case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer: - _ = c.client.RefreshCoordinator(c.groupID) - } -} - -func (c *Consumer) selectExtraTopics(allTopics []string) []string { - extra := allTopics[:0] - for _, topic := range allTopics { - if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) { - extra = append(extra, topic) - } - } - return extra -} - -func (c *Consumer) isKnownCoreTopic(topic string) bool { - pos := sort.SearchStrings(c.coreTopics, topic) - return pos < len(c.coreTopics) && c.coreTopics[pos] == topic -} - -func (c *Consumer) isKnownExtraTopic(topic string) bool { - pos := sort.SearchStrings(c.extraTopics, topic) - return pos < len(c.extraTopics) && c.extraTopics[pos] == topic -} - -func (c *Consumer) isPotentialExtraTopic(topic string) bool { - rx := c.client.config.Group.Topics - if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) { - return false - } - if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) { - return true - } - return false -} - -func (c *Consumer) refreshCoordinator() error { - if err := c.refreshMetadata(); err != nil { - return err - } - return c.client.RefreshCoordinator(c.groupID) -} - -func (c *Consumer) refreshMetadata() (err error) { - if c.client.config.Metadata.Full { - err = c.client.RefreshMetadata() - } else { - var topics []string - if topics, err = c.client.Topics(); err == nil && len(topics) != 0 { - err = c.client.RefreshMetadata(topics...) - } - } - - // maybe we didn't have authorization to describe all topics - switch err { - case sarama.ErrTopicAuthorizationFailed: - err = c.client.RefreshMetadata(c.coreTopics...) - } - return -} - -func (c *Consumer) membership() (memberID string, generationID int32) { - c.membershipMu.RLock() - memberID, generationID = c.memberID, c.generationID - c.membershipMu.RUnlock() - return -} diff --git a/third/github.com/johng-cn/sarama-cluster/doc.go b/third/github.com/johng-cn/sarama-cluster/doc.go deleted file mode 100644 index 9c8ff16a7..000000000 --- a/third/github.com/johng-cn/sarama-cluster/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -/* -Package cluster provides cluster extensions for Sarama, enabing users -to consume topics across from multiple, balanced nodes. - -It requires Kafka v0.9+ and follows the steps guide, described in: -https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design -*/ -package cluster diff --git a/third/github.com/johng-cn/sarama-cluster/offsets.go b/third/github.com/johng-cn/sarama-cluster/offsets.go deleted file mode 100644 index cfe013937..000000000 --- a/third/github.com/johng-cn/sarama-cluster/offsets.go +++ /dev/null @@ -1,69 +0,0 @@ -package cluster - -import ( - "sync" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -// OffsetStash allows to accumulate offsets and -// mark them as processed in a bulk -type OffsetStash struct { - offsets map[topicPartition]offsetInfo - mu sync.Mutex -} - -// NewOffsetStash inits a blank stash -func NewOffsetStash() *OffsetStash { - return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)} -} - -// MarkOffset stashes the provided message offset -func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { - s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) -} - -// MarkPartitionOffset stashes the offset for the provided topic/partition combination -func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { - s.mu.Lock() - defer s.mu.Unlock() - - key := topicPartition{Topic: topic, Partition: partition} - if info := s.offsets[key]; offset >= info.Offset { - info.Offset = offset - info.Metadata = metadata - s.offsets[key] = info - } -} - -// ResetPartitionOffset stashes the offset for the provided topic/partition combination. -// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets -func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { - s.mu.Lock() - defer s.mu.Unlock() - - key := topicPartition{Topic: topic, Partition: partition} - if info := s.offsets[key]; offset <= info.Offset { - info.Offset = offset - info.Metadata = metadata - s.offsets[key] = info - } -} - -// ResetOffset stashes the provided message offset -// See ResetPartitionOffset for explanation -func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { - s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) -} - -// Offsets returns the latest stashed offsets by topic-partition -func (s *OffsetStash) Offsets() map[string]int64 { - s.mu.Lock() - defer s.mu.Unlock() - - res := make(map[string]int64, len(s.offsets)) - for tp, info := range s.offsets { - res[tp.String()] = info.Offset - } - return res -} diff --git a/third/github.com/johng-cn/sarama-cluster/partitions.go b/third/github.com/johng-cn/sarama-cluster/partitions.go deleted file mode 100644 index 40239d4ed..000000000 --- a/third/github.com/johng-cn/sarama-cluster/partitions.go +++ /dev/null @@ -1,274 +0,0 @@ -package cluster - -import ( - "sort" - "sync" - "time" - - "github.com/gogf/gf/third/github.com/Shopify/sarama" -) - -// PartitionConsumer allows code to consume individual partitions from the cluster. -// -// See docs for Consumer.Partitions() for more on how to implement this. -type PartitionConsumer interface { - sarama.PartitionConsumer - - // Topic returns the consumed topic name - Topic() string - - // Partition returns the consumed partition - Partition() int32 - - // InitialOffset returns the offset used for creating the PartitionConsumer instance. - // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest - InitialOffset() int64 - - // MarkOffset marks the offset of a message as preocessed. - MarkOffset(offset int64, metadata string) - - // ResetOffset resets the offset to a previously processed message. - ResetOffset(offset int64, metadata string) -} - -type partitionConsumer struct { - sarama.PartitionConsumer - - state partitionState - mu sync.Mutex - - topic string - partition int32 - initialOffset int64 - - closeOnce sync.Once - closeErr error - - dying, dead chan none -} - -func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) { - offset := info.NextOffset(defaultOffset) - pcm, err := manager.ConsumePartition(topic, partition, offset) - - // Resume from default offset, if requested offset is out-of-range - if err == sarama.ErrOffsetOutOfRange { - info.Offset = -1 - offset = defaultOffset - pcm, err = manager.ConsumePartition(topic, partition, offset) - } - if err != nil { - return nil, err - } - - return &partitionConsumer{ - PartitionConsumer: pcm, - state: partitionState{Info: info}, - - topic: topic, - partition: partition, - initialOffset: offset, - - dying: make(chan none), - dead: make(chan none), - }, nil -} - -// Topic implements PartitionConsumer -func (c *partitionConsumer) Topic() string { return c.topic } - -// Partition implements PartitionConsumer -func (c *partitionConsumer) Partition() int32 { return c.partition } - -// InitialOffset implements PartitionConsumer -func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } - -// AsyncClose implements PartitionConsumer -func (c *partitionConsumer) AsyncClose() { - c.closeOnce.Do(func() { - c.closeErr = c.PartitionConsumer.Close() - close(c.dying) - }) -} - -// Close implements PartitionConsumer -func (c *partitionConsumer) Close() error { - c.AsyncClose() - <-c.dead - return c.closeErr -} - -func (c *partitionConsumer) waitFor(stopper <-chan none) { - select { - case <-stopper: - case <-c.dying: - } - close(c.dead) -} - -func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { - defer close(c.dead) - - for { - select { - case msg, ok := <-c.Messages(): - if !ok { - return - } - select { - case messages <- msg: - case <-stopper: - return - case <-c.dying: - return - } - case err, ok := <-c.Errors(): - if !ok { - return - } - select { - case errors <- err: - case <-stopper: - return - case <-c.dying: - return - } - case <-stopper: - return - case <-c.dying: - return - } - } -} - -func (c *partitionConsumer) getState() partitionState { - c.mu.Lock() - state := c.state - c.mu.Unlock() - - return state -} - -func (c *partitionConsumer) markCommitted(offset int64) { - c.mu.Lock() - if offset == c.state.Info.Offset { - c.state.Dirty = false - } - c.mu.Unlock() -} - -// MarkOffset implements PartitionConsumer -func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { - c.mu.Lock() - if next := offset + 1; next > c.state.Info.Offset { - c.state.Info.Offset = next - c.state.Info.Metadata = metadata - c.state.Dirty = true - } - c.mu.Unlock() -} - -// ResetOffset implements PartitionConsumer -func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { - c.mu.Lock() - if next := offset + 1; next <= c.state.Info.Offset { - c.state.Info.Offset = next - c.state.Info.Metadata = metadata - c.state.Dirty = true - } - c.mu.Unlock() -} - -// -------------------------------------------------------------------- - -type partitionState struct { - Info offsetInfo - Dirty bool - LastCommit time.Time -} - -// -------------------------------------------------------------------- - -type partitionMap struct { - data map[topicPartition]*partitionConsumer - mu sync.RWMutex -} - -func newPartitionMap() *partitionMap { - return &partitionMap{ - data: make(map[topicPartition]*partitionConsumer), - } -} - -func (m *partitionMap) IsSubscribedTo(topic string) bool { - m.mu.RLock() - defer m.mu.RUnlock() - - for tp := range m.data { - if tp.Topic == topic { - return true - } - } - return false -} - -func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer { - m.mu.RLock() - pc, _ := m.data[topicPartition{topic, partition}] - m.mu.RUnlock() - return pc -} - -func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) { - m.mu.Lock() - m.data[topicPartition{topic, partition}] = pc - m.mu.Unlock() -} - -func (m *partitionMap) Snapshot() map[topicPartition]partitionState { - m.mu.RLock() - defer m.mu.RUnlock() - - snap := make(map[topicPartition]partitionState, len(m.data)) - for tp, pc := range m.data { - snap[tp] = pc.getState() - } - return snap -} - -func (m *partitionMap) Stop() { - m.mu.RLock() - defer m.mu.RUnlock() - - var wg sync.WaitGroup - for tp := range m.data { - wg.Add(1) - go func(p *partitionConsumer) { - _ = p.Close() - wg.Done() - }(m.data[tp]) - } - wg.Wait() -} - -func (m *partitionMap) Clear() { - m.mu.Lock() - for tp := range m.data { - delete(m.data, tp) - } - m.mu.Unlock() -} - -func (m *partitionMap) Info() map[string][]int32 { - info := make(map[string][]int32) - m.mu.RLock() - for tp := range m.data { - info[tp.Topic] = append(info[tp.Topic], tp.Partition) - } - m.mu.RUnlock() - - for topic := range info { - sort.Sort(int32Slice(info[topic])) - } - return info -} diff --git a/third/github.com/johng-cn/sarama-cluster/util.go b/third/github.com/johng-cn/sarama-cluster/util.go deleted file mode 100644 index e7cb5dd1b..000000000 --- a/third/github.com/johng-cn/sarama-cluster/util.go +++ /dev/null @@ -1,75 +0,0 @@ -package cluster - -import ( - "fmt" - "sort" - "sync" -) - -type none struct{} - -type topicPartition struct { - Topic string - Partition int32 -} - -func (tp *topicPartition) String() string { - return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition) -} - -type offsetInfo struct { - Offset int64 - Metadata string -} - -func (i offsetInfo) NextOffset(fallback int64) int64 { - if i.Offset > -1 { - return i.Offset - } - return fallback -} - -type int32Slice []int32 - -func (p int32Slice) Len() int { return len(p) } -func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func (p int32Slice) Diff(o int32Slice) (res []int32) { - on := len(o) - for _, x := range p { - n := sort.Search(on, func(i int) bool { return o[i] >= x }) - if n < on && o[n] == x { - continue - } - res = append(res, x) - } - return -} - -// -------------------------------------------------------------------- - -type loopTomb struct { - c chan none - o sync.Once - w sync.WaitGroup -} - -func newLoopTomb() *loopTomb { - return &loopTomb{c: make(chan none)} -} - -func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) } -func (t *loopTomb) Close() { t.stop(); t.w.Wait() } - -func (t *loopTomb) Dying() <-chan none { return t.c } -func (t *loopTomb) Go(f func(<-chan none)) { - t.w.Add(1) - - go func() { - defer t.stop() - defer t.w.Done() - - f(t.c) - }() -}