RSKafka

A minimal Rust client for Apache Kafka

This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.

This crate has:

  • No support for offset tracking, consumer groups, transactions, etc...
  • No built-in buffering, aggregation, linger timeouts, etc...
  • Independent write streams per partition

It will be a good fit for workloads that:

  • Perform offset tracking independently of Kafka
  • Read/Write reasonably sized payloads per-partition
  • Have a low number of high-throughput partitions 1
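As an illustration of the first point, offset tracking can live entirely in application code: the consumer persists the next offset itself and resumes from it on restart. A minimal sketch using the `fetch_records` call from the Usage section below; `load_offset`, `store_offset`, and `process` are hypothetical application helpers, not part of this crate:

```rust
// Sketch: at-least-once consumption with externally tracked offsets.
// `partition_client` is a rskafka PartitionClient as set up in the Usage
// section; `load_offset`/`store_offset` persist the offset somewhere
// durable (e.g. a local file or a database) and are hypothetical.
let mut offset: i64 = load_offset().unwrap_or(0);
loop {
    let (records, _high_watermark) = partition_client
        .fetch_records(offset, 1..1_000_000, 1_000)
        .await
        .unwrap();
    for record_and_offset in records {
        process(&record_and_offset.record); // hypothetical application logic
        offset = record_and_offset.offset + 1;
    }
    // Persisting *after* processing gives at-least-once semantics.
    store_offset(offset);
}
```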

Usage

use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
    )
    .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
    .await
    .unwrap();

For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].
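For example, the batching producer wraps a partition client and aggregates records until either a size limit or a linger timeout is hit, which recovers batching on top of the otherwise unbuffered `produce` call. A sketch based on the crate documentation; exact builder and aggregator names may differ between versions:

```rust
use rskafka::client::producer::{aggregator::RecordAggregator, BatchProducerBuilder};
use std::{sync::Arc, time::Duration};

// `partition_client` and `record` as set up in the Usage section above.
let producer = BatchProducerBuilder::new(Arc::new(partition_client))
    .with_linger(Duration::from_secs(2)) // flush after at most 2 seconds...
    .build(RecordAggregator::new(1024)); // ...or once ~1 KiB is buffered

// Resolves once the batch containing this record has been flushed to Kafka.
producer.produce(record).await.unwrap();
```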

Features

  • compression-gzip (default): Support compression and decompression of messages using gzip.
  • compression-lz4 (default): Support compression and decompression of messages using LZ4.
  • compression-snappy (default): Support compression and decompression of messages using Snappy.
  • compression-zstd (default): Support compression and decompression of messages using zstd.
  • full: Includes all stable features (compression-gzip, compression-lz4, compression-snappy, compression-zstd, transport-socks5, transport-tls).
  • transport-socks5: Allow transport via SOCKS5 proxy.
  • transport-tls: Allows TLS transport via rustls.
  • unstable-fuzzing: Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!
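Features can be trimmed in the usual Cargo way. For example, to keep only LZ4 compression and TLS transport (the version requirement below is a placeholder, not a specific release):

```toml
[dependencies]
rskafka = { version = "*", default-features = false, features = [
    "compression-lz4",
    "transport-tls",
] }
```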

Testing

Redpanda

To run integration tests against Redpanda, run:

$ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

in another session.

Apache Kafka

To run integration tests against Apache Kafka, run:

$ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test

in another session. Note that Apache Kafka supports a different set of features than Redpanda, so we pass additional environment variables.

Using a SOCKS5 Proxy

To run the integration test via a SOCKS5 proxy, you need to set the environment variable SOCKS_PROXY. The following command requires a running proxy on the local machine.

$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full

The SOCKS5 proxy will automatically be started by the docker compose files. Note that KAFKA_CONNECT was extended by addresses that are reachable via the proxy.

Java Interop

To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the TEST_JAVA_INTEROPT=1 environment variable set.

Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them, make sure you have cargo-fuzz installed. Select one of the following fuzzers:

  • protocol_reader: Selects an API key and API version and then reads message frames and tries to decode the response object. The message frames are read w/o the length marker for more efficient fuzzing.
  • record_batch_body_reader: Reads the inner part of a record batch (w/o the prefix that contains length and CRC) and tries to decode it. In theory this is covered by protocol_reader as well but the length fields and CRC make it hard for the fuzzer to traverse this data structure.

Then run the fuzzer with:

$ cargo +nightly fuzz run protocol_reader
...

Let it run for as long as you wish, or until it finds a crash:

...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Sadly, the backtraces you get are not very helpful; you need a debugger to find the exact source locations:

$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...

(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...

(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...

Then create a unit test and fix the bug.

For out-of-memory errors, LLDB does not stop automatically. You can, however, set a breakpoint before starting execution that hooks into the place where the fuzzer is about to exit:

(lldb) b fuzzer::PrintStackTrace()

Benchmarks

Install cargo-criterion, make sure you have some Kafka cluster running, and then you can run all benchmarks with:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features

If you find a benchmark that is too slow, you may want to profile it. Install cargo-with and perf, then run (here for the parallel/rskafka benchmark):

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
    bench --all-features --bench write_throughput -- \
    --bench --noplot parallel/rskafka

Have a look at the report:

$ perf report

License

Licensed under either of these, at your option:

  • Apache License, Version 2.0
  • MIT License

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

Footnotes

  1. Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads, e.g. by batching writes to multiple partitions in a single ProduceRequest. ↩
