  • Language
    Scala
  • License
    Apache License 2.0
  • Created over 7 years ago
  • Updated about 1 year ago


Lego bricks to build Apache Kafka serializers and deserializers

Kafka serialization/deserialization building blocks


The aim of this library is to provide the Lego™ bricks to build a serializer/deserializer for Kafka messages.

The serializers/deserializers built by this library cannot be set in the Kafka configuration through properties; they need to be passed through the Kafka Producer/Consumer constructors (this is a feature, IMHO).

For the Avro serialization this library uses Avro4s while for JSON it supports Json4s, Circe and Spray out of the box. It is quite easy to add support for other libraries as well.

Modules

The library is composed of these modules:

  • kafka-serialization-core: provides the serialization primitives to build serializers and deserializers.
  • kafka-serialization-cats: provides cats typeclasses instances for serializers and deserializers.
  • kafka-serialization-json4s: provides serializer and deserializer based on Json4s
  • kafka-serialization-jsoniter-scala: provides serializer and deserializer based on Jsoniter Scala
  • kafka-serialization-spray: provides serializer and deserializer based on Spray Json
  • kafka-serialization-circe: provides serializer and deserializer based on Circe
  • kafka-serialization-avro: provides the schema-registry client settings
  • kafka-serialization-avro4s: provides serializer and deserializer based on Avro4s 1.x
  • kafka-serialization-avro4s2: provides serializer and deserializer based on Avro4s 2.x

The Avro4s serialization supports schema evolution through the schema registry. The consumer can provide its own schema and Avro will take care of the conversion.

Getting Started

  • The library is available in the Kaluza artifactory repository.
  • See here for the latest version.
  • Add this snippet to your build.sbt to use it:
import sbt._
import sbt.Keys._

resolvers += "Artifactory" at "https://kaluza.jfrog.io/artifactory/maven"

libraryDependencies ++= {
  val kafkaSerializationV = "0.5.25"
  Seq(
    "com.ovoenergy" %% "kafka-serialization-core" % kafkaSerializationV,
    "com.ovoenergy" %% "kafka-serialization-circe" % kafkaSerializationV, // To provide Circe JSON support
    "com.ovoenergy" %% "kafka-serialization-json4s" % kafkaSerializationV, // To provide Json4s JSON support
    "com.ovoenergy" %% "kafka-serialization-jsoniter-scala" % kafkaSerializationV, // To provide Jsoniter Scala JSON support
    "com.ovoenergy" %% "kafka-serialization-spray" % kafkaSerializationV, // To provide Spray-json JSON support
    "com.ovoenergy" %% "kafka-serialization-avro4s" % kafkaSerializationV // To provide Avro4s Avro support
  )
}

Circe example

Circe is a JSON library for Scala that provides support for generic programming through Shapeless. You can find more information on the Circe website.

Simple serialization/deserialization example with Circe:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

case class UserCreated(id: String, name: String, age: Int)

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit], 
  circeJsonSerializer[UserCreated]
)

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  circeJsonDeserializer[UserCreated]
)

Jsoniter Scala example

Jsoniter Scala is a library that generates codecs for case classes, standard types and collections to get maximum performance from JSON parsing and serialization.

Here is an example of serialization/deserialization with Jsoniter Scala:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.jsoniter_scala._

// Import the Jsoniter Scala macros & core support
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.github.plokhotnyuk.jsoniter_scala.core._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

case class UserCreated(id: String, name: String, age: Int)

implicit val userCreatedCodec: JsonValueCodec[UserCreated] = JsonCodecMaker.make[UserCreated](CodecMakerConfig)

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],
  jsoniterScalaSerializer[UserCreated]()
)

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  jsoniterScalaDeserializer[UserCreated]()
)

Avro example

Apache Avro is a remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.

Apache Avro provides some support for evolving your messages across multiple versions without breaking compatibility with older or newer consumers. It supports several encoding formats, but two are the most used in Kafka: binary and JSON.

The encoded data is always validated and parsed using a Schema (defined in JSON) and eventually evolved to the reader Schema version.

This library provides Avro support through the Avro4s library, which uses macros and Shapeless to allow effortless serialization and deserialization. In addition to Avro4s, it needs a Confluent Schema Registry in place, which provides a way to control the format of the messages produced in Kafka. You can find more information in the Confluent Schema Registry documentation.

An example with Avro4s binary and Schema Registry:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._

import com.sksamuel.avro4s._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

val schemaRegistryEndpoint = "http://localhost:8081"

case class UserCreated(id: String, name: String, age: Int)

// This type class is needed by the avroBinarySchemaIdSerializer
implicit val UserCreatedToRecord = ToRecord[UserCreated]

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit], 
  avroBinarySchemaIdSerializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

// This type class is needed by the avroBinarySchemaIdDeserializer
implicit val UserCreatedFromRecord = FromRecord[UserCreated]

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

This Avro serializer tries to register the schema for every new message type it serializes and caches the schema id it obtains. The deserializer contacts the schema registry each time it encounters a message with a schema id it has not seen before.

The schema id is encoded in the first 4 bytes of the payload. The deserializer extracts the schema id from the payload and fetches the schema from the schema registry. The deserializer is able to evolve the original message to the consumer schema. This is useful when the consumer is only interested in part of the original message (schema projection) or when the original message is in an older or newer format than the consumer schema (schema evolution).
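To make the 4-byte schema id prefix concrete, here is a minimal sketch in plain Scala. It is not the library's implementation: the `SchemaIdFraming` object and its `frame`/`unframe` functions are hypothetical names introduced for illustration, and the sketch assumes a big-endian integer with no format byte in front of it.

```scala
import java.nio.ByteBuffer

// Illustrative only: hypothetical helper, not part of kafka-serialization.
object SchemaIdFraming {

  // Prefix an already encoded body with the schema id as a 4-byte big-endian integer.
  def frame(schemaId: Int, body: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(4 + body.length).putInt(schemaId).put(body).array()

  // Split a framed payload back into the schema id and the remaining body.
  def unframe(payload: Array[Byte]): (Int, Array[Byte]) = {
    require(payload.length >= 4, "payload too short to contain a schema id")
    val buffer = ByteBuffer.wrap(payload)
    val schemaId = buffer.getInt()
    val body = new Array[Byte](buffer.remaining())
    buffer.get(body)
    (schemaId, body)
  }
}
```

A deserializer built along these lines would call something like `unframe`, look up the writer schema for the returned id (from its cache or from the schema registry) and then decode the body with it.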

An example of the consumer schema:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._

import com.sksamuel.avro4s._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

val schemaRegistryEndpoint = "http://localhost:8081"

/* Assuming the original message has been serialized using the 
 * previously defined UserCreated class. We are going to project
 * it ignoring the value of the age
 */
case class UserCreated(id: String, name: String)

// This type class is needed by the avroBinarySchemaIdWithReaderSchemaDeserializer
implicit val UserCreatedFromRecord = FromRecord[UserCreated]


/* This type class is needed by the avroBinarySchemaIdWithReaderSchemaDeserializer
 * to obtain the consumer schema
 */
implicit val UserCreatedSchemaFor = SchemaFor[UserCreated]

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdWithReaderSchemaDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false)
)

Format byte

The original Confluent Avro serializer/deserializer prefixes the payload with a "magic" byte to identify that the message has been written with the Avro serializer.

Similarly, this library supports the same mechanism by means of a couple of functions. It is even able to multiplex and demultiplex different serializers/deserializers based on that format byte. At the moment the supported formats are:

  • JSON
  • Avro Binary with schema ID
  • Avro JSON with schema ID

Let's see this mechanism in action:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._
import scala.collection.JavaConverters._


sealed trait Event
case class UserCreated(id: String, name: String, email: String) extends Event

val schemaRegistryEndpoint = "http://localhost:8081"

/* This producer will produce messages in Avro binary format */
val avroBinaryProducer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],   
  formatSerializer(Format.AvroBinarySchemaId, avroBinarySchemaIdSerializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false))
)

/* This producer will produce messages in Json format */
val circeProducer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],   
  formatSerializer(Format.Json, circeJsonSerializer[UserCreated])
)

/* This consumer will be able to consume messages from both producer */
val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  formatDemultiplexerDeserializer[UserCreated](unknownFormat => failingDeserializer(new RuntimeException("Unsupported format"))){
    case Format.Json => circeJsonDeserializer[UserCreated]
    case Format.AvroBinarySchemaId => avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false)
  }
)

/* This consumer will be able to consume messages in Avro binary format with the magic format byte at the start */
val avroBinaryConsumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

Note that formatDemultiplexerDeserializer is a little bit nasty because it is invariant in the type T, so all the demultiplexed deserializers must be declared as Deserializer[T].
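The format byte idea used in this section can be sketched without Kafka at all. The block below is only an illustration under stated assumptions: `FormatByteSketch`, `withFormat`, `demultiplex` and the byte values `1` and `2` are hypothetical and do not reflect the constants or signatures the library actually uses.

```scala
// Illustrative only: hypothetical names and byte values, not the library's API.
object FormatByteSketch {

  // Assumed format markers; the real values may differ.
  val JsonFormat: Byte = 1
  val AvroBinaryFormat: Byte = 2

  type Decode[T] = Array[Byte] => T

  // Prepend the format byte to an already serialized body.
  def withFormat(format: Byte, body: Array[Byte]): Array[Byte] =
    format +: body

  // Pick a decoder from the first byte and hand it the rest of the payload.
  // Formats not covered by `decoders` are routed to `fallback`.
  def demultiplex[T](fallback: Byte => Decode[T])(
      decoders: PartialFunction[Byte, Decode[T]]
  ): Decode[T] =
    payload => {
      require(payload.nonEmpty, "payload has no format byte")
      val decode = decoders.applyOrElse(payload.head, fallback)
      decode(payload.tail)
    }
}
```

Because every branch has to return a `Decode[T]` for the same `T`, the sketch shows the same constraint as the real demultiplexer: all branches must agree on one target type.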

There are other supporting serializers and deserializers; you can discover them by looking through the code and the tests.

Useful de-serializers

In the core module there are plenty of serializers and deserializers that handle generic cases.

Optional deserializer

To handle the case in which the data is null, you need to wrap the deserializer in the optionalDeserializer:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.common.serialization.Deserializer

case class UserCreated(id: String, name: String, age: Int)

val userCreatedDeserializer: Deserializer[Option[UserCreated]] = optionalDeserializer(circeJsonDeserializer[UserCreated])
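Conceptually, wrapping a deserializer this way just lifts a null payload into `None`. A minimal sketch of that idea on plain functions follows; `OptionalSketch` and `optional` are hypothetical names, and this is not how the library's optionalDeserializer is necessarily implemented.

```scala
// Illustrative only: a plain-function analogue of an optional deserializer.
object OptionalSketch {

  // Wrap a decoder so that a null payload becomes None instead of being decoded.
  def optional[T](decode: Array[Byte] => T): Array[Byte] => Option[T] =
    payload => Option(payload).map(decode)
}
```

With this shape, a record whose value is null (for example a tombstone on a compacted topic) reaches the application as `None` rather than failing inside the JSON decoder.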

Cats instances

The cats module provides the Functor typeclass instance for the Deserializer and the Contravariant instance for the Serializer. This allows you to write:

import cats.implicits._
import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.cats._
import org.apache.kafka.common.serialization.{Serializer, Deserializer, IntegerSerializer, IntegerDeserializer}

val intDeserializer: Deserializer[Int] = (new IntegerDeserializer).asInstanceOf[Deserializer[Int]]
val stringDeserializer: Deserializer[String] = intDeserializer.map(_.toString)
 
val intSerializer: Serializer[Int] = (new IntegerSerializer).asInstanceOf[Serializer[Int]]
val stringSerializer: Serializer[String] = intSerializer.contramap(_.toInt)

Complaints and other Feedback

Feedback of any kind is always appreciated.

Issues and PRs are welcome as well.

About this README

The code samples in this README file are checked using mdoc.

This means that the README.md file is generated from docs/src/README.md. If you want to make any changes to the README, you should:

  1. Edit docs/src/README.md
  2. Run sbt mdoc to regenerate ./README.md
  3. Commit both files to git
