  • Stars: 222
  • Rank: 179,123 (Top 4 %)
  • Language: Scala
  • License: BSD 3-Clause "New...
  • Created over 11 years ago
  • Updated 12 months ago

Event Store JVM Client

Scala 2.13.7 / 2.12.15
Akka 2.6.17
Event Store v5.x, v20.x and v21.x are supported

We have two APIs available:

  • Calling methods on EsConnection

We use scala.concurrent.Future for asynchronous calls; however, it is not friendly enough for Java users. To keep Java developers happy without reinventing the wheel, we propose using the tools provided by the Akka team.

Java:

final EsConnection connection = EsConnectionFactory.create(system);
final Future<Event> future    = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);

Scala:

val connection = EsConnection(system)
val future     = connection(ReadEvent(EventStream.Id("my-stream"), EventNumber.First))

  • Sending messages to eventstore.akka.tcp.ConnectionActor

Java:

final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ReadEvent readEvent = new ReadEventBuilder("my-stream").first().build();
connection.tell(readEvent, null);

Scala:

val connection = system.actorOf(ConnectionActor.props())
connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

Setup

Sbt

libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.4.0"

Maven

<dependency>
    <groupId>com.geteventstore</groupId>
    <artifactId>eventstore-client_${scala.version}</artifactId>
    <version>7.4.0</version>
</dependency>

Java examples

Read event

import java.net.InetSocketAddress;
import akka.actor.*;
import akka.actor.Status.Failure;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.Settings;
import eventstore.akka.tcp.ConnectionActor;

public class ReadEventExample {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        final Settings settings = new SettingsBuilder()
                .address(new InetSocketAddress("127.0.0.1", 1113))
                .defaultCredentials("admin", "changeit")
                .build();
        final ActorRef connection = system.actorOf(ConnectionActor.getProps(settings));
        final ActorRef readResult = system.actorOf(Props.create(ReadResult.class));

        final ReadEvent readEvent = new ReadEventBuilder("my-stream")
                .first()
                .resolveLinkTos(false)
                .requireMaster(true)
                .build();

        connection.tell(readEvent, readResult);
    }


    public static class ReadResult extends AbstractActor {
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ReadEventCompleted.class, m -> {
                        final Event event = m.event();
                        log.info("event: {}", event);
                        context().system().terminate();
                    })
                    .match(Failure.class, f -> {
                        final EsException exception = (EsException) f.cause();
                        log.error(exception, exception.toString());
                        context().system().terminate();
                    })
                    .build();
        }
    }
}

Write event

import java.util.UUID;
import akka.actor.*;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.tcp.ConnectionActor;

public class WriteEventExample {

    public static void main(String[] args) {

        final ActorSystem system   = ActorSystem.create();
        final ActorRef connection  = system.actorOf(ConnectionActor.getProps());
        final ActorRef writeResult = system.actorOf(Props.create(WriteResult.class));

        final EventData event = new EventDataBuilder("my-event")
                .eventId(UUID.randomUUID())
                .data("my event data")
                .metadata("my first event")
                .build();

        final WriteEvents writeEvents = new WriteEventsBuilder("my-stream")
                .addEvent(event)
                .expectAnyVersion()
                .build();

        connection.tell(writeEvents, writeResult);
    }

    public static class WriteResult extends AbstractActor {

        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(WriteEventsCompleted.class, m -> {
                        log.info("range: {}, position: {}", m.numbersRange(), m.position());
                        context().system().terminate();
                    })
                    .match(Status.Failure.class, f -> {
                        final EsException exception = (EsException) f.cause();
                        log.error(exception, exception.toString());
                        // Shut down on failure as well, mirroring the read example
                        context().system().terminate();
                    })
                    .build();
        }

    }
}

Subscribe to All

import java.io.Closeable;
import akka.actor.ActorSystem;
import eventstore.j.*;
import eventstore.core.IndexedEvent;
import eventstore.akka.SubscriptionObserver;

public class SubscribeToAllExample {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        final EsConnection connection = EsConnectionFactory.create(system);
        final Closeable closeable = connection.subscribeToAll(new SubscriptionObserver<IndexedEvent>() {
            @Override
            public void onLiveProcessingStart(Closeable subscription) {
                system.log().info("live processing started");
            }

            @Override
            public void onEvent(IndexedEvent event, Closeable subscription) {
                system.log().info(event.toString());
            }

            @Override
            public void onError(Throwable e) {
                system.log().error(e.toString());
            }

            @Override
            public void onClose() {
                system.log().error("subscription closed");
            }
        }, false, null);
    }
}

Build event

import java.util.UUID;
import eventstore.core.EventData;
import eventstore.j.EventDataBuilder;

public class EventDataBuilderExample {

    final EventData empty = new EventDataBuilder("eventType").build();

    final EventData binary = new EventDataBuilder("binary")
            .eventId(UUID.randomUUID())
            .data(new byte[]{1, 2, 3, 4})
            .metadata(new byte[]{5, 6, 7, 8})
            .build();

    final EventData string = new EventDataBuilder("string")
            .eventId(UUID.randomUUID())
            .data("data")
            .metadata("metadata")
            .build();

    final EventData json = new EventDataBuilder("json")
            .eventId(UUID.randomUUID())
            .jsonData("{\"data\":\"data\"}")
            .jsonMetadata("{\"metadata\":\"metadata\"}")
            .build();
}

Scala examples

Read event

import java.net.InetSocketAddress
import _root_.akka.actor._
import _root_.akka.actor.Status.Failure
import eventstore.core._
import eventstore.akka.Settings
import eventstore.akka.tcp.ConnectionActor

object ReadEventExample extends App {
  val system = ActorSystem()

  val settings = Settings(
    address = new InetSocketAddress("127.0.0.1", 1113),
    defaultCredentials = Some(UserCredentials("admin", "changeit"))
  )

  val connection = system.actorOf(ConnectionActor.props(settings))
  implicit val readResult = system.actorOf(Props[ReadResult]())

  connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

  class ReadResult extends Actor with ActorLogging {
    def receive = {
      case ReadEventCompleted(event) =>
        log.info("event: {}", event)
        shutdown()

      case Failure(e: EsException) =>
        log.error(e.toString)
        shutdown()
    }

    def shutdown(): Unit = { context.system.terminate(); () }
  }
}

Write event

import _root_.akka.actor.Status.Failure
import _root_.akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore.core._
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor

object WriteEventExample extends App {

  val system      = ActorSystem()
  val connection  = system.actorOf(ConnectionActor.props())
  val event       = EventData("my-event", eventId = randomUuid, data = Content("my event data"), metadata = Content("my first event"))

  implicit val writeResult = system.actorOf(Props(WriteResult))

  connection ! WriteEvents(EventStream.Id("my-stream"), List(event))

  case object WriteResult extends Actor with ActorLogging {

    def receive = {
      case WriteEventsCompleted(range, position) =>
        log.info("range: {}, position: {}", range, position)
        shutdown()

      case Failure(e: EsException) =>
        log.error(e.toString)
        shutdown()
    }

    def shutdown(): Unit = { context.system.terminate();  () }
  }
}

Start transaction

import _root_.akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import eventstore.core.{EventData, EventStream, TransactionStart}
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.TransactionActor
import eventstore.akka.TransactionActor._
import eventstore.akka.tcp.ConnectionActor

object StartTransactionExample extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props(), "connection")

  val kickoff = Start(TransactionStart(EventStream.Id("my-stream")))
  val transaction = system.actorOf(TransactionActor.props(connection, kickoff), "transaction")
  implicit val transactionResult = system.actorOf(Props[TransactionResult](), "result")

  val data = EventData("transaction-event", eventId = randomUuid)

  transaction ! GetTransactionId // replies with `TransactionId(transactionId)`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Commit // replies with `CommitCompleted`
}

// Illustrative receiver (not part of the original snippet): logs every reply from the transaction actor
class TransactionResult extends Actor with ActorLogging {
  def receive = {
    case reply => log.info("received: {}", reply)
  }
}

Count all events

import _root_.akka.actor._
import scala.concurrent.duration._
import eventstore.core.IndexedEvent
import eventstore.akka._
import eventstore.akka.tcp.ConnectionActor

object CountAll extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props(), "connection")
  val countAll = system.actorOf(Props[CountAll](), "count-all")
  system.actorOf(SubscriptionActor.props(connection, countAll, None, None, Settings.Default), "subscription")
}

class CountAll extends Actor with ActorLogging {
  context.setReceiveTimeout(1.second)

  def receive = count(0)

  def count(n: Long, printed: Boolean = false): Receive = {
    case _: IndexedEvent       => context become count(n + 1)
    case LiveProcessingStarted => log.info("live processing started")
    case ReceiveTimeout if !printed =>
      log.info("count {}", n)
      context become count(n, printed = true)
  }
}

Future-like API

import _root_.akka.actor.ActorSystem
import scala.concurrent.Future
import eventstore.core._
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.EsConnection

object EsConnectionExample extends App {
  val system = ActorSystem()

  import system.dispatcher

  val connection = EsConnection(system)
  val log = system.log

  val stream = EventStream.Id("my-stream")

  val readEvent: Future[ReadEventCompleted] = connection(ReadEvent(stream))
  readEvent foreach { x =>
    log.info(x.event.toString)
  }

  val readStreamEvents: Future[ReadStreamEventsCompleted] = connection(ReadStreamEvents(stream))
  readStreamEvents foreach { x =>
    log.info(x.events.toString())
  }

  val readAllEvents: Future[ReadAllEventsCompleted] = connection(ReadAllEvents(maxCount = 5))
  readAllEvents foreach { x =>
    log.info(x.events.toString())
  }

  val writeEvents: Future[WriteEventsCompleted] = connection(WriteEvents(stream, List(EventData("my-event", eventId = randomUuid))))
  writeEvents foreach { x =>
    log.info(x.numbersRange.toString)
  }
}

EventStoreExtension

The most common use case is a single Event Store connection per application. For this you can use our Akka extension, which ensures that only a single instance of the connection actor exists.

EventStoreExtension(system).actor ! ReadEvent(EventStream.Id("stream"))
EventStoreExtension(system).connection(ReadEvent(EventStream.Id("stream")))

Streams

The client provides an Akka Streams interface for Event Store subscriptions. Two methods, allStreamsSource and streamSource, are available in both the Java and Scala APIs.

Here is a short example of how to use it:

List all streams

import _root_.akka.actor.ActorSystem
import eventstore.core.EventStream
import eventstore.akka.EventStoreExtension

object ListAllStreamsExample extends App {
  implicit val system = ActorSystem()
  import system.dispatcher

  val connection = EventStoreExtension(system).connection
  val source = connection.streamSource(EventStream.System.`$streams`, infinite = false, resolveLinkTos = true)

  source
    .runForeach { x => println(x.streamId.streamId) }
    .onComplete { _ => system.terminate() }
}

Reactive Streams

You can use the generic Reactive Streams Publisher interface for Event Store subscriptions by converting an Akka Stream to a Publisher. See: Integrating Akka Streams with Reactive Streams

Here is a short example of how to accomplish that:

import _root_.akka.actor.ActorSystem
import _root_.akka.stream.scaladsl._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._
import eventstore.akka.EventStoreExtension

object MessagesPerSecondReactiveStreams extends App {
  implicit val system = ActorSystem()

  val connection = EventStoreExtension(system).connection

  val publisher: Publisher[String] = connection.allStreamsSource()
    .groupedWithin(Int.MaxValue, 1.second)
    .map { xs => f"${xs.size.toDouble / 1000}%2.1fk m/s" }
    .runWith(Sink.asPublisher(fanout = false))

  val subscriber: Subscriber[String] = Source.asSubscriber[String]
    .to(Sink.foreach(println))
    .run()

  publisher.subscribe(subscriber)
}

Configuration

Default client settings are defined in the core reference.conf and the client reference.conf. You can override them in your own application.conf placed in src/main/resources, the same way you might already do for Akka. The client uses the same approach and the same configuration library.
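
As a minimal sketch of such an override, the fragment below could go into src/main/resources/application.conf. The key names (eventstore.address.host, eventstore.address.port, eventstore.credentials.login, eventstore.credentials.password) are assumptions based on the layout of the core reference.conf, so verify them against the reference.conf bundled with your client version. The values mirror the address and credentials used in the examples above.

```hocon
# application.conf (sketch; key names assumed from the core reference.conf)
eventstore {
  # Address of the Event Store TCP endpoint
  address {
    host = "127.0.0.1"
    port = 1113
  }

  # Default credentials used when none are supplied per operation
  credentials {
    login = "admin"
    password = "changeit"
  }
}
```

Any setting not listed in application.conf keeps its default from reference.conf.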

Cluster

The client can also be used against an Event Store cluster. To do so, configure the client via the eventstore.cluster section described in the core reference.conf, or via ClusterSettings. Configuring through application.conf is the preferred option.
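
A cluster setup in application.conf might look like the sketch below. The gossip-seeds key and the node addresses are assumptions for illustration, not values taken from this README; consult the eventstore.cluster section of the core reference.conf for the authoritative key names and defaults.

```hocon
# application.conf (sketch; key name and addresses are hypothetical)
eventstore {
  cluster {
    # Gossip endpoints of the cluster nodes, as "host:port" strings
    gossip-seeds = ["10.0.0.1:2113", "10.0.0.2:2113", "10.0.0.3:2113"]
  }
}
```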
