• This repository has been archived on 08/Jan/2020
  • Stars: 1,008
  • Rank 45,589 (Top 0.9 %)
  • Language: Go
  • License: MIT License
  • Created almost 10 years ago
  • Updated almost 5 years ago

Repository Details

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]

Sarama Cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

DEPRECATION NOTICE

Please note that since IBM/sarama#1099 was merged and released (>= v1.19.0) this library is officially deprecated. The native implementation supports a variety of use cases that are not available through this library.

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode, messages (and errors) from multiple topics and partitions are all passed to a single channel:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // channel closed, nothing more to consume
			}
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			consumer.MarkOffset(msg, "") // mark message as processed
		case <-signals:
			return
		}
	}
}

Users who require access to individual partitions can use the partitioned mode, which exposes partition-level consumers:

package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "") // mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand new consumer that has never committed any offsets to Kafka will only receive messages written after it starts consuming.

To receive all messages from the start of the topic when a consumer has no offsets committed to Kafka, set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.
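
The effect of the two settings can be illustrated with a small standalone model. This is only a sketch of the behaviour described above, not sarama's implementation: the `partition` type and the `startOffset` function are hypothetical names made up for the illustration, and a negative `committed` value is assumed to mean "no offset committed yet". The two constants mirror the values of sarama.OffsetNewest and sarama.OffsetOldest.

```go
package main

import "fmt"

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// partition is a hypothetical stand-in for the state of one partition.
type partition struct {
	oldest int64 // offset of the oldest retained message
	next   int64 // offset the next produced message will receive
}

// startOffset models where a consumer begins reading.
// A negative committed value means no offset has been committed yet.
func startOffset(p partition, committed, initial int64) int64 {
	if committed >= 0 {
		return committed // a committed offset always takes precedence
	}
	if initial == offsetOldest {
		return p.oldest // replay everything still retained
	}
	return p.next // only messages produced from now on
}

func main() {
	p := partition{oldest: 0, next: 42}

	fmt.Println(startOffset(p, -1, offsetNewest)) // new consumer, default setting
	fmt.Println(startOffset(p, -1, offsetOldest)) // new consumer, OffsetOldest
	fmt.Println(startOffset(p, 17, offsetOldest)) // existing consumer resumes
}
```

In this model, a new consumer with the default setting starts at the next offset to be written and therefore sees none of the 42 existing messages, while the same consumer with the oldest setting starts at offset 0. Once an offset has been committed, the initial setting no longer matters.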
