• Stars
    star
    211
  • Rank 186,867 (Top 4 %)
  • Language
    Clojure
  • License
    Eclipse Public Li...
  • Created almost 13 years ago
  • Updated over 8 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Wrapper to the Java API for interacting with Kafka

clj-kafka

Clojure library for Kafka.

Current build status: Build Status

Development is against the 0.8 release of Kafka.

Installing

Add the following to your Leiningen project.clj:

latest clj-kafka version

Usage

Producer

Discovery of Kafka brokers from Zookeeper:

(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})
(use 'clj-kafka.producer)

(def p (producer {"metadata.broker.list" "localhost:9999"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (message "test" (.getBytes "this is my message")))

See: clj-kafka.producer

New Producer

As of 0.3.1 we also support the "new" pure-Java producer. The interface is superficially similar but we've chosen to keep names close to their Java equivalent.

(use 'clj-kafka.new.producer)

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"} (byte-array-serializer) (byte-array-serializer))]
  (send p (record "test-topic" (.getBytes "hello world!"))))

One key difference is that sending is asynchronous by default. send returns a Future immediately. If you want synchronous behaviour you can deref it right away:

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"} (byte-array-serializer) (byte-array-serializer))]
  @(send p (record "test-topic" (.getBytes "hello world!"))))

See: clj-kafka.new.producer

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2182"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

(with-resource [c (consumer config)]
  shutdown
  (take 2 (messages c "test")))

The messages function provides the easy-case of single topic and single thread consumption. This is a stricter form of the same API that was in earlier releases. messages is built on two key other functions: create-message-streams and stream-seq that create the underlying streams and turn them into lazy sequences respectively; this change makes it easier to consume across multiple partitions and threads.

See: clj-kafka.consumer.zk

Usage with transducers

An alternate way of consuming is using create-message-stream or create-message-streams to obtain KafkaStream instances. These are Iterable which means, amongst other things, that they work nicely with transducers.

Continuing previous example:

;; hypothetical transformation
(def xform (comp (map deserialize-message)
                 (filter production-traffic)
                 (map parse-user-agent-string)))

(with-resource [c (consumer config)]
  shutdown
  (let [stream (create-message-stream c "test-topic")]
    (run! write-to-database! (eduction xform stream))))

Administration Operations

There is support the following simple administration operations:

  • checking if a topic exists
  • creating a topic
  • deleting a topic (requires that the Kafka cluster supports deletion and has delete.topic.enable set to true)
  • retrieving topic configuration
  • changing topic configuration
(require '[clj-kafka.admin :as admin])

(with-open [zk (admin/zk-client "127.0.0.1:2181")]
  (if-not (admin/topic-exists? zk "test-topic")
    (admin/create-topic zk "test-topic"
                        {:partitions 3
                         :replication-factor 1
                         :config {"cleanup.policy" "compact"}})))

See: clj-kafka.admin

Kafka Offset Manager Operations

There is support the following simple Kafka offset management operations:

  • fetch the current offsets of a consumer group
  • reset the current offsets of a consumer group
(require '[clj-kafka.offset :as offset])

(fetch-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer")
(reset-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :earliest)
(reset-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :latest)

See: clj-kafka.admin

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.

Thanks

YourKit is kindly supporting this open source project with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: YourKit Java Profiler and YourKit .NET Profiler.

More Repositories

1

redshift-r

Small R package for accessing Redshift
R
68
star
2

bandit

Multi-armed bandit algorithms in Clojure
Clojure
62
star
3

clj-hector

Simple Cassandra client for Clojure
Clojure
42
star
4

clj-esper

Thin layer around Esper event stream processing for Clojure
Clojure
40
star
5

curator

Clojurified Apache Curator
Clojure
22
star
6

googlecloud

Google Cloud clients in Clojure
Clojure
21
star
7

machine-learning-in-action

Playing around converting the code samples from Machine Learning in Action to Clojure
Clojure
15
star
8

c-programming-a-modern-approach

C
15
star
9

cascading.cassandra

Cascading Tap and Scheme for Cassandra
Java
14
star
10

clj-detector

Clojure interface to UADetector for parsing User-Agent strings
Clojure
12
star
11

kafka-riemann-reporter

Report Kafka metrics to Riemann
Java
12
star
12

gclouj

Clojure-friendly gcloud-java
Clojure
12
star
13

mahout-sample

Playing with Mahout in Clojure
Clojure
12
star
14

clj-quartz

Quartz job scheduling
Clojure
9
star
15

clj-hdfs

Small lib to make it easier to work with data on Hadoop's Distributed FileSystem (HDFS)
Clojure
9
star
16

go-metrics-riemann

Report go-metrics data to Riemann
Go
8
star
17

homebrew-psqlodbc

Postgres' ODBC library
Ruby
8
star
18

paradigms-of-artificial-intelligence-clojure

Examples from the Paradigms of AI book by Peter Norvig
Clojure
8
star
19

clj-aws

Clojure wrapper for the Amazon AWS SDK
Clojure
8
star
20

redirectly-realtime

realtime fun with clojure, esper and rabbitmq
Clojure
7
star
21

cascading.protobuf

Protocol Buffer Scheme for Cascading
Java
5
star
22

node-moonshado

Send SMSs from Node.js through the Moonshado gateway
JavaScript
4
star
23

clj-hbase

Clojure client for HBase. Evolving as I work through the O'Reilly HBase book.
Clojure
4
star
24

cascalog-cassandra

Cassandra Tap and Scheme for use with Cascalog
Clojure
4
star
25

rack-scheme-detect

Ruby
4
star
26

clj-mixpanel

Clojure interface for sending events to Mixpanel
Clojure
4
star
27

clj-emr

Clojure wrapper to Amazon Elastic MapReduce API
Clojure
3
star
28

sicp-examples

Working through SICP
Clojure
3
star
29

paradigms-of-artificial-intelligence

Start working through code examples of Paradigms of Artificial Intelligence in Clojure
Clojure
3
star
30

ring-canonical-domain

Ring middleware to ease 301'ing to a canonical domain
Clojure
3
star
31

curator-example

Sample Clojure application using the Curator library
Clojure
3
star
32

cascading-clojure-sample

Playground of examples with cascading-clojure. Uses my fork of cascading-clojure for tagged input formats
Clojure
2
star
33

machine-learning-for-hackers

Code written whilst working through the book
2
star
34

clj-stream-stats

Statistical functions useful when processing unbounded streams of numbers
Clojure
2
star
35

go-metrics-googlecloud

Report metrics to Google Cloud Monitoring
Go
2
star
36

cascading.neo4j

Neo4j tap
Java
2
star
37

autotools-samples

Examples from the Autotools book by J. Calcote, published by No Starch Press
2
star
38

crawly

Sample 80legs crawler app in Clojure
Java
2
star
39

docker-etcdctl

2
star
40

learn-c-the-hard-way

1
star
41

plumbing

General purpose functions for use throughout clj-sys projects
Clojure
1
star
42

node-native-examples

Playing around with building native extensions for node.js. Most of these are taken from other samples around the web. I promise I'll add URLs to sources next.
C++
1
star
43

feedback

Clojure library for Feedback Control
Clojure
1
star
44

presentations

presentations
1
star
45

introduction-to-algorithms

Code from the Cormen et al book
1
star
46

hello-esper-clj

A set of simple examples for the esper-clj library
Clojure
1
star
47

pingles.github.com

GitHub Homepage
Ruby
1
star
48

crdt-go

CRDTs in Go
Go
1
star
49

cpp-concurrency

C++
1
star
50

go-postcode-search

Playing around with search trees in Go
Go
1
star
51

libev-sample

Messing around with libeio and libev
1
star
52

try

*nix command to repeatedly run a command until it "succeeds"
C
1
star
53

c-and-cpp-adventures

Me playing around with various C and C++ libs
C
1
star
54

lein-beanstalk

Easy deploy apps to Amazon EC2 Elastic Beanstalk with leiningen
1
star
55

algorithms-unplugged

Implementations of algorithms unplugged book in Clojure
Clojure
1
star
56

programming-challenges

answers to problems in programming challenges book
C
1
star
57

the-linux-programming-interface

1
star
58

zilch

ZeroMQ Clojure Library
Clojure
1
star
59

crdt-example

Example Go app to play with CRDTs
Go
1
star
60

clj-bdb

Clojure wrapper to Berkeley DB
Clojure
1
star
61

simple-ring

1
star
62

cpp-primer

Code from the C++ Primer book
1
star
63

clojure-syslog4j-sample

1
star
64

zeromq-sample

Clojure
1
star
65

glock

Go-implemented Global Lock
Go
1
star
66

r-in-action

Notes and work throughs of topics from R in Action book
1
star
67

cloudstorage

1
star
68

riak-coreos

Shell
1
star
69

syslog-zeromq-bridge

Playing with building a syslog <--> zeromq bridge with node
1
star
70

consul-coreos

Bootstraps a Consul cluster on CoreOS using fleet and etcd
Makefile
1
star
71

go-playground

Early experiments with Go
Go
1
star
72

cascading-clojure-cassandra

Playground using cascading clojure with cassandra
Clojure
1
star
73

mandy

Easy Peasy Map/Reduce
Ruby
1
star