• Stars
    star
    185
  • Rank 202,307 (Top 5 %)
  • Language
    Scala
  • License
    Apache License 2.0
  • Created over 7 years ago
  • Updated over 3 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Scala DSL for Unit-Testing Processing Topologies in Kafka Streams

Mocked Streams

Build Status Codacy Badge codecov License GitHub stars Maven Central

Documentation located at http://mockedstreams.madewithtea.com/

Mocked Streams 3.9.0 (git) is a library for Scala 2.12 and 2.13 which allows you to unit-test processing topologies of Kafka Streams applications (since Apache Kafka >=0.10.1) without Zookeeper and Kafka Brokers. Further, you can use your favourite Scala testing framework e.g. ScalaTest and Specs2. Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your SBT dependencies:

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.9.0" % "test"

Java 8 port of Mocked Streams is Mockafka

Apache Kafka Compatibility

Mocked Streams Version Apache Kafka Version
3.9.0 2.7.0.0
3.8.0 2.6.1.0
3.7.0 2.5.0.0
3.6.0 2.4.1.0
3.5.2 2.4.0.0
3.5.1 2.4.0.0
3.5.0 2.4.0.0
3.4.0 2.3.0.0
3.3.0 2.2.0.0
3.2.0 2.1.1.0
3.1.0 2.1.0.0
2.2.0 2.1.0.0
2.1.0 2.0.0.0
2.0.0 2.0.0.0
1.8.0 1.1.1.0
1.7.0 1.1.0.0
1.6.0 1.0.1.0
1.5.0 1.0.0.0
1.4.0 0.11.0.1
1.3.0 0.11.0.0
1.2.1 0.10.2.1
1.2.0 0.10.2.0
1.1.0 0.10.1.1
1.0.0 0.10.1.0

Simple Example

It wraps the org.apache.kafka.streams.TopologyTestDriver class, but adds more syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings) shouldEqual exp

Multiple Input / Output Example and State

It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(stores: Seq[String]):

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Record order and multiple emissions

The records provided to the mocked stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit records multiple times to the same topics, at various moments in your scenario.

This can be handy to validate that your topology behaviour is or is not dependent on the order in which the records are received and processed.

In the example below, 2 records are first submitted to topic A, then 3 to topic B, then 1 more to topic A again.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
  .topology(topologyTables) // Scala DSL
  .input(InputATopic, strings, ints, firstInputForTopicA)
  .input(InputBTopic, strings, ints, firstInputForTopicB)
  .input(InputATopic, strings, ints, secondInputForTopicA)

State Store

When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method:

import com.madewithtea.mockedstreams.MockedStreams

 val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

 mstreams.stateTable("store-name") shouldEqual Map('a' -> 1) 

Window State Store

When you define your state stores via .stores(stores: Seq[String]) since 1.2 and added the timestamp extractor to the config, you are able to verify the window state store content via the .windowStateTable(name: String, key: K) method:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .stores(Seq("store-name"))
  .config(props)

mstreams.windowStateTable("store-name", "x") shouldEqual someMapX
mstreams.windowStateTable("store-name", "y") shouldEqual someMapY

Adding Timestamps

With .input the input records timestamps are set to 0 default timestamp of 0. This e.g. prevents testing Join windows of Kafka streams as it cannot produce records with different timestamps. However, using .inputWithTime allows adding timestamps like in the following example:

val inputA = Seq(
  ("x", int(1), 1000L),
  ("x", int(1), 1001L),
  ("x", int(1), 1002L)
)

val builder = MockedStreams()
  .topology(topology1WindowOutput) // Scala DSL
  .inputWithTime(InputCTopic, strings, ints, inputA)
  .stores(Seq(StoreName))

Custom Streams Configuration

Sometimes you need to pass a custom configuration to Kafka Streams:

import com.madewithtea.mockedstreams.MockedStreams

  val props = new Properties
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

  val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .config(props)
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Companies using Mocked Streams

More Repositories

1

twitterstream

Twitter Streaming API Example with Kafka Streams in Scala
Scala
49
star
2

django-todo

A simple todo list REST JSON backend with OAuth2
Python
43
star
3

kafcache

Kafka Streams + Memcached (e.g. AWS ElasticCache) for low-latency in-memory lookups
Scala
13
star
4

blockchain-importer

Functional, typesafe, well-tested and composable streaming blockchain importer, drop-in solution for Kafka ecosystem.
Scala
13
star
5

chopchop

React-based paged-reader for the browser, aims for mouseless controls and stylish looks.
JavaScript
12
star
6

blockchain-rpc

Functional, typesafe and well-tested JSON RPC client for Bitcoin, Ethereum and Omni full nodes
Scala
12
star
7

cookiecutter-scala-spark

A cookiecutter template for Apache Spark applications written in Scala
Scala
10
star
8

ethsync

Reliable at-least-once Ethereum to Kafka bridge in Scala
Scala
10
star
9

cairosketchpad

Live coding sketchpad for Python/Cairo
Python
8
star
10

django-highscore

Simple Django REST JSON API for Scores / Highscore / Leaderboard
Python
5
star
11

pixijs-dat-template

PixiJS and dat.GUI template
4
star
12

coding-challenges-solutions

Solutions of coding challenges I wrote, quality differs, spent less or more time.
Python
3
star
13

tinybuddhawisdom

Tiny Buddha wisdoms for your shell
Python
3
star
14

bottledynamo

Good enough AWS DynamoDB abstraction in Scala with Circe JSON serialization using Twitter Futures
Scala
3
star
15

crassula

Lightweight Python coding companion for agile programming
Python
3
star
16

pentaandroid

The official repository of the Penta game for Android
Java
3
star
17

evopy

Framework for experimenting with evolution strategies
Python
2
star
18

hclu

Hierachical clustering in Python
Python
2
star
19

tracking-api-example

Event Tracking API example with Finatra in Scala
Scala
2
star
20

paprweb

Micro-site for the papr application; PDF generation
CSS
2
star
21

butterpack

file to image encoding and image to file decoding
C++
2
star
22

apexchart-example

JavaScript
1
star
23

example-cpuusage-docker

Example: Docker container with Finatra web service and Spark aggregation
1
star
24

spider-hackernews

A simple news.ycombinator.com scrapy spider
Python
1
star
25

ea-sandbox

Some evolutionary algorithms in Python and Haskell to play arround with.
Python
1
star
26

of-experiments

OpenFramework experiments
C++
1
star
27

twitter-intervals

The Missing Intervals of Twitter Util Time
Scala
1
star
28

ingest-token-prices

Rust service to maintain last/30d historical token prices from Coingecko without API key
Rust
1
star
29

tracking-api

Finatra-based service for web event tracking
Scala
1
star
30

KafkaTuner

Tune Kafka Microservice from Command Line
1
star
31

flow-sandbox

1
star
32

reebo.github.io

HTML
1
star
33

react-plotly-experiments

JavaScript
1
star
34

substrate-frame

Rust
1
star
35

chatgpt-codes

using chatGPT in non-trivial programming tasks
Rust
1
star
36

learning-rust

Rust
1
star
37

mockedstreams-doc

CSS
1
star
38

helloedgeware

Rust
1
star
39

egui-playground

Rust
1
star
40

first-near-project

1
star
41

flow-cadence-sandbox

1
star
42

elearn

Very simple flashcard-based e-learning system for the command line
Haskell
1
star