ZIO Kafka
ZIO Kafka is a Kafka client for ZIO. It provides a purely functional, streams-based interface to the Kafka client and integrates effortlessly with ZIO and ZIO Streams.
Introduction
Apache Kafka is a distributed event streaming platform that acts as a distributed publish-subscribe messaging system. It enables us to build distributed streaming data pipelines and event-driven applications.
Kafka has a mature Java client for producing and consuming events, but its API is low-level. ZIO Kafka is a ZIO-native client for Apache Kafka that provides a high-level streaming API on top of the Java client, so we can produce and consume events using the declarative concurrency model of ZIO Streams.
Installation
To use this library, we need to add the following lines to our build.sbt file:
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.3.2"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.3.2" % Test
Example
Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. First, we need a running instance of Kafka. We can get one by saving the following docker-compose script in a docker-compose.yml file and running docker-compose up:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Now, we can run our ZIO Kafka Streaming application:
import zio._
import zio.kafka.consumer._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde._
import zio.stream.ZStream

object MainApp extends ZIOAppDefault {

  // Produces one random number to the "random" topic every 2 seconds.
  val producer: ZStream[Producer, Throwable, Nothing] =
    ZStream
      .repeatZIO(Random.nextIntBetween(0, Int.MaxValue))
      .schedule(Schedule.fixed(2.seconds))
      .mapZIO { random =>
        Producer.produce[Any, Long, String](
          topic = "random",
          key = random % 4,
          value = random.toString,
          keySerializer = Serde.long,
          valueSerializer = Serde.string
        )
      }
      .drain

  // Prints every record from the "random" topic and commits offsets in batches.
  val consumer: ZStream[Consumer, Throwable, Nothing] =
    Consumer
      .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
      .tap(r => Console.printLine(r.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .drain

  // The broker address matches the port published by the docker-compose file.
  def producerLayer =
    ZLayer.scoped(
      Producer.make(
        settings = ProducerSettings(List("localhost:29092"))
      )
    )

  def consumerLayer =
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(List("localhost:29092")).withGroupId("group")
      )
    )

  // Runs the producer and the consumer concurrently until the app is interrupted.
  override def run =
    producer.merge(consumer)
      .runDrain
      .provide(producerLayer, consumerLayer)
}
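To smoke-test the setup without starting the full streaming application, a one-shot producer can be sketched with the same calls used in the example above. This is a minimal sketch, not part of ZIO Kafka itself: ProduceOnceApp and its records list are hypothetical names chosen for illustration, and the broker address assumes the docker-compose setup shown earlier.

```scala
import zio._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde._

// Hypothetical helper app (not part of ZIO Kafka): sends a few fixed records and exits.
object ProduceOnceApp extends ZIOAppDefault {

  // Five illustrative records; the keys mirror the `random % 4` keying of the main example.
  val records: List[(Long, String)] =
    (1L to 5L).toList.map(n => (n % 4, s"message-$n"))

  // Assumes the broker from the docker-compose file is reachable on localhost:29092.
  private val producerLayer =
    ZLayer.scoped(Producer.make(ProducerSettings(List("localhost:29092"))))

  // Sends the records one after another, then terminates.
  override def run =
    ZIO
      .foreachDiscard(records) { case (key, value) =>
        Producer.produce[Any, Long, String](
          topic = "random",
          key = key,
          value = value,
          keySerializer = Serde.long,
          valueSerializer = Serde.string
        )
      }
      .provide(producerLayer)
}
```

If MainApp is running at the same time, its consumer should print the five message values, since both applications use the "random" topic.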
Resources
- An Introduction to ZIO Kafka
- Streaming microservices with ZIO and Kafka by Aleksandar Skrbic (February 2021)
- ZIO WORLD - ZIO Kafka by Aleksandar Skrbic (March 2020): Aleksandar Skrbic presented ZIO Kafka, a critical library for the modern Scala developer, which hides some of the complexities of Kafka.
Adopters
Here is a partial list of companies using zio-kafka in production.
Want to see your company here? Submit a PR!
Documentation
Learn more on the ZIO Kafka homepage!
Contributing
For the general guidelines, see the ZIO contributor's guide.
Code of Conduct
See the Code of Conduct
Support
Credits
This library is heavily inspired and made possible by the research and implementation done in Alpakka Kafka, a library maintained by the Akka team and originally written as Reactive Kafka by SoftwareMill.
License
Copyright 2021-2023 Itamar Ravid and the zio-kafka contributors.