Clojure client for Kafka

dvlopt.kafka

This Apache Kafka client library is a Clojure wrapper for the official Java libraries. It strives to be idiomatic without being too clever. Users accustomed to the Java libraries will feel right at home, although prior experience with them is not a prerequisite.

It provides namespaces for handling consumers and producers and for performing some administration. Kafka Streams is also fully supported. The user can create a mock Kafka Streams application which does not need a running Kafka cluster. This is perfect for learning the library as well as for building applications at the REPL and seeing how they behave right away.

Ready for Kafka 2.1.0. Previously known as "Milena", the API of this iteration is considered stable unless something changes significantly in the Java libraries. In general, only the administration namespace is at risk.

We try to provide good documentation because many important concepts are poorly explained or confusing in the Java libraries. Feel free to provide feedback, contribute, and let us know if something is not clear.

Usage

First, read the fairly detailed API, especially if you are not used to the Java libraries and their various concepts.

Alternatively, documentation can be generated by running:

$ lein codox
$ cd doc/auto

Then, have a look at the following examples. To prepare, let us require all the namespaces involved.

You can also clone this repo and start a REPL; everything needed is already imported in the dev namespace.

;; For Kafka :

(require '[dvlopt.kafka       :as K]
         '[dvlopt.kafka.admin :as K.admin]
         '[dvlopt.kafka.in    :as K.in]
         '[dvlopt.kafka.out   :as K.out])


;; For Kafka Streams :

(require '[dvlopt.kstreams          :as KS]
         '[dvlopt.kstreams.mock     :as KS.mock]
         '[dvlopt.kstreams.topology :as KS.topology]
         '[dvlopt.kstreams.ctx      :as KS.ctx]
         '[dvlopt.kstreams.store    :as KS.store]
         '[dvlopt.kstreams.builder  :as KS.builder]
         '[dvlopt.kstreams.stream   :as KS.stream]
         '[dvlopt.kstreams.table    :as KS.table])

Administration

Creating topic "my-topic" using the dvlopt.kafka.admin namespace.

(with-open [admin (K.admin/admin)]
  (K.admin/create-topics admin
                         {"my-topic" {::K.admin/number-of-partitions 4
                                      ::K.admin/replication-factor   3
                                      ::K.admin/configuration        {"cleanup.policy" "compact"}}})
  (println "Existing topics : " (keys @(K.admin/topics admin
                                                       {::K/internal? false}))))

Producing records

Sending 25 records to "my-topic" using the dvlopt.kafka.out namespace.

(with-open [producer (K.out/producer {::K/nodes             [["localhost" 9092]]
                                      ::K/serializer.key    (K/serializers :long)
                                      ::K/serializer.value  :long
                                      ::K.out/configuration {"client.id" "my-producer"}})]
  (doseq [i (range 25)]
    (K.out/send producer
                {::K/topic "my-topic"
                 ::K/key   i
                 ::K/value (* 100 i)}
                (fn callback [exception metadata]
                  (println (format "Record %d : %s"
                                   i
                                   (if exception
                                     "FAILURE"
                                     "SUCCESS")))))))

Consuming records

Reading a batch of records from "my-topic" and manually committing the consumed offsets using the dvlopt.kafka.in namespace.

(with-open [consumer (K.in/consumer {::K/nodes              [["localhost" 9092]]
                                     ::K/deserializer.key   :long
                                     ::K/deserializer.value :long
                                     ::K.in/configuration   {"auto.offset.reset" "earliest"
                                                             "enable.auto.commit" false
                                                             "group.id"           "my-group"}})]
  (K.in/register-for consumer
                     ["my-topic"])
  (doseq [record (K.in/poll consumer
                            {::K/timeout [5 :seconds]})]
    (println (format "Record %d @%d - Key = %d, Value = %d"
                     (::K/offset record)
                     (::K/timestamp record)
                     (::K/key record)
                     (::K/value record))))
  (K.in/commit-offsets consumer))

Kafka Streams low-level API

A contrived but simple example of grouping records into two categories based on their key, "odd" and "even", and continuously summing the values in each category.

First, we create a topology. We then add a source node fetching records from "my-input-topic". Those records are processed by "my-processor" which needs "my-store" in order to persist the current sum for each category. Finally, a sink node receives processed records and sends them to "my-output-topic".

To test the topology, we create a mock Kafka Streams application which emulates a Kafka cluster. This is perfect for learning, testing, and fiddling at the REPL. We pipe in a few records to see how it behaves. Of course, you could also test it with a running cluster.

(def topology
     (-> (KS.topology/topology)
         (KS.topology/add-source "my-source"
                                 ["my-input-topic"]
                                 {::K/deserializer.key   :long
                                  ::K/deserializer.value :long
                                  ::KS/offset-reset      :earliest})
         (KS.topology/add-processor "my-processor"
                                    ["my-source"]
                                    {::KS/processor.init      (fn [ctx]
                                                                (KS.ctx/kv-store ctx
                                                                                 "my-store"))
                                     ::KS/processor.on-record (fn [ctx my-store record]
                                                                (println "Processing record : " record)
                                                                (let [key' (if (odd? (::K/key record))
                                                                             "odd"
                                                                             "even")
                                                                      sum  (+ (or (KS.store/kv-get my-store
                                                                                                   key')
                                                                                  0)
                                                                              (::K/value record))]
                                                                  (KS.store/kv-put my-store
                                                                                   key'
                                                                                   sum)
                                                                  (KS.ctx/forward ctx
                                                                                  {::K/key   key'
                                                                                   ::K/value sum})))})
         (KS.topology/add-store ["my-processor"]
                                {::K/deserializer.key   :string
                                 ::K/deserializer.value :long
                                 ::K/serializer.key     :string
                                 ::K/serializer.value   :long
                                 ::KS.store/name        "my-store"
                                 ::KS.store/type        :kv.in-memory
                                 ::KS.store/cache?      false})
         (KS.topology/add-sink "my-sink"
                               ["my-processor"]
                               "my-output-topic"
                               {::K/serializer.key   :string
                                ::K/serializer.value :long})))


;; This will run without a Kafka Cluster

(def mock-app
     (KS.mock/mock-app "KS-low-level-test"
                       topology))


;; We pipe a few records into our fake runtime

(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; Run this a few times to see the result

(KS.mock/read-record mock-app
                     "my-output-topic"
                     {::K/deserializer.key   :string
                      ::K/deserializer.value :long})


;; If you wish to use a real Kafka cluster instead

(comment
    (def app
         (KS/app "my-app-1"
                 topology
                 {::K/nodes         [["localhost" 9092]]
                  ::KS/on-exception (fn [exception _thread]
                                      (println "Exception : " exception))}))


    (KS/start app))
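
To know what to expect when reading from "my-output-topic", the processor logic above can be modelled in plain Clojure with no Kafka dependency. The following is only a sketch under assumptions: `category` and `running-sums` are hypothetical helper names that are not part of this library, and a plain map stands in for "my-store".

```clojure
;; Plain-Clojure model of the processor above; nothing here calls the library.
;; `category` and `running-sums` are hypothetical names used for illustration.

(defn category
  "Maps a numeric key to the \"odd\" or \"even\" category."
  [k]
  (if (odd? k) "odd" "even"))

(defn running-sums
  "Given input records as [key value] pairs, returns the sequence of
   [category sum] pairs that would be forwarded, one per input record."
  [records]
  (->> records
       (reductions (fn [[store _] [k v]]
                     (let [group (category k)
                           sum   (+ (get store group 0) v)]
                       ;; The map plays the role of the key-value store.
                       [(assoc store group sum) [group sum]]))
                   [{} nil])
       rest          ; drop the initial accumulator
       (map second)))

;; Same input as the `dotimes` loop: key and value are both i.
(def forwarded
  (running-sums (map (fn [i] [i i]) (range 25))))

(take 4 forwarded)
;; => (["even" 0] ["odd" 1] ["even" 2] ["odd" 4])

(last forwarded)
;; => ["even" 156]
```

Each input record yields exactly one output record carrying the updated sum of its category, which is why reading the output topic repeatedly returns a growing sequence of sums rather than a single final value.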

Kafka Streams high-level API

Same example as previously but in a more functional style. In addition, values are aggregated in 2-second windows (it is best to run the producer example a few times first).

First, we need a builder. Then, we add a stream fetching records from "my-input-topic". Records are grouped into our categories, and each category is then split into 2-second windows. Each window is reduced to compute a sum. We can now build a topology out of our builder. It is always a good idea to look at the description of the built topology to get a better idea of what the high-level API creates.

A window store is then retrieved and each window for each category is printed.

(def topology
     (let [builder (KS.builder/builder)]
       (-> builder
           (KS.builder/stream ["my-input-topic"]
                              {::K/deserializer.key   :long
                               ::K/deserializer.value :long
                               ::KS/offset-reset      :earliest})
           (KS.stream/group-by (fn [k v]
                                 (println (format "Grouping [%d %d]"
                                                  k
                                                  v))
                                 (if (odd? k)
                                   "odd"
                                   "even"))
                               {::K/deserializer.key   :string
                                ::K/deserializer.value :long
                                ::K/serializer.key     :string
                                ::K/serializer.value   :long})
           (KS.stream/window [2 :seconds])
           (KS.stream/reduce-windows (fn reduce-window [sum k v]
                                       (println (format "Adding value %d to sum %s for key '%s'"
                                                        v
                                                        sum
                                                        k))
                                       (+ sum
                                          v))
                                     (fn seed []
                                       0)
                                     {::K/deserializer.key   :string
                                      ::K/deserializer.value :long
                                      ::K/serializer.key     :string
                                      ::K/serializer.value   :long
                                      ::KS.store/name        "my-store"
                                      ::KS.store/type        :kv.in-memory
                                      ::KS.store/cache?      false}))
       (KS.topology/topology builder)))


;; Always interesting to see what is the actual topology.

(KS.topology/describe topology)


;; Just like in the previous example, we create a fake runtime and pipe a few records.

(def mock-app
     (KS.mock/mock-app "KS-high-level-test"
                       topology))


(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; And here is how we can read from a store.

(def my-store
     (KS.mock/window-store mock-app
                           "my-store"))


(with-open [cursor (KS.store/ws-multi-range my-store)]
  (doseq [db-record (iterator-seq cursor)]
    (println (format "Aggregated key = '%s', time windows = [%d;%d), value = %d"
                     (::K/key db-record)
                     (::K/timestamp.from db-record)
                     (::K/timestamp.to db-record)
                     (::K/value db-record)))))
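
The windowed aggregation can also be reasoned about in plain Clojure. The sketch below assumes fixed, non-overlapping 2-second windows aligned on multiples of the window size, matching the half-open `[from;to)` interval printed above. `window-start` and `windowed-sums` are hypothetical helpers that are not part of this library, and the timestamps are made up for illustration.

```clojure
;; Plain-Clojure model of windowed sums; nothing here calls the library.
;; Helper names and sample timestamps are assumptions for illustration.

(def window-ms
  "Window size in milliseconds."
  2000)

(defn window-start
  "Start of the window containing `timestamp` (milliseconds)."
  [timestamp]
  (- timestamp (mod timestamp window-ms)))

(defn windowed-sums
  "Sums values per [category window-start], given records as maps
   with :key, :value and :timestamp."
  [records]
  (reduce (fn [acc {k :key v :value ts :timestamp}]
            (update acc
                    [(if (odd? k) "odd" "even") (window-start ts)]
                    (fnil + 0)   ; a missing window starts at 0, like the seed fn
                    v))
          {}
          records))

(def sums
  (windowed-sums [{:key 1 :value 10 :timestamp 0}
                  {:key 3 :value 20 :timestamp 1999}
                  {:key 2 :value 5  :timestamp 1000}
                  {:key 5 :value 7  :timestamp 2000}]))

sums
;; => {["odd" 0] 30, ["even" 0] 5, ["odd" 2000] 7}
```

A record stamped 1999 still falls in the window starting at 0, while one stamped 2000 opens the next window, so the same category can appear several times in the store, once per window.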

Testing

It is possible to create mock consumers (dvlopt.kafka.in.mock namespace) and mock producers (dvlopt.kafka.out.mock namespace) for testing purposes. They can use the normal API without needing to contact a Kafka cluster, but not everything behaves strictly the same (cf. documentation).

Kafka Streams applications can get fairly complex. Providing a fake runtime, as in the examples, is a great solution for unit tests. While building a new application at the REPL, you can gradually check that everything seems to work as expected. Hence, this is a valuable tool.

License

Copyright © 2017 Adam Helinski

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

More Repositories

1

wasm.cljc

Spec compliant WebAssembly compiler, decompiler, and generator
Clojure
244
star
2

binf.cljc

Handling binary formats in all shapes and forms
Clojure
131
star
3

dsim.cljc

Idiomatic and purely functional discrete event-simulation
Clojure
119
star
4

clojure-of-things

Documentation about how to run Clojure on the Raspberry Pi
61
star
5

interval.cljc

Immutable interval trees and utilities
Clojure
60
star
6

fdat.cljc

Function serialization between Clojure processes and dialects
Clojure
55
star
7

linux.gpio.clj

Use the standard Linux GPIO API from Clojure JVM
Clojure
35
star
8

maestro.clj

Zen way for managing a Clojure/script monorepo
Clojure
32
star
9

timer.cljs

Scheduling async operations in Clojurescript
Clojure
28
star
10

void.cljc

About void and absence of information
Clojure
19
star
11

linux-gpio.java

Use the standard Linux GPIO api from Java
Java
17
star
12

canvas.cljs

Accessing the Canvas API
Clojure
16
star
13

rktree.cljc

Trees where leaves are located both in time and space
Clojure
15
star
14

medium.cljc

Utilities for targeting different compilation environments in Clojure/script
Clojure
14
star
15

linux.i2c.clj

Use the standard Linux I2C API from Clojure JVM
Clojure
12
star
16

linux-i2c.java

Use the standard Linux I2C API from the JVM
Java
11
star
17

templ-lib.cljc

Template for CLJC libraries
Clojure
11
star
18

mprop.cljc

Multiplexing `test.check` properties for thorough generative testing
Clojure
7
star
19

linux-epoll.java

Use Linux's epoll from java
Java
6
star
20

mqtt.clj

Async MQTT 3.x clojure client
Clojure
6
star
21

rxtx.clj

Serial IO based on RXTX from Clojure JVM
Clojure
3
star
22

coload.cljc

Loading Clojure in sync with Clojurescript during dev
Clojure
3
star
23

byte_buffer.cpp

Easily and safely read/write any type to byte arrays
C++
2
star
24

mbus.clj

Using the Meter-Bus protocol from Clojure JVM
Clojure
2
star
25

linux.spi.clj

Clojure library for talking to SPI devices from Linux
Clojure
2
star
26

sysrun.clj

Miscellaneous system utilities for Clojure JVM
Clojure
2
star
27

fcss.cljc

Minifying Garden classes showing up in advanced CLJS builds
Clojure
2
star
28

linux.i2c.mcp342x.clj

Talking to the MCP342x family of ADC via I2C from Clojure JVM
Clojure
2
star
29

linux-common.java

Miscellaneous JNA utilities related to Linux
Java
2
star
30

htm.clj

Clojure implementation of Hierarchical Temporal Memory
Clojure
1
star
31

dvlopt-cljs

Dvlopt lein template for clojurescript
Clojure
1
star
32

linux.i2c.horter-i2hae.clj

A/D conversion via I2C with Horter I2HAE from Clojure JVM
Clojure
1
star
33

linux.i2c.bme280.clj

Talking to BME280 sensors via I2C from Clojure JVM
Clojure
1
star
34

linux-io.java

Basic Linux IO utilities for java through JNA
Java
1
star
35

utimbre.clj

Miscellaneous utilities for Timbre
Clojure
1
star
36

pi4clj

Clojure library for IO on the Raspberry Pi
Clojure
1
star
37

ex.clj

Java exceptions as clojure data
Clojure
1
star
38

fn.cpp

Pass around c++ fns and methods, get performance
C++
1
star
39

utimbre.appenders.kafka.clj

Timbre appender for Apache Kafka
Clojure
1
star
40

fulcro.initLocalState

Repro case demonstrating how :initLocalState seem to misbehave in Fulcro 3
Clojure
1
star