• This repository has been archived on 09/Apr/2022
• Stars: 110
• Rank: 316,770 (Top 7%)
• Language: C#
• License: Apache License 2.0
• Created: over 8 years ago
• Updated: over 2 years ago


Repository Details

A C# Kafka driver

UPDATE 2022: project no longer maintained

kafka-sharp was built at a time when no available .NET Kafka driver matched our performance requirements. As the Kafka protocol evolved significantly, the driver became costly to maintain. In the meantime, the team behind Kafka developed their own driver, and version 1.0 was released in 2019. New use cases also arose at Criteo that this software was not designed for. For these reasons, Criteo migrated to confluent-kafka-dotnet and this project is no longer maintained. Users should consider moving to the official driver.

"High Performance" .NET Kafka Driver

A .NET implementation of the Apache Kafka client side protocol geared toward performance (both throughput and memory wise). It is especially suited for scenarios where applications are streaming a large number of messages across a fair number of topics.

Features

  • Fully asynchronous batched Producer
  • Simple asynchronous event based Consumer
  • Consumer groups support
  • No Zookeeper dependency
  • Compression support (both Gzip and Snappy)
  • High configurability
  • Memory friendly APIs and internals

Cluster

The ClusterClient class is the main entry point for accessing a Kafka cluster. It provides an untyped API for Produce and Consume operations.

	var cluster = new ClusterClient(new Configuration { Seeds = "broker.local:9091" }, new SomeLogger());
	cluster.Produce("some_topic", someValue);

	cluster.MessageReceived += kafkaRecord => { /* do something */ };
	cluster.ConsumeFromLatest("some_topic", somePartition);
	// OR (for consumer group usage)
	cluster.Subscribe("some group", new[] { "topic", "some_other_topic" }, new ConsumerGroupConfiguration { AutoCommitEveryMs = 5000 });

Producer

The producer API is accessed either through the ClusterClient class or, if you prefer a typed interface, through the KafkaProducer class. Configuration options allow:

  • setting the batch size and time window for Produce requests
  • setting a strategy for error management (either discard messages or retry)
  • setting the maximum number of pending messages allowed in the system (can be uncapped)
  • setting a TTL on messages: messages are kept in the driver for up to the TTL when Produce requests cannot be sent to the Kafka brokers.
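As a sketch, the options above map to fields documented in the Configuration section (ClusterClient and SomeLogger are reused from the example above; someValue is a placeholder):

```csharp
// Sketch: tuning the documented producer options before creating the client.
var configuration = new Configuration
{
    Seeds = "broker.local:9091",
    ErrorStrategy = ErrorStrategy.Retry,                   // retry instead of discarding on errors
    MessageTtl = TimeSpan.FromMinutes(2),                  // keep undelivered messages up to 2 minutes
    ProduceBatchSize = 500,                                // batch up to 500 messages per request
    ProduceBufferingTime = TimeSpan.FromMilliseconds(100), // or flush the batch every 100 ms
    MaxBufferedMessages = 100000                           // cap pending messages (uncapped by default)
};
var cluster = new ClusterClient(configuration, new SomeLogger());
cluster.Produce("some_topic", someValue);
```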

Partitioning

By default, messages are round-robined across all available partitions. The NumberOfMessagesBeforeRoundRobin option lets you tweak the round-robin scheme to pack more messages into a partition before switching to another. This is especially useful when compressing data.

If you need custom partitioning, you can send messages directly to a given partition.
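For illustration only: assuming a Produce overload that accepts a partition id (the exact signature is a guess, and someKey, someValue and numberOfPartitions are hypothetical placeholders), custom partitioning could look like:

```csharp
// Hypothetical: derive a partition from the key and target it explicitly.
// The Produce overload taking a partition id is an assumption, not a documented signature.
int partition = Math.Abs(someKey.GetHashCode()) % numberOfPartitions;
cluster.Produce("some_topic", someKey, someValue, partition);
```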

Serialization

Serialization of messages can be handled two ways:

  • you can provide a serializer for each topic, as well as a default serializer for all topics.
  • if your message keys or values implement the provided IMemorySerializable interface, that will be used to serialize data.
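For illustration, a value type implementing the provided IMemorySerializable interface might look like the following. The interface's exact member is an assumption here; we guess a single serialize-to-stream method:

```csharp
// Hypothetical shape: the exact IMemorySerializable contract may differ.
public class UserEvent : IMemorySerializable
{
    public string UserId;

    // Assumed member: write the wire representation into the provided stream.
    public void Serialize(MemoryStream toStream)
    {
        var bytes = Encoding.UTF8.GetBytes(UserId);
        toStream.Write(bytes, 0, bytes.Length);
    }
}
```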

You can also choose to serialize messages either as soon as they enter the producer or at send time. In the first case no reference is held on the original data (key and value), which may be more memory efficient; however, if an error ends in discarding or expiring messages, the producer cannot provide the original message when signaling the error. In the latter case, references are held and the original data is returned on error. This means the original data survives for the duration of a batching cycle, so more generation 2 garbage collection is to be expected; however, if the original data implements IDisposable, Dispose will be called in due time, which can help implement a pooling mechanism.

Compression

Both gzip and snappy compression are supported. This is controlled by a parameter of the ClusterClient configuration. Snappy makes use of the native Google library.

Error handling

Most errors are handled automatically by the driver. "Irrecoverable" errors that end up discarding messages are signaled through events.

Consumer

You can consume messages either as a member of a consumer group or as a standalone simple consumer. In simple mode you can discover topics and partitions and choose which ones to consume. In consumer group mode you simply start a subscription and let the protocol handle partition assignment.

Messages are received as a stream of events. The API is accessed either through ClusterClient (untyped API) or KafkaConsumer (typed API). When using a consumer group you can manage offsets by specifying an autocommit period, by requesting commits explicitly, or by mixing both. In simple mode you can directly specify which offsets to start consuming from (including the latest and earliest offsets). Pause and Resume methods are available in both modes to stop and resume consuming from a topic.
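Putting the pieces named above together (Subscribe, MessageReceived, Pause and Resume; the Pause/Resume signatures taking a topic name are an assumption), a consumer-group session might look like:

```csharp
var cluster = new ClusterClient(new Configuration { Seeds = "broker.local:9091" }, new SomeLogger());

// Messages arrive as a stream of events.
cluster.MessageReceived += kafkaRecord => { /* handle the record */ };

// Join a consumer group; offsets are autocommitted every 5 s.
cluster.Subscribe("some_group", new[] { "some_topic" },
    new ConsumerGroupConfiguration { AutoCommitEveryMs = 5000 });

// Temporarily stop, then later resume, consuming from a topic.
cluster.Pause("some_topic");
cluster.Resume("some_topic");
```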

Deserialization

You can provide a deserializer for each topic so that messages are returned as objects instead of byte arrays.

Configuration

The client is configured via the Configuration class, by setting the following properties:

    /// <summary>
    /// Kafka version compatibility mode.
    /// 0.8.2 compatibility will work with any kafka version >= 0.8.2.
    /// However consumer group support will require brokers >= 0.9 to actually work in this mode.
    /// 0.10.1 compatibility will work with kafka >= 0.10.1.
    /// </summary>
    public Compatibility Compatibility = Compatibility.V0_8_2;

    /// <summary>
    /// Maximum amount a message can stay alive before being discarded in case of repeated errors.
    /// </summary>
    public TimeSpan MessageTtl = TimeSpan.FromMinutes(1);

    /// <summary>
    /// Period between each metadata autorefresh.
    /// </summary>
    public TimeSpan RefreshMetadataInterval = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Strategy in case of network errors.
    /// </summary>
    public ErrorStrategy ErrorStrategy = ErrorStrategy.Discard;

    /// <summary>
    /// Time slice for batching messages. We wait at most that much time before
    /// processing a batch of messages.
    /// </summary>
    public TimeSpan ProduceBufferingTime = TimeSpan.FromMilliseconds(5000);

    /// <summary>
    /// Maximum size of message batches.
    /// </summary>
    public int ProduceBatchSize = 200;

    /// <summary>
    /// Strategy for batching (per node, or global).
    /// </summary>
    public BatchStrategy BatchStrategy = BatchStrategy.ByNode;

    /// <summary>
    /// The compression codec used to compress messages to this cluster.
    /// Lz4 is only supported with compatibility set to 0.10.1+.
    /// </summary>
    public CompressionCodec CompressionCodec = CompressionCodec.None;

    /// <summary>
    /// If you don't provide a partition when producing a message, the partition selector will
    /// round robin between all available partitions. Use this variable to delay switching between
    /// partitions until a set number of messages has been sent (on a given topic).
    /// This is useful if you have a large number of partitions per topic and want to fully take
    /// advantage of compression (because message sets are compressed per topic/per partition).
    /// </summary>
    public int NumberOfMessagesBeforeRoundRobin = 1;

    /// <summary>
    /// Socket buffer size for send.
    /// </summary>
    public int SendBufferSize = 100 * 1024;

    /// <summary>
    /// Socket buffer size for receive.
    /// </summary>
    public int ReceiveBufferSize = 64 * 1024;

    /// <summary>
    /// Acknowledgements required.
    /// </summary>
    public RequiredAcks RequiredAcks = RequiredAcks.AllInSyncReplicas;

    /// <summary>
    /// Minimum in sync replicas required to consider a partition as alive.
    /// &lt;= 0 means only the leader is required.
    /// </summary>
    public int MinInSyncReplicas = -1;

    /// <summary>
    /// If you use MinInSyncReplicas and a broker replies with the NotEnoughReplicasAfterAppend error,
    /// we will resend the messages if this value is true.
    /// </summary>
    public bool RetryIfNotEnoughReplicasAfterAppend = false;

    /// <summary>
    /// Kafka server side timeout for requests.
    /// </summary>
    public int RequestTimeoutMs = 10000;

    /// <summary>
    /// Client side timeout: if a request is not acknowledged
    /// by this time, the connection will be reset. Obviously
    /// it should be greater than RequestTimeoutMs.
    /// </summary>
    public int ClientRequestTimeoutMs = 20000;

    /// <summary>
    /// The maximum number of unacknowledged requests the client will
    /// send on a single connection before async waiting for the connection.
    /// Note that if this setting is set to be greater than 1 and there are
    /// failed sends, there is a risk of message re-ordering due to retries.
    /// </summary>
    public int MaxInFlightRequests = 5;

    /// <summary>
    /// Your client name.
    /// </summary>
    public string ClientId = "Kafka#";

    /// <summary>
    /// Brokers to connect to, to bootstrap the cluster and discover the topology.
    /// </summary>
    public string Seeds = "";

    /// <summary>
    /// A TaskScheduler to use for all driver internal work.
    /// Useful if you want to limit the resources taken by the driver.
    /// By default we use the default scheduler (which maps to the .NET thread pool),
    /// which may induce "busy neighbours" problems in case of high overload.
    ///
    /// If this is not TaskScheduler.Default, this will always supersede
    /// the MaximumConcurrency variable.
    /// </summary>
    public TaskScheduler TaskScheduler = TaskScheduler.Default;

    /// <summary>
    /// Maximum concurrency used inside the driver. This is only taken into
    /// account if TaskScheduler is set to its default value. This is
    /// implemented using a special custom TaskScheduler which makes use of the
    /// .NET threadpool without squatting threads when there's nothing to do.
    /// </summary>
    public int MaximumConcurrency = 3;

    /// <summary>
    /// Maximum number of messages in the system before blocking/discarding send from clients.
    /// By default we never block and the number is unbounded.
    /// </summary>
    public int MaxBufferedMessages = -1;

    /// <summary>
    /// The strategy to use when the maximum number of pending produce messages
    /// has been reached. By default we block.
    /// </summary>
    public OverflowStrategy OverflowStrategy = OverflowStrategy.Block;

    /// <summary>
    /// The maximum amount of time in ms brokers will block before answering fetch requests
    /// if there isn't sufficient data to immediately satisfy FetchMinBytes
    /// </summary>
    public int FetchMaxWaitTime = 100;

    /// <summary>
    /// The minimum amount of data brokers should return for a fetch request.
    /// If insufficient data is available the request will wait for that much
    /// data to accumulate before answering the request.
    /// </summary>
    public int FetchMinBytes = 1;

    /// <summary>
    /// The number of bytes of messages to attempt to fetch for each topic-partition
    /// in each fetch request. These bytes will be read into memory for each partition,
    /// so this helps control the memory used by the consumer. The fetch request size
    /// must be at least as large as the maximum message size the server allows or else
    /// it is possible for producers to send messages larger than the consumer can fetch.
    /// </summary>
    public int FetchMessageMaxBytes = 1024 * 1024;

    /// <summary>
    /// Time slice for batching messages used when consuming (Offset and Fetch).
    /// We wait at most that much time before processing a batch of messages.
    /// </summary>
    public TimeSpan ConsumeBufferingTime = TimeSpan.FromMilliseconds(1000);

    /// <summary>
    /// Maximum size of consume message batches (Offset and Fetch). Keep it small.
    /// </summary>
    public int ConsumeBatchSize = 10;

    /// <summary>
    /// Serialization configuration options and (de)serializers.
    /// </summary>
    public SerializationConfig SerializationConfig = new SerializationConfig();
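For instance, a configuration trading some latency for lower consumer-side memory could combine the fields listed above (the comma-separated multi-seed format and the Compatibility.V0_10_1 enum value are assumptions; only a single seed and the V0_8_2 default appear in the listing):

```csharp
var configuration = new Configuration
{
    Seeds = "broker1.local:9091,broker2.local:9091",       // assumed comma-separated seed list
    Compatibility = Compatibility.V0_10_1,                 // assumed name; 0.10.1+ is required for Lz4
    CompressionCodec = CompressionCodec.Lz4,
    FetchMessageMaxBytes = 256 * 1024,                     // cap per-partition fetch memory
    ConsumeBufferingTime = TimeSpan.FromMilliseconds(100), // process consume batches more often
    ConsumeBatchSize = 10                                  // keep consume batches small
};
```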

Technical details

Internals

The driver makes heavy use of the TPL Dataflow library for its actor system. Rx.NET streams are provided for all events, alongside plain .NET events.

Network layer

The driver makes direct use of the .NET asynchronous Socket API (the one based on SocketAsyncEventArgs).

Memory management

Pretty much everything is pooled in the driver and message buffers are never pinned, so the driver's pressure on the GC should stay low.

When producing messages you can control the maximum number of pending messages via the MaxBufferedMessages configuration option. When the limit is reached, Send operations either block or discard messages, according to the OverflowStrategy configuration option.

You can also decide if messages are preserialized upon calling Produce methods or kept until an entire batch is serialized. In the latter case if your message keys/values implement IDisposable they will be disposed when the messages have effectively been sent (you may take advantage of this to implement a pooling mechanism for keys/values).

When consuming messages, the FetchMessageMaxBytes configuration option helps limit the amount of data retrieved by consumer operations. Also, the MessageReceived event is always emitted from one thread at a time, and the consumer performs no further operation until the subscribers return; you can take advantage of this to throttle the consumer.

Threading

Threading is controlled by the MaximumConcurrency configuration parameter. Alternatively, you can provide your own TaskScheduler via the TaskScheduler configuration option. In any case the driver never squats threads: all IO operations are non-blocking and the driver never performs blocking waits, so any concurrency setting is only a hint on how many threads the driver may use at a given time. If you don't provide your own TaskScheduler, threads are picked from the .NET thread pool. By default the driver uses at most 3 threads from the thread pool.
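As a sketch, either knob is set on the Configuration; a custom scheduler supersedes MaximumConcurrency. ConcurrentExclusiveSchedulerPair is a standard TPL type that limits concurrency without dedicating threads:

```csharp
// Option 1: let the driver use at most 2 thread-pool threads.
var config = new Configuration { Seeds = "broker.local:9091", MaximumConcurrency = 2 };

// Option 2: supply your own scheduler (supersedes MaximumConcurrency).
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
var config2 = new Configuration
{
    Seeds = "broker.local:9091",
    TaskScheduler = pair.ConcurrentScheduler
};
```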

Acknowledgements
