Idiomatic, typesafe, and reactive Scala client for Apache Pulsar

pulsar4s - Apache Pulsar Scala Client

pulsar4s is a concise, idiomatic, reactive, type safe Scala client for Apache Pulsar. As a simple wrapper over the Java client, we benefit from the reliability and performance of that client while providing better integration with the Scala ecosystem and idioms.

Warning!!

(This disclaimer was written on 2023-01-05.)

Starting in version 2.9.0, we support Scala 3. This means we had to perform some "aggressive" version bumps on libraries:

Libs that were bumped for everyone:

  • play-json 2.10 (Currently in RC7)
  • cats-effect 3.3 (was 2.x)
  • ZIO 2.0 (was 1.x) & zio-cats-interop 23.0.0

Libs that come in different versions across Scala versions:

  • avro4s
    • for Scala 3: 5.0+
    • for Scala 2: 4.1+
  • scala-java8-compat
    • for Scala ≥2.13: 1.0.2
    • for Scala 2.12: 0.8.0 (this was already the case before Scala 3 support)

Check carefully that bumping pulsar4s will not break your build, especially if you use cats-effect!

Using the client

The first step is to create a client attached to the Pulsar cluster, providing the service URL.

val client = PulsarClient("pulsar://localhost:6650")

Alternatively, you can use an instance of PulsarClientConfig if you need to set further configuration options such as authentication, TLS, timeouts and so on.

val config = PulsarClientConfig("pulsar://localhost:6650", ...)
val client = PulsarClient(config)

Then we can create either a producer or a consumer from the client. We need an implicit schema in scope - more on that later.

To create a producer, we need the topic, and an instance of ProducerConfig. We can set further options on the config object, such as max pending messages, router mode, producer name and so on.

implicit val schema: Schema[String] = Schema.STRING

val topic = Topic("persistent://sample/standalone/ns1/b")
val producerConfig = ProducerConfig(topic, ...)
val producer = client.producer[String](producerConfig)

To create a consumer, we need one or more topics to subscribe to, the subscription name, and an instance of ConsumerConfig. We can set further options on the config object, such as subscription type, consumer name, queue size and so on.

implicit val schema: Schema[String] = Schema.STRING

val topic = Topic("persistent://sample/standalone/ns1/b")
val consumerConfig = ConsumerConfig(subscriptionName = Subscription("mysub"), topics = Seq(topic), ...)
val consumer = client.consumer[String](consumerConfig)

Note: Call close() on the client, producer, and consumer once you are finished. The client and producer also implement AutoCloseable and Closeable.
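
One way to make sure close() is always called is scala.util.Using (Scala 2.13+), which accepts any AutoCloseable. The sketch below uses a hypothetical FakeProducer stand-in rather than a real pulsar4s producer, so that it runs without a broker. Since the client and producer implement AutoCloseable, the same pattern should apply to them.

```scala
import scala.util.{Try, Using}

// Hypothetical stand-in for a closeable resource such as a producer.
// It is NOT part of pulsar4s; it only records whether close() was called.
final class FakeProducer extends AutoCloseable {
  var closed: Boolean = false
  def send(value: String): String = s"sent:$value"
  override def close(): Unit = { closed = true }
}

object CloseExample {
  // Using closes the resource when the body finishes, even if the body throws,
  // and returns the outcome as a Try.
  def sendAndClose(producer: FakeProducer, value: String): Try[String] =
    Using(producer)(_.send(value))
}
```

Because Using returns a Try, a failure inside the body surfaces as a Failure value instead of skipping the close() call.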

Schemas

A message must be the correct type for the producer or consumer. When a producer or consumer is created, an implicit Schema typeclass must be available. In the earlier examples, you saw that we added an implicit schema for String using implicit val schema: Schema[String] = Schema.STRING.

There are built-in schemas for bytes and strings, but other complex types require a custom schema. Some people prefer to write custom typeclasses manually for the types they need to support. Other people like to just have it done automagically. For those people, pulsar4s provides extensions for the well-known Scala JSON libraries that can be used to generate messages where the body is a JSON representation of the class.

An example of creating a producer for a complex type using the circe json library to generate the schema:

import io.circe.generic.auto._
import com.sksamuel.pulsar4s.circe._

val topic = Topic("persistent://sample/standalone/ns1/b")
val producer = client.producer[Food](ProducerConfig(topic))
producer.send(Food("pizza", "ham and pineapple"))

Note: The imports bring into scope a method that will generate an implicit schema when required.
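
For readers who would rather write a typeclass instance by hand, the general pattern can be sketched as follows. Everything here is an assumption made for illustration: Codec is a simplified stand-in and not the Pulsar Schema interface, the field names of Food are invented, and the '|' separated encoding is arbitrary. The point is only how an implicit instance is picked up by the compiler.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, simplified stand-in for a schema typeclass.
// This is NOT the Pulsar Schema interface.
trait Codec[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): T
}

// Field names are assumptions; the README does not define Food.
final case class Food(name: String, topping: String)

object Codec {
  // A hand-written instance: the two fields joined by '|'.
  // Assumes the name never contains '|'.
  implicit val foodCodec: Codec[Food] = new Codec[Food] {
    def encode(value: Food): Array[Byte] =
      s"${value.name}|${value.topping}".getBytes(UTF_8)
    def decode(bytes: Array[Byte]): Food = {
      val parts = new String(bytes, UTF_8).split("\\|", 2)
      Food(parts(0), parts(1))
    }
  }

  // The compiler supplies the instance; callers never pass it explicitly.
  def roundTrip[T](value: T)(implicit codec: Codec[T]): T =
    codec.decode(codec.encode(value))
}
```

Placing the instance in the companion object of the typeclass means it is found without any import, which is the same convenience the extension modules provide through their imports.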

The following extension modules can be used for automatic schemas:

Library      Module                 Import
Circe        pulsar4s-circe         import io.circe.generic.auto._
                                    import com.sksamuel.pulsar4s.circe._
Jackson      pulsar4s-jackson       import com.sksamuel.pulsar4s.jackson._
Json4s       pulsar4s-json4s        import com.sksamuel.pulsar4s.json4s._
Spray Json   pulsar4s-spray-json    import com.sksamuel.pulsar4s.sprayjson._
Play Json    pulsar4s-play-json     import com.sksamuel.pulsar4s.playjson._

Producing

There are two ways to send a message - either with a plain value, or with an instance of ProducerMessage. If you do not need to specify extra options on the message - such as key, event time, headers, etc - then you can just send a plain value, and the client will wrap the value in a pulsar message. Alternatively, you can create an instance of ProducerMessage to specify extra options.

Each method can be synchronous or asynchronous. The asynchronous methods return a scala.concurrent.Future. If you are using another effect library, such as cats, scalaz or monix, then pulsar4s also supports those effects. See the section on #effects.

If the send method is successful, you will receive the MessageId of the generated message. If an exception is generated, then in the synchronous methods, you will receive a Failure with the error. In the asynchronous methods the exception will be surfaced as a failed Future.
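
The two error-handling styles described above can be sketched with plain standard-library types. The sendSync and sendAsync functions below are hypothetical stand-ins, not pulsar4s methods, and a String plays the role of the MessageId. The sketch only shows how a Failure and a failed Future are typically handled.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object SendResults {
  // Hypothetical stand-in for a synchronous send: rejects empty payloads.
  def sendSync(value: String): Try[String] =
    if (value.nonEmpty) Success(s"id-for-$value")
    else Failure(new IllegalArgumentException("empty payload"))

  // Hypothetical stand-in for an asynchronous send with the same behaviour.
  def sendAsync(value: String): Future[String] =
    Future.fromTry(sendSync(value))

  // Synchronous style: pattern match on Success or Failure.
  def describe(result: Try[String]): String = result match {
    case Success(id) => s"stored as $id"
    case Failure(e)  => s"send failed: ${e.getMessage}"
  }

  // Asynchronous style: recover turns a failed Future into a value without blocking.
  def describeAsync(result: Future[String])(implicit ec: ExecutionContext): Future[String] =
    result
      .map(id => s"stored as $id")
      .recover { case e => s"send failed: ${e.getMessage}" }
}
```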

To send a plain value, we just invoke send with the value:

producer.send("wibble")

Or to send a message, we first create an instance of ProducerMessage.

val message = DefaultProducerMessage(Some("mykey"), "wibble", eventTime = Some(EventTime(System.currentTimeMillis)))
producer.send(message)

Consuming

To receive a message, create a consumer and invoke either the receive, receive(Duration), or the receiveAsync methods. The first two are synchronous and return an instance of ConsumerMessage, blocking if necessary, and the latter is asynchronous, returning a Future (or other effect) with the ConsumerMessage once ready.

val message: Message = consumer.receive

or

val message: Future[ConsumerMessage[T]] = consumer.receiveAsync

Once a message has been consumed, it is important to acknowledge the message by using the message id with the ack methods.

consumer.acknowledge(message.messageId)

Akka Streams

Pulsar4s integrates with the outstanding akka-streams library - it provides both a source and a sink. To use this, you need to add a dependency on the pulsar4s-akka-streams module.

Sources

To create a source, all that is required is a function that will create a consumer on demand and the message id to seek. The function must return a fresh consumer each time it is invoked. The consumer is just a regular pulsar4s Consumer and can be created in the normal way, for example:

val topic = Topic("persistent://sample/standalone/ns1/b")
val consumerFn = () => client.consumer(ConsumerConfig(subscriptionName = Subscription("mysub"), topics = Seq(topic)))

We pass that function into the source method, providing the seek. Note the imports.

import com.sksamuel.pulsar4s.akka.streams._
val pulsarSource = source(consumerFn, Some(MessageId.earliest))

The materialized value of the source is an instance of Control which provides a method called 'close' which can be used to stop consuming messages. Once the akka streams source is completed (or fails) the consumer will be automatically closed.

Sinks

To create a sink, we need a producer function similar to the source's consumer function. Again, the producer used is just a regular pulsar4s Producer. The function must return a fresh producer each time it is invoked.

val topic = Topic("persistent://sample/standalone/ns1/b")
val producerFn = () => client.producer(ProducerConfig(topic))

We pass that function into the sink method. Once again, take note of the imports.

import com.sksamuel.pulsar4s.akka.streams._
val pulsarSink = sink(producerFn)

A sink requires messages of type ProducerMessage[T] where T is the value type of the message. For example, if we were producing String messages, then we would map our upstream messages into instances of ProducerMessage[String] before passing them to the sink.

import com.sksamuel.pulsar4s.akka.streams._
Source.fromIterator(() => List("a", "b", "c", "d").iterator)
  .map(string => ProducerMessage(string))
  .runWith(sink(producerFn))

A sink will run until the upstream source completes. In other words, to terminate the sink, the source must be cancelled or completed. Once the sink completes the producer will be automatically closed.

The materialized value of the sink is a Future[Done] which will be completed once the upstream source has completed.

There is also an implementation of a 'multi-sink'. A multi-sink lets you produce to multiple topics in Pulsar while using just one sink. In addition to the ProducerMessage[T], it expects a Topic, so the input format is (Topic, ProducerMessage[T]). All producers in the sink are created lazily, once a tuple with a new topic is received. You can also provide a collection of topics to the constructing function, to create those topics ahead of time if the names are known. New topics read from the stream will still be created on the fly.

Example usage of a multi-sink:

import com.sksamuel.pulsar4s.akka.streams._

val topic1 = Topic("persistent://sample/standalone/ns1/b")
val topic2 = Topic("persistent://sample/standalone/ns1/bb")
val producerFn = (topic: Topic) => client.producer(ProducerConfig(topic))
val pulsarMultiSink = multiSink(producerFn)
// or to create those topics ahead of time:
val pulsarMultiSink2 = multiSink(producerFn, Set(topic1, topic2))
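
The lazy, per-topic creation described above can be pictured with a small cache. This is only an illustration of the idea and makes no claim about how multiSink is implemented: the types below are hypothetical stand-ins, a topic is a plain String, and a "producer" merely records what it was asked to send.

```scala
import scala.collection.mutable

object LazyProducers {
  // Hypothetical stand-in for a producer bound to one topic.
  final class FakeProducer(val topic: String) {
    val sent: mutable.Buffer[String] = mutable.Buffer.empty
    def send(value: String): Unit = { sent += value }
  }

  // Keeps one producer per topic, creating it only when first needed.
  final class ProducerCache(create: String => FakeProducer,
                            initialTopics: Set[String] = Set.empty) {
    private val producers = mutable.Map.empty[String, FakeProducer]

    // Topics known up front get their producers created eagerly.
    initialTopics.foreach(t => producers.update(t, create(t)))

    // Any other topic gets a producer the first time a message for it arrives.
    def send(topic: String, value: String): Unit =
      producers.getOrElseUpdate(topic, create(topic)).send(value)

    def topics: Set[String] = producers.keySet.toSet

    def sentTo(topic: String): Seq[String] =
      producers.get(topic).map(_.sent.toSeq).getOrElse(Seq.empty)
  }
}
```

The create function is invoked at most once per topic, which mirrors the requirement that the producer function returns a fresh producer each time it is called.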

Full Example

Here is a full example of consuming from a topic for 10 seconds, publishing the messages back into another topic. This is a toy example, but it shows everything in one place.

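import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.sksamuel.pulsar4s.{ConsumerConfig, MessageId, ProducerConfig, ProducerMessage, PulsarClient, Subscription, Topic}
import com.sksamuel.pulsar4s.akka.streams._
import org.apache.pulsar.client.api.Schema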

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val schema: Schema[Array[Byte]] = Schema.BYTES

val client = PulsarClient("pulsar://localhost:6650")

val intopic = Topic("persistent://sample/standalone/ns1/in")
val outtopic = Topic("persistent://sample/standalone/ns1/out")

val consumerFn = () => client.consumer(ConsumerConfig(subscriptionName = Subscription("mysub"), topics = Seq(intopic)))
val producerFn = () => client.producer(ProducerConfig(outtopic))

val control = source(consumerFn, Some(MessageId.earliest))
                .map { consumerMessage => ProducerMessage(consumerMessage.data) }
                .to(sink(producerFn)).run()

Thread.sleep(10000)
control.close()

FS2 Support

Pulsar4s integrates with the fs2 library - it provides both a source and a sink. To use this, you need to add a dependency on both the pulsar4s-{effect} and pulsar4s-fs2 modules.

Example

import cats.effect.IO
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
import com.sksamuel.pulsar4s.fs2.Streams

import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition}

implicit val schema: Schema[Array[Byte]] = Schema.BYTES

val client = PulsarClient("pulsar://localhost:6650")

val intopic = Topic("persistent://sample/standalone/ns1/in")
val outtopic = Topic("persistent://sample/standalone/ns1/out")

Streams.batch[IO, Array[Byte]](client.consumerAsync[Array[Byte], IO](ConsumerConfig(
  subscriptionName = Subscription("mysub"),
  topics = Seq(intopic),
  subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
)))
  .map(_.map(message => ProducerMessage(message.value)))
  .through(Streams.committableSink(client.producerAsync[Array[Byte], IO](ProducerConfig(outtopic))))
  .compile
  .drain

Example SBT Setup

val pulsar4sVersion = "x.x.x"
libraryDependencies ++= Seq(
  "com.clever-cloud.pulsar4s" %% "pulsar4s-core" % pulsar4sVersion,

  // for the akka-streams integration
  "com.clever-cloud.pulsar4s" %% "pulsar4s-akka-streams" % pulsar4sVersion,

  // if you want to use avro for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-avro" % pulsar4sVersion,

  // if you want to use circe for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-circe" % pulsar4sVersion,

  // if you want to use json4s for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-json4s" % pulsar4sVersion,

  // if you want to use jackson for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-jackson" % pulsar4sVersion,

  // if you want to use spray-json for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-spray-json" % pulsar4sVersion,

  // if you want to use play-json for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-play-json" % pulsar4sVersion,

  // if you want to use monix effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-monix" % pulsar4sVersion,

  // if you want to use scalaz effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-scalaz" % pulsar4sVersion,

  // if you want to use cats effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-cats-effect" % pulsar4sVersion,

  // if you want to use fs2
  "com.clever-cloud.pulsar4s" %% "pulsar4s-fs2" % pulsar4sVersion,

  // if you want to use zio
  "com.clever-cloud.pulsar4s" %% "pulsar4s-zio" % pulsar4sVersion
)

Contributions

Contributions to pulsar4s are always welcome. Good ways to contribute include:

  • Raising bugs and feature requests
  • Improving the performance of pulsar4s
  • Adding to the documentation

License

This software is licensed under the Apache 2 license, quoted below.

Copyright 2017-2018 Stephen Samuel

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
