• Stars
    star
    4,402
  • Rank 9,724 (Top 0.2 %)
  • Language
    Go
  • License
    Apache License 2.0
  • Created over 8 years ago
  • Updated 8 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provides a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

For a step-by-step guide on using the client see Getting Started with Apache Kafka and Golang.

Examples

High-level balanced consumer

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Producer

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.

Getting Started

Supports Go 1.17+ and librdkafka 2.4.0+.

Using Go Modules

You can use Go Modules to install confluent-kafka-go.

Import the kafka package from GitHub in your code:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

Build your project:

go build ./...

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./...

A dependency to the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.

Install the client

Manual install:

go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka

Golang import:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • Mac OSX x64 and arm64
  • glibc-based Linux x64 and arm64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
  • musl-based Linux amd64 and arm64 (Alpine) - without GSSAPI/Kerberos support
  • Windows amd64 - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
  • For Redhat based distros, install librdkafka-devel using Confluent's YUM repository.
  • For MacOS X, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine: apk add librdkafka-dev pkgconf
  • For Windows: there are no official/supported packages, but static builds are included for Windows/x64. Installing from source is needed only for GSSAPI/Kerberos support.
  • For source builds, see instructions below.

Build from source:

git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires librdkafka v1.9.0 or later.

Static builds on Linux

Since we are using cgo, Go builds a dynamically linked library even when using the prebuilt, statically-compiled librdkafka as described in the librdkafka chapter.

For glibc based systems, if the system where the client is being compiled is different from the target system, especially when the target system is older, there is a glibc version error when trying to run the compiled client.

Unfortunately, if we try building a statically linked binary, it doesn't solve the problem, since there is no way to have truly static builds using glibc. This is because there are some functions in glibc, like getaddrinfo which need the shared version of the library even when the code is compiled statically.

One way around this is to either use a container/VM to build the binary, or install an older version of glibc on the system where the client is being compiled.

The other way is using musl to create truly static builds for Linux. To do this, install it for your system.

Static compilation command, meant to be used alongside the prebuilt librdkafka bundle:

CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl

API Strands

The recommended API strand is the Function-Based one, the Channel-Based one is documented in examples/legacy.

Function-Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

It has direct mapping to underlying librdkafka functionality.

See examples/consumer_example

Function-Based Producer

Application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events() or specified private channel.

Warnings

  • Produce() is a non-blocking call, if the internal librdkafka queue is full the call will fail and can be retried.

See examples/producer_example

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

See kafka/README

Contributions to the code, examples, documentation, et.al, are very much appreciated.

Make your changes, run gofmt, tests, etc, push your branch, create a PR, and sign the CLA.

Confluent Cloud

For a step-by-step guide on using the Golang client with Confluent Cloud see Getting Started with Apache Kafka and Golang on Confluent Developer.

More Repositories

1

ksql

The database purpose-built for stream processing applications.
Java
5,533
star
2

confluent-kafka-python

Confluent's Kafka Python Client
Python
3,388
star
3

confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
C#
2,560
star
4

kafka-streams-examples

Demo applications and code examples for Apache Kafka's Streams API.
Java
2,169
star
5

kafka-rest

Confluent REST Proxy for Kafka
Java
2,137
star
6

schema-registry

Confluent Schema Registry for Kafka
Java
2,022
star
7

examples

Apache Kafka and Confluent Platform examples and demos
Shell
1,903
star
8

bottledwater-pg

Change data capture from PostgreSQL into Kafka
C
1,521
star
9

demo-scene

👾Scripts and samples to support Confluent Demos and Talks. ⚠️Might be rough around the edges ;-) 👉For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/
Shell
1,356
star
10

cp-docker-images

[DEPRECATED] Docker images for Confluent Platform.
Python
1,142
star
11

kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Java
953
star
12

cp-all-in-one

docker-compose.yml files for cp-all-in-one , cp-all-in-one-community, cp-all-in-one-cloud, Apache Kafka Confluent Platform
Python
889
star
13

cp-helm-charts

The Confluent Platform Helm charts enable you to deploy Confluent Platform services on Kubernetes for development, test, and proof of concept environments.
Mustache
764
star
14

kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector
Java
715
star
15

parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
Java
654
star
16

kafka-connect-hdfs

Kafka Connect HDFS connector
Java
465
star
17

kafka-tutorials

Tutorials and Recipes for Apache Kafka
Java
302
star
18

kafka-images

Confluent Docker images for Apache Kafka
Python
295
star
19

ducktape

System integration and performance tests
Python
294
star
20

librdkafka

The Apache Kafka C/C++ library
C
180
star
21

kafka-rest-node

Node.js client for the Kafka REST proxy
JavaScript
146
star
22

confluent-platform-security-tools

Security tools for the Confluent Platform.
Shell
146
star
23

kafka-connect-datagen

Connector that generates data for demos
Java
143
star
24

docker-images

DEPRECATED - Dockerfiles for Confluent Stream Data Platform
Shell
116
star
25

rest-utils

Utilities and a small framework for building REST services with Jersey, Jackson, and Jetty.
Java
111
star
26

cli

CLI for Confluent Cloud and Confluent Platform
Go
103
star
27

openmessaging-benchmark

Java
89
star
28

camus

Mirror of Linkedin's Camus
Java
88
star
29

common

Common utilities library containing metrics, config and utils
Java
85
star
30

kafka-workshop

JavaScript
75
star
31

confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
JavaScript
71
star
32

training-developer-src

Source Code accompanying the Confluent Kafka for Developers course
Java
70
star
33

ccloud-tools

Running Tools from Confluent Platform along with your Confluent Cloud™ Cluster
HCL
67
star
34

bincover

Easily measure code coverage of Golang binaries
Go
62
star
35

libserdes

Avro Serialization/Deserialization C/C++ library with Confluent schema-registry support
C
62
star
36

kafka-connect-blog

Demo for Kafka Connect with JDBC and HDFS Connectors
Shell
59
star
37

confluent-cli

Confluent Platform CLI
Shell
58
star
38

ksqldb-graphql

Node.js GraphQL integration for ksqlDB
TypeScript
56
star
39

confluent-sigma

JavaScript
52
star
40

terraform-provider-confluentcloud

Confluent Cloud Terraform Provider is deprecated in favor of Confluent Terraform Provider
Go
52
star
41

jmx-monitoring-stacks

📊 Monitoring examples for Confluent Cloud and Confluent Platform
C#
44
star
42

confluent-kubernetes-examples

Example scenario workflows for Confluent for Kubernetes
Shell
43
star
43

qcon-microservices

Example online orders app composed of event-driven microservices. Built for QCon workshop.
Java
38
star
44

securing-kafka-blog

Secure Kafka cluster (in a VM) for development and testing
Puppet
38
star
45

cp-demo

Confluent Platform Demo including Apache Kafka, ksqlDB, Control Center, Schema Registry, Security, Schema Linking, and Cluster Linking
Shell
36
star
46

training-administration-src

Contains docker-compose file needed for Apache Kafka Administration by Confluent training
HTML
36
star
47

mox

A hybrid mock and proxy server - easily programmable and runs on express
JavaScript
35
star
48

terraform-state-s3

Terraform module to create the S3/DynamoDB backend to store the Terraform state+lock
HCL
34
star
49

common-docker

Confluent Commons with support for building and testing Docker images.
Java
34
star
50

ksql-recipes-try-it-at-home

Files needed to try out KSQL Recipes for yourself
Shell
34
star
51

training-ksql-and-streams-src

Sample solutions for the exercises of the course KSQL & Kafka Streams
Java
30
star
52

schema-registry-images

Docker Images for Schema Registry
Python
29
star
53

terraform-provider-confluent

Terraform Provider for Confluent
Go
29
star
54

confluent-docker-utils

Common Python utils for testing Confluent's Docker images
Python
28
star
55

flink-cookbook

Java
28
star
56

cp-ansible

Ansible playbooks for the Confluent Platform
Jinja
28
star
57

ksql-images

KSQL platform docker images
Shell
27
star
58

coding-in-motion

Source code for the "Coding in Motion" series.
Nix
25
star
59

proto-go-setter

Go
23
star
60

online-inferencing-blog-application

Source code and application accompanying the online inferencing blog
Java
21
star
61

stream-me-up-scotty

A wide range of Digital Assets from Confluent's Solution Engineering team for Confluent Cloud
21
star
62

training-fundamentals-src

Source code accompanying the course "Apache Kafka Technical Essentials"
Shell
19
star
63

infoq-kafka-ksql

Code samples to go with InfoQ article
Shell
17
star
64

kafka-rest-images

Docker Images for Kafka REST
Python
17
star
65

kafka-mqtt-images

Confluent Docker images for Kafka MQTT
Shell
16
star
66

learn-kafka-courses

Learn the basics of Apache Kafka® from leaders in the Kafka community with these video courses covering the Kafka ecosystem and hands-on exercises.
Shell
16
star
67

commercial-workshops

Confluent Commercial SE Team's Demo and Workshop Repository
Python
14
star
68

training-cao-src

Source code accompanying the course "Monitoring, Troubleshooting and Tuning"
Java
13
star
69

ccloud-connectivity

Setup and testing connectivity to Confluent Cloud
Shell
13
star
70

event-streaming-patterns

A collection of Event Streaming Patterns, including problem statements, solutions, and implementation examples.
HTML
13
star
71

vscode

Confluent for Visual Studio Code
TypeScript
12
star
72

ksqldb-recipes

Makefile
12
star
73

ksql-workshop

KSQL Workshop
11
star
74

demo-stream-designer

Current 2022 Confluent Keynote Demo covering Stream Designer, Stream Catalog, and Stream Sharing.
Python
11
star
75

control-center-images

Docker images for enterprise control center images
Python
11
star
76

kafka-connect-http-demo

A demo target for running the Confluent HTTP sink connector
Java
11
star
77

castle

Castle is a test harness for Apache Kafka, Trogdor, and related projects.
Java
11
star
78

demo-change-data-capture

This demo shows how to capture data changes from relational databases (Oracle and PostgreSQL) and stream them to Confluent Cloud, use ksqlDB for real-time stream processing, send enriched data to cloud data warehouses (Snowflake and Amazon Redshift).
HCL
11
star
79

kafkacat-images

Docker Images for Kafkacat
10
star
80

confluent-kafka-go-dev

[EXPERIMENTAL] Development / WIP / exploratory / test fork of confluent-kafka-go
Go
10
star
81

confluent-hybrid-cloud-workshop

Confluent Hybrid Cloud Workshop
HCL
10
star
82

learn-practical-event-modeling

Kotlin
9
star
83

ksql-elasticsearch-demo

TSQL
8
star
84

strata-tutorials

Content for Spring 2016 Strata tutorials
Java
7
star
85

demo-database-modernization

This demo shows how to stream data to cloud databases with Confluent. It includes fully-managed connectors (Oracle CDC, RabbitMQ, MongoDB Atlas), ksqlDB/Flink SQL as stream processing engine.
HCL
7
star
86

flink-table-api-java-examples

Java Examples for running Apache Flink® Table API on Confluent Cloud
Java
6
star
87

confluent-oauth-extensions

Java
6
star
88

kafka-replicator-images

Docker images for Kafka Connect
Shell
6
star
89

etl

Code for ETL data pipelines
Python
6
star
90

operator-earlyaccess

Confluent Operator Early Access docs
6
star
91

schema-registry-workshop

JavaScript
6
star
92

learn-building-flink-applications-in-java-exercises

Java
6
star
93

demo-application-modernization

Application modernization example including Confluent Cloud, ksqlDB, Postgres, and Elasticsearch.
JavaScript
6
star
94

csid-secrets-providers

Enables use of external third-party systems for storing/retrieving key/value pairs with Confluent clusters.
Java
6
star
95

support-metrics-common

Common utilities for metrics collection of proactive support
Java
6
star
96

flink-table-api-python-examples

Python Examples for running Apache Flink® Table API on Confluent Cloud
Python
5
star
97

confluent-kafka-go-example

Example application using the confluent-kafka-go client
Go
5
star
98

learn-kafka-kraft

KRaft mode playground
Shell
5
star
99

ccloud-sdk-go-v2

SDK for interacting with Confluent Cloud
Makefile
5
star
100

streaming-ops

Simulated production environment running Kubernetes targeting Apache Kafka and Confluent components on Confluent Cloud. Managed by declarative infrastructure and GitOps.
Shell
5
star