• Stars
    star
    2,560
  • Rank 17,183 (Top 0.4 %)
  • Language
    C#
  • License
    Apache License 2.0
  • Created over 7 years ago
  • Updated 11 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Confluent's Apache Kafka .NET client

Confluent's .NET Client for Apache KafkaTM

Build status Chat on Slack

confluent-kafka-dotnet is Confluent's .NET client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-dotnet is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-go).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

confluent-kafka-dotnet is derived from Andreas Heider's rdkafka-dotnet. We're fans of his work and were very happy to have been able to leverage rdkafka-dotnet as the basis of this client. Thanks Andreas!

Referencing

confluent-kafka-dotnet is distributed via NuGet. We provide five packages:

  • Confluent.Kafka [net462, netstandard1.3, netstandard2.0] - The core client library.
  • Confluent.SchemaRegistry.Serdes.Avro [netstandard2.0] - Provides a serializer and deserializer for working with Avro serialized data with Confluent Schema Registry integration.
  • Confluent.SchemaRegistry.Serdes.Protobuf [netstandard2.0] - Provides a serializer and deserializer for working with Protobuf serialized data with Confluent Schema Registry integration.
  • Confluent.SchemaRegistry.Serdes.Json [netstandard2.0] - Provides a serializer and deserializer for working with Json serialized data with Confluent Schema Registry integration.
  • Confluent.SchemaRegistry [netstandard1.4, netstandard2.0] - Confluent Schema Registry client (a dependency of the Confluent.SchemaRegistry.Serdes packages).

To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:

Install-Package Confluent.Kafka -Version 2.3.0

To add a reference to a dotnet core project, execute the following at the command line:

dotnet add package -v 2.3.0 Confluent.Kafka

Note: Confluent.Kafka depends on the librdkafka.redist package which provides a number of different builds of librdkafka that are compatible with common platforms. If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference librdkafka.redist). If you are on a different platform, you may need to build librdkafka manually (or acquire it via other means) and load it using the Library.Load method.

Branch builds

Nuget packages corresponding to all commits to release branches are available from the following nuget package source (Note: this is not a web URL - you should specify it in the nuget package manager): https://ci.appveyor.com/nuget/confluent-kafka-dotnet. The version suffix of these nuget packages matches the appveyor build number. You can see which commit a particular build number corresponds to by looking at the AppVeyor build history

Usage

For a step-by-step guide and code samples, see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also take the free self-paced training course Apache Kafka for .NET Developers on Confluent Developer.

Take a look in the examples directory and at the integration tests for further examples.

For an overview of configuration properties, refer to the librdkafka documentation.

Basic Producer Examples

You should use the ProduceAsync method if you would like to wait for the result of your produce requests before proceeding. You might typically want to do this in highly concurrent scenarios, for example in the context of handling web requests. Behind the scenes, the client will manage optimizing communication with the Kafka brokers for you, batching requests as appropriate.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}

Note that a server round-trip is slow (3ms at a minimum; actual latency depends on many factors). In highly concurrent scenarios you will achieve high overall throughput out of the producer using the above approach, but there will be a delay on each await call. In stream processing applications, where you would like to process many messages in rapid succession, you would typically use the Produce method instead:

using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i = 0; i < 100; ++i)
            {
                p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
            }

            // wait for up to 10 seconds for any inflight messages to be delivered.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

Basic Consumer Example

using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'my-topic' the first time you run the program.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("my-topic");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                // Prevent the process from terminating.
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

IHostedService and Web Application Integration

The Web example demonstrates how to integrate Apache Kafka with a web application, including how to implement IHostedService to realize a long running consumer poll loop, how to register a producer as a singleton service, and how to bind configuration from an injected IConfiguration instance.

Exactly Once Processing

The .NET Client has full support for transactions and idempotent message production, allowing you to write horizontally scalable stream processing applications with exactly once semantics. The ExactlyOnce example demonstrates this capability by way of an implementation of the classic "word count" problem, also demonstrating how to use the FASTER Key/Value store (similar to RocksDb) to materialize working state that may be larger than available memory, and incremental rebalancing to avoid stop-the-world rebalancing operations and unnecessary reloading of state when you add or remove processing nodes.

Schema Registry Integration

The three "Serdes" packages provide serializers and deserializers for Avro, Protobuf and JSON with Confluent Schema Registry integration. The Confluent.SchemaRegistry nuget package provides a client for interfacing with Schema Registry's REST API.

Note: All three serialization formats are supported across Confluent Platform. They each make different tradeoffs, and you should use the one that best matches to your requirements. Avro is well suited to the streaming data use-case, but the quality and maturity of the non-Java implementations lags that of Java - this is an important consideration. Protobuf and JSON both have great support in .NET.

Error Handling

Errors delivered to a client's error handler should be considered informational except when the IsFatal flag is set to true, indicating that the client is in an un-recoverable state. Currently, this can only happen on the producer, and only when enable.idempotence has been set to true. In all other scenarios, clients will attempt to recover from all errors automatically.

Although calling most methods on the clients will result in a fatal error if the client is in an un-recoverable state, you should generally only need to explicitly check for fatal errors in your error handler, and handle this scenario there.

Producer

When using Produce, to determine whether a particular message has been successfully delivered to a cluster, check the Error field of the DeliveryReport during the delivery handler callback.

When using ProduceAsync, any delivery result other than NoError will cause the returned Task to be in the faulted state, with the Task.Exception field set to a ProduceException containing information about the message and error via the DeliveryResult and Error fields. Note: if you await the call, this means a ProduceException will be thrown.

Consumer

All Consume errors will result in a ConsumeException with further information about the error and context available via the Error and ConsumeResult fields.

3rd Party

There are numerous libraries that expand on the capabilities provided by Confluent.Kafka, or use Confluent.Kafka to integrate with Kafka. For more information, refer to the 3rd Party Libraries page.

Confluent Cloud

For a step-by-step guide on using the .NET client with Confluent Cloud see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also refer to the Confluent Cloud example which demonstrates how to configure the .NET client for use with Confluent Cloud.

Developer Notes

Instructions on building and testing confluent-kafka-dotnet can be found here.

Copyright (c) 2016-2019 Confluent Inc. 2015-2016 Andreas Heider

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-dotnet. confluent-kafka-dotnet has no affiliation with and is not endorsed by The Apache Software Foundation.

More Repositories

1

librdkafka

The Apache Kafka C/C++ library
C
7,234
star
2

ksql

The database purpose-built for stream processing applications.
Java
5,533
star
3

confluent-kafka-go

Confluent's Apache Kafka Golang client
Go
4,402
star
4

confluent-kafka-python

Confluent's Kafka Python Client
Python
3,388
star
5

kafka-streams-examples

Demo applications and code examples for Apache Kafka's Streams API.
Java
2,169
star
6

kafka-rest

Confluent REST Proxy for Kafka
Java
2,137
star
7

schema-registry

Confluent Schema Registry for Kafka
Java
2,022
star
8

examples

Apache Kafka and Confluent Platform examples and demos
Shell
1,848
star
9

bottledwater-pg

Change data capture from PostgreSQL into Kafka
C
1,521
star
10

demo-scene

👾Scripts and samples to support Confluent Demos and Talks. ⚠️Might be rough around the edges ;-) 👉For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/
Shell
1,356
star
11

cp-docker-images

[DEPRECATED] Docker images for Confluent Platform.
Python
1,140
star
12

kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Java
953
star
13

cp-all-in-one

docker-compose.yml files for cp-all-in-one , cp-all-in-one-community, cp-all-in-one-cloud, Apache Kafka Confluent Platform
Python
882
star
14

cp-helm-charts

The Confluent Platform Helm charts enable you to deploy Confluent Platform services on Kubernetes for development, test, and proof of concept environments.
Mustache
764
star
15

kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector
Java
715
star
16

parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
Java
612
star
17

cp-demo

Confluent Platform Demo including Apache Kafka, ksqlDB, Control Center, Schema Registry, Security, Schema Linking, and Cluster Linking
Shell
527
star
18

cp-ansible

Ansible playbooks for the Confluent Platform
Jinja
469
star
19

kafka-connect-hdfs

Kafka Connect HDFS connector
Java
465
star
20

kafka-tutorials

Tutorials and Recipes for Apache Kafka
Java
296
star
21

kafka-images

Confluent Docker images for Apache Kafka
Python
295
star
22

ducktape

System integration and performance tests
Python
294
star
23

kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)
Java
255
star
24

jmx-monitoring-stacks

📊 Monitoring examples for Confluent Cloud and Confluent Platform
Shell
236
star
25

confluent-kubernetes-examples

Example scenario workflows for Confluent for Kubernetes
Shell
147
star
26

kafka-rest-node

Node.js client for the Kafka REST proxy
JavaScript
146
star
27

confluent-platform-security-tools

Security tools for the Confluent Platform.
Shell
146
star
28

kafka-connect-datagen

Connector that generates data for demos
Java
143
star
29

docker-images

DEPRECATED - Dockerfiles for Confluent Stream Data Platform
Shell
116
star
30

rest-utils

Utilities and a small framework for building REST services with Jersey, Jackson, and Jetty.
Java
111
star
31

terraform-provider-confluent

Terraform Provider for Confluent
Go
105
star
32

cli

CLI for Confluent Cloud and Confluent Platform
Go
103
star
33

camus

Mirror of Linkedin's Camus
Java
88
star
34

openmessaging-benchmark

Java
87
star
35

confluent-sigma

JavaScript
87
star
36

common

Common utilities library containing metrics, config and utils
Java
85
star
37

streaming-ops

Simulated production environment running Kubernetes targeting Apache Kafka and Confluent components on Confluent Cloud. Managed by declarative infrastructure and GitOps.
Shell
83
star
38

learn-kafka-courses

Learn the basics of Apache Kafka® from leaders in the Kafka community with these video courses covering the Kafka ecosystem and hands-on exercises.
Shell
76
star
39

kafka-workshop

JavaScript
75
star
40

training-developer-src

Source Code accompanying the Confluent Kafka for Developers course
Java
70
star
41

kafka-connect-storage-common

Shared software among connectors that target distributed filesystems and cloud storage.
Java
69
star
42

ccloud-tools

Running Tools from Confluent Platform along with your Confluent Cloud™ Cluster
HCL
67
star
43

bincover

Easily measure code coverage of Golang binaries
Go
62
star
44

libserdes

Avro Serialization/Deserialization C/C++ library with Confluent schema-registry support
C
62
star
45

kafka-connect-blog

Demo for Kafka Connect with JDBC and HDFS Connectors
Shell
59
star
46

confluent-cli

Confluent Platform CLI
Shell
58
star
47

ksqldb-graphql

Node.js GraphQL integration for ksqlDB
TypeScript
56
star
48

data-mesh-demo

A Data Mesh proof-of-concept built on Confluent Cloud
Java
55
star
49

commercial-workshops

Confluent Commercial SE Team's Demo and Workshop Repository
Python
51
star
50

terraform-provider-confluentcloud

Confluent Cloud Terraform Provider is deprecated in favor of Confluent Terraform Provider
Go
51
star
51

confluent-hybrid-cloud-workshop

Confluent Hybrid Cloud Workshop
HCL
42
star
52

qcon-microservices

Example online orders app composed of event-driven microservices. Built for QCon workshop.
Java
38
star
53

securing-kafka-blog

Secure Kafka cluster (in a VM) for development and testing
Puppet
38
star
54

operator-earlyaccess

Confluent Operator Early Access docs
37
star
55

training-administration-src

Contains docker-compose file needed for Apache Kafka Administration by Confluent training
HTML
36
star
56

mox

A hybrid mock and proxy server - easily programmable and runs on express
JavaScript
35
star
57

terraform-state-s3

Terraform module to create the S3/DynamoDB backend to store the Terraform state+lock
HCL
34
star
58

common-docker

Confluent Commons with support for building and testing Docker images.
Java
34
star
59

ksql-recipes-try-it-at-home

Files needed to try out KSQL Recipes for yourself
Shell
34
star
60

demo-database-modernization

This demo shows how to stream data to cloud databases with Confluent. It includes fully-managed connectors (Oracle CDC, RabbitMQ, MongoDB Atlas), ksqlDB/Flink SQL as stream processing engine.
HCL
32
star
61

ccloud-connectivity

Setup and testing connectivity to Confluent Cloud
Shell
31
star
62

training-ksql-and-streams-src

Sample solutions for the exercises of the course KSQL & Kafka Streams
Java
30
star
63

pmm

Java
30
star
64

DotNetStreamProcessing

This repository explores stream processing with the Confluent .NET Producer, Consumer and the TPL library
C#
30
star
65

schema-registry-images

Docker Images for Schema Registry
Python
29
star
66

confluent-docker-utils

Common Python utils for testing Confluent's Docker images
Python
28
star
67

ksql-images

KSQL platform docker images
Shell
27
star
68

live-labs

This repository contains demos created by Technical Marketing team and delivered as part of Confluent Live Labs program. The team members will continuously publish more demos over time, meanwhile we encourage you to create a Confluent Cloud account and try the existing content.
HTML
26
star
69

proto-go-setter

Go
23
star
70

online-inferencing-blog-application

Source code and application accompanying the online inferencing blog
Java
21
star
71

coding-in-motion

Source code for the "Coding in Motion" series.
Nix
21
star
72

stream-me-up-scotty

A wide range of Digital Assets from Confluent's Solution Engineering team for Confluent Cloud
20
star
73

training-fundamentals-src

Source code accompanying the course "Apache Kafka Technical Essentials"
Shell
19
star
74

infoq-kafka-ksql

Code samples to go with InfoQ article
Shell
17
star
75

kafka-rest-images

Docker Images for Kafka REST
Python
17
star
76

kafka-mqtt-images

Confluent Docker images for Kafka MQTT
Shell
16
star
77

training-cao-src

Source code accompanying the course "Monitoring, Troubleshooting and Tuning"
Java
13
star
78

demo-realtime-data-warehousing

Streaming data pipelines for real-time data warehousing. Includes fully managed connectors (PostgreSQL CDC, Snowflake).
HCL
13
star
79

event-streaming-patterns

A collection of Event Streaming Patterns, including problem statements, solutions, and implementation examples.
HTML
12
star
80

kibosh

C
12
star
81

ksql-workshop

KSQL Workshop
11
star
82

control-center-images

Docker images for enterprise control center images
Python
11
star
83

learn-building-flink-applications-in-java-exercises

Java
11
star
84

kafka-connect-http-demo

A demo target for running the Confluent HTTP sink connector
Java
11
star
85

castle

Castle is a test harness for Apache Kafka, Trogdor, and related projects.
Java
11
star
86

kafkacat-images

Docker Images for Kafkacat
10
star
87

ksqldb-recipes

Makefile
10
star
88

confluent-kafka-go-dev

[EXPERIMENTAL] Development / WIP / exploratory / test fork of confluent-kafka-go
Go
10
star
89

apac-workshops

Pull Requests for GitHub repository settings
Jupyter Notebook
10
star
90

cfk-workshop

Java
10
star
91

csid-secrets-providers

Enables use of external third-party systems for storing/retrieving key/value pairs with Confluent clusters.
Java
10
star
92

demo-change-data-capture

This demo shows how to capture data changes from relational databases (Oracle and PostgreSQL) and stream them to Confluent Cloud, use ksqlDB for real-time stream processing, send enriched data to cloud data warehouses (Snowflake and Amazon Redshift).
HCL
10
star
93

learn-apache-flink-101-exercises

Dockerfile
9
star
94

demo-stream-designer

Current 2022 Confluent Keynote Demo covering Stream Designer, Stream Catalog, and Stream Sharing.
Python
9
star
95

learn-kafka-kraft

KRaft mode playground
Shell
9
star
96

policy-library-confluent-terraform

HCL
9
star
97

hackathons

Contains skeleton projects for hackathons.
Python
8
star
98

ksql-elasticsearch-demo

TSQL
8
star
99

demo-application-modernization

Application modernization example including Confluent Cloud, ksqlDB, Postgres, and Elasticsearch.
JavaScript
8
star
100

strata-tutorials

Content for Spring 2016 Strata tutorials
Java
7
star