• This repository has been archived on 02/Oct/2020
  • Language: Go
  • License: MIT License

Repository Details

Go client library for Apache Kafka

Go Kafka Client Library

A high-level Go client library for Apache Kafka that provides the following primitives on top of sarama-cluster:

  • Competing consumer semantics with dead letter queue (DLQ)
    • Ability to process messages across multiple goroutines
    • Ability to Ack or Nack messages out of order (with optional DLQ)
  • Ability to consume from topics spread across different Kafka clusters
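
Acking or nacking out of order implies some bookkeeping: an offset can only be committed safely once every earlier offset has been settled. The sketch below illustrates that idea in isolation. It is not this library's implementation, and `ackTracker`, `newAckTracker`, and `Settle` are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// ackTracker is a hypothetical helper, not part of kafka-client.
// It records offsets that were settled (acked or nacked) in any order and
// tracks the first offset that is still outstanding.
type ackTracker struct {
	next    int64          // lowest offset not yet settled
	settled map[int64]bool // settled offsets at or above next
}

func newAckTracker(start int64) *ackTracker {
	return &ackTracker{next: start, settled: make(map[int64]bool)}
}

// Settle marks offset as done and returns the committable position:
// the first offset that has not been settled yet.
func (t *ackTracker) Settle(offset int64) int64 {
	if offset >= t.next {
		t.settled[offset] = true
	}
	// Advance past every contiguous settled offset.
	for t.settled[t.next] {
		delete(t.settled, t.next)
		t.next++
	}
	return t.next
}

func main() {
	t := newAckTracker(0)
	fmt.Println(t.Settle(2)) // 0: offsets 0 and 1 are still outstanding
	fmt.Println(t.Settle(0)) // 1: offset 1 is still outstanding
	fmt.Println(t.Settle(1)) // 3: offsets 0, 1 and 2 are now contiguous
}
```

The tracker is not safe for concurrent use as written; a real consumer that settles messages from several goroutines would need to guard it with a mutex.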

Stability

This library is in alpha. APIs are subject to change; use at your own risk.

Contributing

If you are interested in contributing, please sign the License Agreement and see our development guide.

Installation

go get -u github.com/uber-go/kafka-client

Quick Start

package main

import (
	"os"
	"os/signal"

	"github.com/uber-go/kafka-client"
	"github.com/uber-go/kafka-client/kafka"
	"github.com/uber-go/tally"
	"go.uber.org/zap"
)

func main() {
	// mapping from cluster name to list of broker IP addresses
	brokers := map[string][]string{
		"sample_cluster":     []string{"127.0.0.1:9092"},
		"sample_dlq_cluster": []string{"127.0.0.1:9092"},
	}
	// mapping from topic name to cluster that has that topic
	topicClusterAssignment := map[string][]string{
		"sample_topic": []string{"sample_cluster"},
	}

	// First, create the kafkaclient; it is the entry point for creating consumers or producers.
	// It takes as input a name resolver that knows how to map topic names to broker IP addresses.
	client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope)

	// Next, setup the consumer config for consuming from a set of topics
	config := &kafka.ConsumerConfig{
		TopicList: kafka.ConsumerTopicList{
			kafka.ConsumerTopic{ // Consumer Topic is a combination of topic + dead-letter-queue
				Topic: kafka.Topic{ // Each topic is a tuple of (name, clusterName)
					Name:    "sample_topic",
					Cluster: "sample_cluster",
				},
				DLQ: kafka.Topic{
					Name:    "sample_consumer_dlq",
					Cluster: "sample_dlq_cluster",
				},
			},
		},
		GroupName:   "sample_consumer",
		Concurrency: 100, // number of goroutines processing messages in parallel
	}

	// Create the consumer through the previously created client
	consumer, err := client.NewConsumer(config)
	if err != nil {
		panic(err)
	}

	// Finally, start consuming
	if err := consumer.Start(); err != nil {
		panic(err)
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // channel closed
			}
			// process is an application-defined handler; it is not part of this library.
			if err := process(msg); err != nil {
				msg.Nack()
			} else {
				msg.Ack()
			}
		case <-sigCh:
			consumer.Stop()
			<-consumer.Closed()
			return // shutdown complete
		}
	}
}
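
The Quick Start loop handles one message at a time. One way an application might spread handling across several goroutines, while still acking or nacking each message individually, is sketched below. It runs without Kafka: `message` is a hypothetical stand-in that models only the methods used here (their error return values are an assumption), and `fakeMessage`, `consume`, and `run` are illustrative names, not library APIs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// message is a hypothetical stand-in for the values received from
// consumer.Messages(); only the methods used in this sketch are modeled.
type message interface {
	Value() []byte
	Ack() error
	Nack() error
}

// fakeMessage counts acks and nacks so the sketch can run without a broker.
type fakeMessage struct {
	value       []byte
	acks, nacks *int64
}

func (m fakeMessage) Value() []byte { return m.value }
func (m fakeMessage) Ack() error { atomic.AddInt64(m.acks, 1); return nil }
func (m fakeMessage) Nack() error { atomic.AddInt64(m.nacks, 1); return nil }

// consume drains msgs with the given number of worker goroutines, acking
// messages that handle accepts and nacking the ones it rejects.
func consume(msgs <-chan message, workers int, handle func(message) error) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range msgs {
				if err := handle(msg); err != nil {
					_ = msg.Nack()
					continue
				}
				_ = msg.Ack()
			}
		}()
	}
	wg.Wait() // returns once msgs is closed and every worker has finished
}

// run feeds n fake messages through consume and returns the ack/nack counts.
func run(n, workers int) (acks, nacks int64) {
	msgs := make(chan message, n)
	for i := 0; i < n; i++ {
		msgs <- fakeMessage{value: []byte{byte(i)}, acks: &acks, nacks: &nacks}
	}
	close(msgs)
	consume(msgs, workers, func(m message) error {
		if m.Value()[0]%2 == 1 {
			return errors.New("odd payload rejected")
		}
		return nil
	})
	return acks, nacks
}

func main() {
	acks, nacks := run(10, 4)
	fmt.Println(acks, nacks) // 5 5
}
```

Because each worker settles its own messages, acks and nacks arrive in whatever order the handlers finish, which is the situation the out-of-order Ack/Nack primitive is meant to support.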
