• This repository has been archived on 15/Apr/2022
  • Language
    Scala
  • Created over 9 years ago
  • Updated about 4 years ago

Repository Details

reactive kafka client

kafka-rx

Join the chat at https://gitter.im/cjdev/kafka-rx

General Purpose Kafka Client that Just Behaves

Features

  • thin, reactive adapter around kafka's producer and consumer
  • per-record, fine-grained commit semantics
  • offset management to keep track of consumer positions

Consuming records

kafka-rx provides a push-based alternative to kafka's pull-based stream.

To connect to your zookeeper cluster and process a stream:

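import scala.concurrent.duration._

val consumer = new RxConsumer("zookeeper:2181", "consumer-group")

// deserialize is your own function that decodes a record
consumer.getRecordStream("cool-topic-(x|y|z)") // regex: cool-topic-x, -y or -z
  .map(deserialize)
  .take(42.seconds)
  .foreach(println)

consumer.shutdown()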

All of the standard rx transforms are available on the resulting stream.
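To illustrate the push-versus-pull distinction, here is a minimal, dependency-free sketch that adapts a pull-based source (an `Iterator`) into a stream that pushes each element to a subscriber. `PushStream` and `Subscriber` are hypothetical names, not kafka-rx's API; kafka-rx's real streams are Rx observables with the full set of operators.

```scala
// Hypothetical sketch, not kafka-rx's implementation: a pull-based source
// (an Iterator) adapted into a push-based stream that calls a subscriber.
trait Subscriber[T] {
  def onNext(value: T): Unit
  def onCompleted(): Unit
}

final class PushStream[T](source: () => Iterator[T]) {
  // Pull each element from the source and push it to the subscriber.
  def subscribe(s: Subscriber[T]): Unit = {
    val it = source()
    while (it.hasNext) s.onNext(it.next())
    s.onCompleted()
  }

  // Operators build new streams lazily; nothing is pulled until subscribe.
  def map[U](f: T => U): PushStream[U] = new PushStream(() => source().map(f))
  def take(n: Int): PushStream[T] = new PushStream(() => source().take(n))

  // Collect every pushed value into a List (handy for testing).
  def toList: List[T] = {
    val buf = scala.collection.mutable.ListBuffer.empty[T]
    subscribe(new Subscriber[T] {
      def onNext(value: T): Unit = buf += value
      def onCompleted(): Unit = ()
    })
    buf.toList
  }
}
```

For example, `new PushStream(() => Iterator("a", "b", "c")).map(_.toUpperCase).take(2).toList` yields `List("A", "B")`: the consumer of the stream never calls `next()` itself, it only reacts to values as they arrive.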

Producing records

kafka-rx can also be used to produce kafka streams:

tweetStream.map(parse)
  .groupBy(hashtag)
  .foreach { case (tag, subStream) => // groupBy emits (key, sub-stream) pairs
    subStream.map(toProducerRecord)
      .saveToKafka(kafkaProducer)
      .foreach { savedRecord =>
        savedRecord.commit() // checkpoint position in the source stream
      }
  }
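The producer example commits a source record only after it has been saved. The sketch below models why that order matters, assuming in-order processing within one partition: writes are attempted in order, and the checkpoint advances only past records whose write succeeded, stopping at the first failure. `SourceRecord`, `save` and `lastCommittable` are hypothetical stand-ins rather than kafka-rx API, and `save` simply fails on the value "bad" instead of talking to kafka.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical source record: its offset is what a commit would checkpoint.
final case class SourceRecord(offset: Long, value: String)

object SaveThenCommit {
  // Stand-in for a producer write: fails for the value "bad".
  def save(r: SourceRecord): Try[SourceRecord] =
    if (r.value == "bad") Failure(new RuntimeException(s"write failed at offset ${r.offset}"))
    else Success(r)

  // Attempt writes in order and return the last offset that is safe to commit.
  // Stopping at the first failure means the checkpoint never skips past a
  // record that was not saved.
  def lastCommittable(records: Seq[SourceRecord]): Option[Long] =
    records.iterator
      .map(save)              // lazily attempt each write, in order
      .takeWhile(_.isSuccess) // stop at the first failed write
      .map(_.get.offset)      // offsets of successfully saved records
      .foldLeft(Option.empty[Long])((_, offset) => Some(offset)) // keep the last
}
```

With records at offsets 0, 1, 2 and 3 where offset 2 fails, only offset 1 is committable; offset 3 is never even attempted, so a successful later write cannot hide the failed one.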

Check out the words-to-WORDS producer or the twitter-stream demo for full working examples.

Reliable Record Processing

kafka-rx was built with reliable record processing in mind.

To support this, every kafka-rx record has a .commit() method, which optionally takes a user-provided merge function, giving the program an opportunity to reconcile offsets with zookeeper and manage delivery guarantees.

stream.buffer(23).foreach { bucket =>
  process(bucket)
  bucket.last.commit()
}
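The merge function's real signature is not shown here, so the sketch below assumes a hypothetical shape: a function from the offsets already stored (for example in zookeeper) and the offsets being committed to the offsets to store, keyed by partition number. `keepHighest` is one possible policy that never moves a partition backwards, guarding against stale or out-of-order commits. `bucketCommits` models the buffered loop above by reporting which offset each `bucket.last.commit()` would checkpoint. All names are illustrative, not kafka-rx API.

```scala
object OffsetMerge {
  // Hypothetical merge shape: (offsets already stored, offsets proposed by
  // this commit) => offsets to store. Keys are partition numbers.
  type Merge = (Map[Int, Long], Map[Int, Long]) => Map[Int, Long]

  // Keep the larger offset per partition, so a stale or out-of-order commit
  // can never move a partition's position backwards.
  val keepHighest: Merge = (stored, proposed) =>
    (stored.keySet ++ proposed.keySet).map { p =>
      p -> math.max(stored.getOrElse(p, -1L), proposed.getOrElse(p, -1L))
    }.toMap

  // Model of stream.buffer(n).foreach { b => process(b); b.last.commit() }:
  // the offset committed after each bucket is that bucket's last offset.
  def bucketCommits(offsets: Seq[Long], bucketSize: Int): List[Long] =
    offsets.grouped(bucketSize).map(_.last).toList
}
```

For offsets 0 to 49 and buckets of 23, the commits land at 22, 45 and 49. The final bucket is partial; in Rx, a count-based buffer emits such a partial bucket when the stream completes.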

If you can afford possible gaps in record processing, you can also use kafka's automatic offset commit behavior (offsets are committed periodically, whether or not the records have been processed), but you are encouraged to manage commits yourself.

In general, you should aim for idempotent processing, where processing a record once or many times has the same effect. In addition, remember that records from different topic partitions are delivered interleaved, in a non-deterministic order; kafka only guarantees ordering within a partition. If ordering matters, you are encouraged to process each topic partition as an individual stream to ensure there is no interleaving.

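import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val numStreams = numPartitions // one stream per topic partition
val streams = consumer.getRecordStreams(topic, numStreams)
for (stream <- streams) yield Future { process(stream) }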
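A minimal sketch of both ideas, assuming a record is identified by its (partition, offset) pair: `IdempotentSum` applies each record at most once, so redelivered records do not change the result, and `PerPartition.split` separates an interleaved delivery into per-partition sequences without reordering records inside a partition. `Rec`, `IdempotentSum` and `PerPartition` are hypothetical names. In a real system the set of applied keys would have to be stored durably alongside the output; an in-memory set only illustrates the idea.

```scala
import scala.collection.mutable

// Hypothetical record: (partition, offset) identifies it uniquely.
final case class Rec(partition: Int, offset: Long, value: Int)

// Idempotent sink: each (partition, offset) contributes at most once, so
// processing a record again has no further effect.
final class IdempotentSum {
  private val applied = mutable.Set.empty[(Int, Long)]
  private var total = 0L

  def process(r: Rec): Unit = synchronized {
    // Set#add returns false when the key was already present
    if (applied.add((r.partition, r.offset))) total += r.value
  }

  def sum: Long = synchronized(total)
}

object PerPartition {
  // Split an interleaved delivery by partition; groupBy keeps the original
  // order within each group, matching kafka's per-partition ordering.
  def split(recs: Seq[Rec]): Map[Int, Seq[Rec]] = recs.groupBy(_.partition)
}
```

Feeding the same batch through `IdempotentSum` twice yields the same sum as feeding it once, which is what makes at-least-once delivery safe to combine with manual commits.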

Configuration

Wherever possible, kafka-rx delegates to kafka's internal configuration.

Use kafka's ConsumerConfig for configuring the consumer, and ProducerConfig for configuring your producer.
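As a sketch, the kafka 0.8-era settings can be assembled as plain `java.util.Properties`, which is what `kafka.consumer.ConsumerConfig` and `KafkaProducer` are constructed from. The keys are standard kafka 0.8.2 names (note the high-level consumer's `auto.commit.enable`, which newer clients renamed `enable.auto.commit`); the host names are placeholders, and how kafka-rx receives these objects is not shown here.

```scala
import java.util.Properties

object KafkaProps {
  // Build a java.util.Properties from Scala key/value pairs.
  def props(entries: (String, String)*): Properties = {
    val p = new Properties()
    entries.foreach { case (k, v) => p.setProperty(k, v) }
    p
  }

  // Settings for kafka 0.8's high-level (zookeeper-based) consumer.
  val consumer: Properties = props(
    "zookeeper.connect"  -> "zookeeper:2181",
    "group.id"           -> "consumer-group",
    "auto.commit.enable" -> "false",   // manage commits yourself
    "auto.offset.reset"  -> "smallest" // start from the earliest offset if none is stored
  )

  // Settings for kafka 0.8.2's Java producer (host name is a placeholder).
  val producer: Properties = props(
    "bootstrap.servers" -> "kafka:9092",
    "acks"              -> "-1", // wait for all in-sync replicas
    "key.serializer"    -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer"  -> "org.apache.kafka.common.serialization.StringSerializer"
  )
}
```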

Including in your project

Currently kafka-rx is built against kafka 0.8.2.1 and scala 2.11, but should work fine with other similar versions.

From maven:

<dependency>
  <groupId>com.cj</groupId>
  <artifactId>kafka-rx_2.11</artifactId>
  <version>0.3.1</version>
</dependency>

From sbt:

libraryDependencies += "com.cj" %% "kafka-rx" % "0.3.1"

Videos & Examples

For more code and help getting started, see the examples.

Or, if videos are more your style:

stream processing with kafka-rx

Contributing

Have a question, improvement, or something you want to discuss?

Issues and pull requests welcome!

License

Eclipse Public License v.1 - Commission Junction 2015
