• This repository has been archived on 25/May/2023
  • Stars: 191
  • Rank: 202,877 (Top 4 %)
  • Language: Scala
  • License: Apache License 2.0
  • Created almost 7 years ago
  • Updated over 1 year ago


Repository Details

Thin Scala wrapper around Kafka Streams Java API

Note: The Scala API for Kafka Streams has been accepted for inclusion in Apache Kafka. Over the last couple of months we have been working with the Kafka team to meet the standards and guidelines for this contribution. Lightbend and Alexis Seigneurin have contributed this library (with some changes) to the Kafka community. It is already available on Apache Kafka trunk and will be included in the upcoming release of Kafka. Hence it does not make much sense to update this project on a regular basis. For some time, however, we will continue to provide bug fixes.

A Thin Scala Wrapper Around the Kafka Streams Java API


The library wraps Java APIs in Scala thereby providing:

  1. much better type inference in Scala
  2. less boilerplate in application code
  3. the usual builder-style composition that developers get with the original Java API
  4. complete compile time type safety

The design of the library was inspired by the work started by Alexis Seigneurin in this repository.

Quick Start

kafka-streams-scala is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your build:

val kafka_streams_scala_version = "0.2.1"

libraryDependencies ++= Seq("com.lightbend" %%
  "kafka-streams-scala" % kafka_streams_scala_version)

Note: kafka-streams-scala supports Kafka Streams 1.0.0 onwards.

API docs for kafka-streams-scala are available for both Scala 2.12 and Scala 2.11.

Running the Tests

The library comes with an embedded Kafka server. To run the tests, simply run sbt testOnly and all tests will run on the local embedded server.

The embedded server is started and stopped for every test and consumes a fair amount of resources, so it's recommended that you allocate more heap space to sbt when running the tests, e.g. sbt -mem 2000.

$ sbt -mem 2000
> +clean
> +test

Type Inference and Composition

Here's a sample code fragment using the Scala wrapper library. Compare this with the Scala code from the same example in Confluent's repository.

// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey
  .reduce(_ + _)
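To make the data flow of the fragment above concrete, here is a minimal sketch that models the same steps (left join, re-key, group, sum) on plain Scala collections. It involves no Kafka at all: the object name, the method name and the sample data are hypothetical, and it computes a one-off snapshot, whereas the real KTableS is updated continuously as records arrive.

```scala
object ClicksPerRegionSketch {
  // Plain-collection model of the topology; not the Kafka Streams API.
  def clicksPerRegion(
      userClicks: Seq[(String, Long)],   // (user, clicks) events
      userRegions: Map[String, String]   // user -> region lookup table
  ): Map[String, Long] =
    userClicks
      // Left join and re-key: keep every event, defaulting a missing region to "UNKNOWN".
      .map { case (user, clicks) => (userRegions.getOrElse(user, "UNKNOWN"), clicks) }
      // Group by the new key (region).
      .groupBy { case (region, _) => region }
      // Sum the click counts within each region.
      .map { case (region, events) => region -> events.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data.
    val clicks  = Seq("alice" -> 13L, "bob" -> 4L, "chao" -> 25L, "alice" -> 40L, "dave" -> 7L)
    val regions = Map("alice" -> "asia", "bob" -> "americas", "chao" -> "asia")
    println(clicksPerRegion(clicks, regions))
  }
}
```

With this sample data, "dave" has no region entry and is therefore counted under "UNKNOWN", mirroring the null check in the join above.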

Implicit Serdes

One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library uses the power of Scala implicits towards this end. The library makes some decisions that help implement more succinct serdes in a type safe manner:

  1. No use of configuration-based default serdes. Java APIs allow the user to define default key and value serdes as part of the configuration. This configuration, being implemented as java.util.Properties, is type-unsafe and hence can result in runtime errors if the user forgets to specify a serde or plugs in an incorrect one. kafka-streams-scala makes this completely type-safe by allowing all serdes to be specified through Scala implicits.
  2. The library offers implicit conversions from serdes to Serialized, Produced, Consumed or Joined. Hence as a user you just have to pass in the implicit serde and all conversions to Serialized, Produced, Consumed or Joined will be taken care of automatically.
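The two points above can be sketched with stand-in types. The Serde and Consumed definitions below are simplified placeholders written for this illustration, not the Kafka Streams classes, and consumedFromSerde and stream are hypothetical names; the sketch only shows how the compiler can assemble a Consumed from the serdes it finds in implicit scope.

```scala
// Stand-in types for illustration only; these are NOT the Kafka Streams classes.
trait Serde[T] { def name: String }
final case class Consumed[K, V](keySerde: Serde[K], valueSerde: Serde[V])

object ImplicitSerdeSketch {
  // Implicit serdes the caller brings into scope (hypothetical instances).
  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val longSerde: Serde[Long]     = new Serde[Long] { val name = "long" }

  // Derive a Consumed from whichever key and value serdes are implicitly available.
  implicit def consumedFromSerde[K, V](implicit k: Serde[K], v: Serde[V]): Consumed[K, V] =
    Consumed(k, v)

  // A hypothetical API that requires a Consumed, much as a stream-creating call might.
  def stream[K, V](topic: String)(implicit consumed: Consumed[K, V]): String =
    s"$topic[${consumed.keySerde.name}, ${consumed.valueSerde.name}]"

  def main(args: Array[String]): Unit = {
    // No Consumed is passed explicitly; the compiler builds it from the two serdes.
    println(stream[String, Long]("user-clicks")) // prints user-clicks[string, long]
    // stream[String, Boolean]("flags") would not compile: no Serde[Boolean] is in scope.
  }
}
```

The last comment is the point of the design: a forgotten serde is rejected by the compiler instead of surfacing as a runtime error, which is the failure mode of Properties-based defaults.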

Default Serdes

The library offers a module that contains default serdes for the primitive types. Importing the object brings all of these serdes into scope and helps reduce implicit hell.

object DefaultSerdes {
  implicit val stringSerde: Serde[String] = Serdes.String()
  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
  implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
}
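The asInstanceOf casts in DefaultSerdes are needed because the Java factory methods are typed on boxed classes such as java.lang.Long, while Scala code works with Long. The sketch below reproduces the pattern with a stand-in Serde trait (not the Kafka interface) and a hypothetical serde instance, to show why the cast is safe at runtime: generics are erased, and a Scala Long is boxed to java.lang.Long when it passes through a generic parameter.

```scala
import java.nio.ByteBuffer

// Stand-in trait for illustration only; NOT org.apache.kafka.common.serialization.Serde.
trait Serde[T] { def serialize(value: T): Array[Byte] }

object BoxedSerdeSketch {
  // Plays the role of a Java-side serde typed on the boxed java.lang.Long.
  val javaLongSerde: Serde[java.lang.Long] = new Serde[java.lang.Long] {
    def serialize(value: java.lang.Long): Array[Byte] =
      ByteBuffer.allocate(8).putLong(value.longValue).array()
  }

  // Re-type the same instance for Scala's Long, as DefaultSerdes does.
  implicit val longSerde: Serde[Long] = javaLongSerde.asInstanceOf[Serde[Long]]

  def main(args: Array[String]): Unit =
    // The Scala Long is boxed before the call, so the underlying serde receives a java.lang.Long.
    println(longSerde.serialize(1L).mkString(",")) // prints 0,0,0,0,0,0,0,1
}
```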

Compile-Time Type Safety

DefaultSerdes brings into scope not only the serdes but also implicit Serialized, Produced, Consumed and Joined instances. All APIs that accept Serialized, Produced, Consumed or Joined will therefore get these instances automatically with an import DefaultSerdes._.

With a single import of DefaultSerdes._, the following code needs no Serialized, Produced, Consumed or Joined to be specified, either explicitly or through the default config. Best of all, any missing instance results in a compilation error.

import DefaultSerdes._

val clicksPerRegion: KTableS[String, Long] =
  userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey
  .reduce(_ + _)

// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)
