Ketu

A Clojure Apache Kafka client with a core.async API.

[com.appsflyer/ketu "1.0.0"]

Features

  • Channels API: Take Kafka data from a channel and send data to Kafka through a channel.
  • Consumer Source: Polls records from Kafka and puts them on a channel.
  • Producer Sink: Takes records from a channel and sends them to Kafka.
  • Shapes: Transform the original objects of the Java client to Clojure data and back.
  • Simple Configuration: Friendly, validated configuration.

Minimal Example

Consume a name string from Kafka and produce a greeting string for that name back into Kafka, all through channels:

(ns example
  (:require [clojure.core.async :refer [chan close! <!! >!!]]
            [ketu.async.source :as source]
            [ketu.async.sink :as sink]))
  
(let [<names (chan 10)
      source-opts {:name "greeter-consumer"
                   :brokers "broker1:9092"
                   :topic "names"
                   :group-id "greeter"
                   :value-type :string
                   :shape :value}
      source (source/source <names source-opts)

      >greets (chan 10)
      sink-opts {:name "greeter-producer"
                 :brokers "broker2:9091"
                 :topic "greetings"
                 :value-type :string
                 :shape :value}
      sink (sink/sink >greets sink-opts)]

  ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline.
  (->> (<!! <names)
       (str "Hi, ")
       (>!! >greets))

  ;; Close the source. It automatically closes the source channel `<names`.
  (source/stop! source)
  ;; Close the sink channel `>greets`. It causes the sink to close itself as a consequence.
  (close! >greets))
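
The comment in the example mentions clojure.core.async/pipeline as an alternative to the single take-and-put. A minimal sketch of that idea follows. To keep it runnable without a broker, it uses plain channels as stand-ins for the ketu source and sink channels; the names `greet` and `start-greeter!` are hypothetical and not part of ketu.

```clojure
(ns example.pipeline
  (:require [clojure.core.async :refer [chan close! pipeline <!! >!!]]))

;; Hypothetical helper: the transformation applied to every consumed name.
(defn greet [person]
  (str "Hi, " person))

;; Hypothetical helper: continuously moves values from `<names` to `>greets`,
;; applying `greet` to each one. Parallelism is 1 here.
(defn start-greeter! [<names >greets]
  (pipeline 1 >greets (map greet) <names))

;; Plain channels stand in for the source and sink channels of the example.
(let [<names (chan 10)
      >greets (chan 10)]
  (start-greeter! <names >greets)
  (>!! <names "Ada")
  (println (<!! >greets))   ; prints Hi, Ada
  ;; Closing the input channel makes the pipeline close the output channel.
  (close! <names)
  (println (<!! >greets)))  ; prints nil, because >greets is now closed
```

By default, pipeline closes the output channel when the input channel closes. In the real setup this would mean that stopping the source, which closes `<names`, also closes `>greets` and therefore the sink.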

Configuration reference

Anything that is not documented is not supported and might change.

The underlying Kafka clients are v3.3.1; refer to their documentation for the default values they use.

Note: int is used for brevity but can also mean long. Don't worry about it.

Common options (both source and sink accept these)

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :brokers | string | required | Comma-separated host:port values, e.g. "broker1:9092,broker2:9092" |
| :topic | string | required | |
| :name | string | required | Simple human-readable identifier, used in logs and thread names |
| :key-type | :string, :byte-array | optional | Default :byte-array, used in configuring key serializer/deserializer |
| :value-type | :string, :byte-array | optional | Default :byte-array, used in configuring value serializer/deserializer |
| :internal-config | map | optional | A map of the underlying Java client properties, for any extra lower-level config |
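
As an illustration of the common options, here is a sketch of an options map that sets all of them. The helper `brokers-str` is hypothetical and not part of ketu. The shape of `:internal-config` is an assumption: the sketch uses the Java client's property names as string keys, which the table above does not spell out.

```clojure
(ns example.config
  (:require [clojure.string :as str]))

;; Hypothetical helper, not part of ketu: joins host:port pairs into the
;; comma-separated string that :brokers expects.
(defn brokers-str [hosts]
  (str/join "," hosts))

(def common-opts
  {:name "orders-client"
   :brokers (brokers-str ["broker1:9092" "broker2:9092"])
   :topic "orders"
   :key-type :string
   :value-type :string
   ;; Assumption: keys are Java client property names given as strings.
   :internal-config {"client.id" "orders-client"}})
```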

Consumer-source options

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :group-id | string | required | |
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of ConsumerRecord | optional | If unspecified, the channel will contain ConsumerRecord objects. See Data shapes. |

Producer-sink options

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of the input returning ProducerRecord | optional | If unspecified, you must put ProducerRecord objects on the channel. See Data shapes. |
| :compression-type | "none", "gzip", "snappy", "lz4", "zstd" | optional | Default "none", values are the same as "compression.type" of the Java producer |
| :workers | int | optional | Default 1, number of threads that take from the channel and invoke the internal producer |
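
For illustration, a sink options map that combines the common options with the producer-specific ones might look like the sketch below. The broker address, topic, and the chosen values are placeholders, not recommendations.

```clojure
;; Placeholder values; every key comes from the option tables above.
(def sink-opts
  {:name "greeter-producer"
   :brokers "broker1:9092"
   :topic "greetings"
   :value-type :string
   :shape :value
   :compression-type "lz4" ; same values as the Java producer's compression.type
   :workers 2})            ; two threads take from the channel and produce
```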

Data shapes

You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a Clojure data structure with any of the ConsumerRecord fields, configure the consumer shape:

; Value only:
{:topic "names"
 :key-type :string
 :value-type :string
 :shape :value}
(<!! consumer-chan)
;=> "v"

; Vector:
{:shape [:vector :key :value :topic]}
(<!! consumer-chan)
;=> ["k" "v" "names"]

; Map
{:shape [:map :key :value :topic]}
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
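
The options table also allows an arity-1 function of ConsumerRecord as the consumer shape. The sketch below shows what such a function could look like. The name `record->map` is hypothetical; the accessors are those of the Java client's ConsumerRecord class. The function is called directly here so the sketch runs without a broker, on the assumption that ketu would call it the same way for each polled record.

```clojure
(ns example.shapes
  (:import (org.apache.kafka.clients.consumer ConsumerRecord)))

;; Hypothetical shape function: picks a few fields off the Java record.
(defn record->map [^ConsumerRecord record]
  {:key (.key record)
   :value (.value record)
   :partition (.partition record)
   :offset (.offset record)})

;; Fragment of a source options map that uses the function as the shape.
(def source-opts-fragment
  {:shape record->map})

;; Direct call on a hand-built record: topic, partition, offset, key, value.
(record->map (ConsumerRecord. "names" (int 0) (long 42) "k" "v"))
;=> {:key "k", :value "v", :partition 0, :offset 42}
```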

Similarly, to put a Clojure data structure on the producer channel:

; Value only:
{:key-type :string
 :value-type :string
 :shape :value}
(>!! producer-chan "v")

; Vector:
{:shape [:vector :key :value]}
(>!! producer-chan ["k" "v"])

; Vector with topic in each message:
{:shape [:vector :key :value :topic]}
(>!! producer-chan ["k1" "v1" "names"])
(>!! producer-chan ["k2" "v2" "events"])
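
The producer shape can likewise be an arity-1 function of the input that returns a ProducerRecord. A minimal sketch under that description follows. The name `greeting->record`, the input map keys, and the topic are hypothetical; the three-argument ProducerRecord constructor (topic, key, value) belongs to the Java client.

```clojure
(ns example.producer-shapes
  (:import (org.apache.kafka.clients.producer ProducerRecord)))

;; Hypothetical shape function: turns an application map into a record
;; keyed by user, destined for the "greetings" topic.
(defn greeting->record [{:keys [user greeting]}]
  (ProducerRecord. "greetings" user greeting))

;; Fragment of a sink options map that uses the function as the shape.
(def sink-opts-fragment
  {:shape greeting->record})
```

With such a shape configured, the application would put plain maps like `{:user "ada" :greeting "Hi, ada"}` on the producer channel.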

Development & Contribution

We welcome feedback and would love to hear about use cases other than ours. You can open issues, send pull requests, or contact us on the Clojurians Slack.
