• Stars
    star
    292
  • Rank 142,152 (Top 3 %)
  • Language
    Rust
  • License
    Apache License 2.0
  • Created almost 3 years ago
  • Updated 3 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A minimal Rust client for Apache Kafka

RSKafka

CircleCI Crates.io Documentation License

This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.

This crate has:

  • No support for offset tracking, consumer groups, transactions, etc...
  • No built-in buffering, aggregation, linger timeouts, etc...
  • Independent write streams per partition

It will be a good fit for workloads that:

  • Perform offset tracking independently of Kafka
  • Read/Write reasonably sized payloads per-partition
  • Have a low number of high-throughput partitions 1

Usage

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
     )
     .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
   .await
   .unwrap();
# }

For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].

Features

  • compression-gzip (default): Support compression and decompression of messages using gzip.
  • compression-lz4 (default): Support compression and decompression of messages using LZ4.
  • compression-snappy (default): Support compression and decompression of messages using Snappy.
  • compression-zstd (default): Support compression and decompression of messages using zstd.
  • full: Includes all stable features (compression-gzip, compression-lz4, compression-snappy, compression-zstd, transport-socks5, transport-tls).
  • transport-socks5: Allow transport via SOCKS5 proxy.
  • transport-tls: Allows TLS transport via rustls.
  • unstable-fuzzing: Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!

Testing

Redpanda

To run integration tests against Redpanda, run:

$ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

in another session.

Apache Kafka

To run integration tests against Apache Kafka, run:

$ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.

Using a SOCKS5 Proxy

To run the integration test via a SOCKS5 proxy, you need to set the environment variable SOCKS_PROXY. The following command requires a running proxy on the local machine.

$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full

The SOCKS5 proxy will automatically be started by the docker compose files. Note that KAFKA_CONNECT was extended by addresses that are reachable via the proxy.

Java Interopt

To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the TEST_JAVA_INTEROPT=1 environment variable set.

Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them make sure you have cargo-fuzz installed. Select one of the following fuzzers:

  • protocol_reader: Selects an API key and API version and then reads message frames and tries to decode the response object. The message frames are read w/o the length marker for more efficient fuzzing.
  • record_batch_body_reader: Reads the inner part of a record batch (w/o the prefix that contains length and CRC) and tries to decode it. In theory this is covered by protocol_reader as well but the length fields and CRC make it hard for the fuzzer to traverse this data structure.

Then run the fuzzer with:

$ cargo +nightly fuzz run protocol_reader
...

Let it running for how long you wish or until it finds a crash:

...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Sadly the backtraces that you might get are not really helpful and you need a debugger to detect the exact source locations:

$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...

(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...

(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...

Then create a unit test and fix the bug.

For out-of-memory errors LLDB does not stop automatically. You can however set a breakpoint before starting the execution that hooks right into the place where it is about to exit:

(lldb) b fuzzer::PrintStackTrace()

Benchmarks

Install cargo-criterion, make sure you have some Kafka cluster running, and then you can run all benchmarks with:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features

If you find a benchmark that is too slow, you can may want to profile it. Get cargo-with, and perf, then run (here for the parallel/rskafka benchmark):

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
    bench --all-features --bench write_throughput -- \
    --bench --noplot parallel/rskafka

Have a look at the report:

$ perf report

License

Licensed under either of these:

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

Footnotes

  1. Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest ↩

More Repositories

1

influxdb

Scalable datastore for metrics, events, and real-time analytics
Rust
28,401
star
2

telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
Go
14,568
star
3

kapacitor

Open source framework for processing, monitoring, and alerting on time series data
Go
2,310
star
4

influxdb-python

Python client for InfluxDB
Python
1,689
star
5

chronograf

Open source monitoring and visualization UI for the TICK stack
TypeScript
1,480
star
6

influxdb-java

Java client for InfluxDB
Java
1,178
star
7

influxdb-relay

Service to replicate InfluxDB data for high availability
Python
830
star
8

flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
FLUX
767
star
9

influxdb-client-python

InfluxDB 2.0 python client
Python
709
star
10

influxdb-client-go

InfluxDB 2 Go Client
Go
599
star
11

go-syslog

Blazing fast syslog parser
Go
478
star
12

sandbox

A sandbox for the full TICK stack
Shell
475
star
13

influxdb-client-java

InfluxDB 2 JVM Based Clients
Java
433
star
14

influxdb-php

influxdb-php: A PHP Client for InfluxDB, a time series database
PHP
431
star
15

influxdb-client-csharp

InfluxDB 2.x C# Client
C#
357
star
16

community-templates

InfluxDB Community Templates: Quickly collect & analyze time series data from a range of sources: Kubernetes, MySQL, Postgres, AWS, Nginx, Jenkins, and more.
Python
350
star
17

influxdb-client-js

InfluxDB 2.0 JavaScript client
TypeScript
326
star
18

influxdata-docker

Official docker images for the influxdata stack
Shell
314
star
19

influxdb-comparisons

Code for comparison write ups of InfluxDB and other solutions
Go
306
star
20

docs.influxdata.com-ARCHIVE

ARCHIVE - 1.x docs for InfluxData
Less
252
star
21

helm-charts

Official Helm Chart Repository for InfluxData Applications
Mustache
226
star
22

influxdb-rails

Ruby on Rails bindings to automatically write metrics into InfluxDB
Ruby
212
star
23

influxdb-csharp

A .NET library for efficiently sending points to InfluxDB 1.x
C#
198
star
24

influxdb1-client

The old clientv2 for InfluxDB 1.x
Go
190
star
25

giraffe

A foundation for visualizations in the InfluxDB UI
TypeScript
183
star
26

influxql

Package influxql implements a parser for the InfluxDB query language.
Go
168
star
27

influxdb-client-php

InfluxDB (v2+) Client Library for PHP
PHP
149
star
28

tdigest

An implementation of Ted Dunning's t-digest in Go.
Go
133
star
29

influx-stress

New tool for generating artificial load on InfluxDB
Go
118
star
30

ui

UI for InfluxDB
TypeScript
93
star
31

tick-charts

A repository for Helm Charts for the full TICK Stack
Smarty
90
star
32

pbjson

Auto-generate serde implementations for prost types
Rust
89
star
33

telegraf-operator

telegraf-operator helps monitor application on Kubernetes with Telegraf
Go
80
star
34

inch

An InfluxDB benchmarking tool.
Go
78
star
35

influxdata-operator

A k8s operator for InfluxDB
Go
76
star
36

docs-v2

InfluxData Documentation that covers InfluxDB Cloud, InfluxDB OSS 2.x, InfluxDB OSS 1.x, InfluxDB Enterprise, Telegraf, Chronograf, Kapacitor, and Flux.
SCSS
72
star
37

wirey

Manage local wireguard interfaces in a distributed system
Go
66
star
38

influx-cli

CLI for managing resources in InfluxDB v2
Go
63
star
39

influxdb-go

61
star
40

terraform-aws-influx

Reusable infrastructure modules for running TICK stack on AWS
HCL
51
star
41

influxdb2-sample-data

Sample data for InfluxDB 2.0
JavaScript
46
star
42

influxdb-observability

Go
46
star
43

influxdb-client-ruby

InfluxDB 2.0 Ruby Client
Ruby
45
star
44

clockface

UI Kit for building Chronograf
TypeScript
44
star
45

grade

Track Go benchmark performance over time by storing results in InfluxDB
Go
43
star
46

influxdb-r

R library for InfluxDB
R
43
star
47

nginx-influxdb-module

C
39
star
48

nifi-influxdb-bundle

InfluxDB Processors For Apache NiFi
Java
36
star
49

line-protocol

Go
36
star
50

tensorflow-influxdb

Jupyter Notebook
34
star
51

iot-center-flutter

InlfuxDB 2.0 dart client flutter demo
Dart
34
star
52

whisper-migrator

A tool for migrating data from Graphite Whisper files to InfluxDB TSM files (version 0.10.0).
Go
33
star
53

flightsql-dbapi

DB API 2 interface for Flight SQL with SQLAlchemy extras.
Python
32
star
54

kube-influxdb

Configuration to monitor Kubernetes with the TICK stack
Shell
31
star
55

k8s-kapacitor-autoscale

Demonstration of using Kapacitor to autoscale a k8s deployment
Go
30
star
56

terraform-aws-influxdb

Deploys InfluxDB Enterprise to AWS
HCL
29
star
57

catslack

Shell -> Slack the easy way
Go
28
star
58

flux-lsp

Implementation of Language Server Protocol for the flux language
Rust
27
star
59

influxdb-operator

The Kubernetes operator for InfluxDB and the TICK stack.
Go
27
star
60

influxdb3_core

InfluxData's core functionality for InfluxDB Edge and IOx
Rust
26
star
61

influxdb-client-swift

InfluxDB (v2+) Client Library for Swift
Swift
26
star
62

influxdb-client-dart

InfluxDB (v2+) Client Library for Dart and Flutter
Dart
25
star
63

kapacitor-course

25
star
64

influxdb-c

C
25
star
65

vsflux

Flux language extension for VSCode
TypeScript
25
star
66

grafana-flightsql-datasource

Grafana plugin for Flight SQL APIs.
TypeScript
25
star
67

ansible-chrony

A role to manage chrony on Linux systems
Ruby
24
star
68

influxdb-scala

Scala client for InfluxDB
Scala
22
star
69

cron

A fast, zero-allocation cron parser in ragel and golang
Go
21
star
70

influxdb-plugin-fluent

A buffered output plugin for Fluentd and InfluxDB 2
Ruby
21
star
71

terraform-google-influx

Reusable infrastructure modules for running TICK stack on GCP
Shell
20
star
72

iot-api-python

Python
18
star
73

openapi

An OpenAPI specification for influx (cloud/oss) apis.
Shell
17
star
74

influxdb-university

InfluxDB University
Python
16
star
75

influxdb-client-r

InfluxDB (v2+) Client R Package
R
14
star
76

kafka-connect-influxdb

InfluxDB 2 Connector for Kafka
Scala
13
star
77

cd-gitops-reference-architecture

Details of the CD/GitOps architecture in use at InfluxData
Shell
13
star
78

iot-api-ui

Common React UI for iot-api-<js, python, etc.> example apps designed for InfluxDB client library tutorials.
TypeScript
13
star
79

oats

An OpenAPI to TypeScript generator.
TypeScript
12
star
80

awesome

SCSS
12
star
81

windows-packager

Create a windows installer
Shell
12
star
82

influxdb-gds-connector

Google Data Studio Connector for InfluxDB.
JavaScript
11
star
83

promql

Go
11
star
84

object_store_rs

Rust
10
star
85

yarpc

Yet Another RPC for Go
Go
10
star
86

ansible-influxdb-enterprise

Ansible role for deploying InfluxDB Enterprise.
10
star
87

influxdb-sample-data

Sample time series data used to test InfluxDB
9
star
88

ingen

ingen is a tool for directly generating TSM data
Go
9
star
89

parquet-bloom-filter-analysis

Generate Parquet Files
Rust
8
star
90

ansible-kapacitor

Official Kapacitor Ansible Role for Linux
Jinja
7
star
91

wlog

Simple log level based Go logger.
Go
7
star
92

iot-api-js

An example IoT app built with NextJS (NodeJS + React) and the InfluxDB API client library for Javascript.
JavaScript
7
star
93

influxdb-iox-client-go

InfluxDB/IOx Client for Go
Go
7
star
94

influxdb-templates

This repo is a collection of dashboard templates used in the InfluxDB UI.
JavaScript
7
star
95

k8s-jsonnet-libs

Jsonnet Libs repo - mostly generated with jsonnet-libs/k8s project
Jsonnet
7
star
96

google-deployment-manager-influxdb-enterprise

GCP Deployment Manager templates for InfluxDB Enterprise.
HTML
6
star
97

jaeger-influxdb

Go
6
star
98

influxdb-action

A GitHub action for setting up and configuring InfluxDB and the InfluxDB Cloud CLI
Shell
6
star
99

influxdb-fsharp

A F# client library for InfluxDB, a time series database http://influxdb.com
F#
6
star
100

qprof

A tool for profiling the performance of InfluxQL queries
Go
6
star