
Event sourcing for Akka Streams

General concept

This project brings to Akka Streams what Akka Persistence brings to Akka Actors: persistence via event sourcing. It provides a stateful EventSourcing graph stage of type BidiFlow[REQ, E, E, RES, _] (simplified), which models the event sourcing message flow. An EventSourcing stage therefore

  • consumes and validates a request (command or query)
  • produces events (derived from a command and internal state) to be written to an event log
  • consumes written events for updating internal state
  • produces a response after internal state update

An EventSourcing stage maintains current state and uses a request handler for validating requests and generating events and responses:

  import com.github.krasserm.ases.EventSourcing._

  type RequestHandler[S, E, REQ, RES] = (S, REQ) => Emission[S, E, RES]

Request handler input is current state and a request; output is an instruction to emit events and/or a response. emit instructs the EventSourcing stage to emit events and to call a responseFactory with current state after all emitted events have been written and applied to current state:

  def emit[S, E, RES](events: Seq[E], responseFactory: S => RES): Emission[S, E, RES]

respond instructs the EventSourcing stage to emit a response immediately without emitting any events:

  def respond[S, E, RES](response: RES): Emission[S, E, RES]

Methods emit and respond themselves are side-effect free. They only generate Emission values that are interpreted by the EventSourcing stage.
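
For illustration, here is a minimal sketch of a request handler for a hypothetical counter whose state is its current value (an Int). All domain types in this sketch (Request, Increment, GetValue, Incremented, Response) are made-up example types, not part of the project:

  // Hypothetical counter domain, for illustration only
  sealed trait Request
  case class Increment(delta: Int) extends Request  // command
  case object GetValue extends Request              // query

  case class Incremented(delta: Int)  // event
  case class Response(value: Int)     // response

  val requestHandler: RequestHandler[Int, Incremented, Request, Response] =
    (state, request) => request match {
      case Increment(d) => emit(Seq(Incremented(d)), s => Response(s)) // emit event, respond with updated state
      case GetValue     => respond(Response(state))                    // respond immediately, emit no events
    }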

For updating internal state, the EventSourcing stage uses an event handler:

  type EventHandler[S, E] = (S, E) => S

Event handler input is current state and a written event; output is updated state. Given definitions of an emitter id, initial state, a request handler and an event handler, an EventSourcing stage can be constructed with:

  import akka.stream.scaladsl.BidiFlow
  import com.github.krasserm.ases.EventSourcing

  def emitterId: String  
  def initialState: S
  def requestHandler: RequestHandler[S, E, REQ, RES]
  def eventHandler: EventHandler[S, E]

  def eventSourcingStage: BidiFlow[REQ, E, E, RES, _] =
    EventSourcing(emitterId, initialState, requestHandler, eventHandler)
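
Continuing the hypothetical counter sketch, concrete definitions could look like this (the emitter id "counter-1" is an arbitrary example value):

  // Apply a written event to current state
  val eventHandler: EventHandler[Int, Incremented] =
    (state, event) => state + event.delta

  def counterStage: BidiFlow[Request, Incremented, Incremented, Response, _] =
    EventSourcing("counter-1", 0, requestHandler, eventHandler)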

Event logs are modeled as Flow[E, E, _]. This project provides event log implementations that can use Akka Persistence journals or Apache Kafka as storage backends. Eventuate event logs as storage backends will be supported later. The following example uses the Akka Persistence in-memory journal as storage backend:

  import akka.stream.scaladsl.Flow
  import com.github.krasserm.ases.log.AkkaPersistenceEventLog

  val provider: AkkaPersistenceEventLog = 
    new AkkaPersistenceEventLog(journalId = "akka.persistence.journal.inmem")

  def persistenceId: String = 
    emitterId

  def eventLog: Flow[E, E, _] =
    provider.flow[E](persistenceId)

After materialization, an event log emits replayed events to its output port before emitting newly written events received from its input port. To allow eventSourcingStage to use eventLog for writing and reading events, it must join the event log:

  def requestProcessor: Flow[REQ, RES, _] =
    eventSourcingStage.join(eventLog)

The result is a stateful, event-sourced request processor of type Flow[REQ, RES, _] that processes a request stream. It will only demand new requests from upstream if there is downstream demand for responses and events. Any slowdown in response processing or event writing will back-pressure upstream request producers.
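
A request processor can be materialized like any other Flow. Here is a minimal sketch that reuses the hypothetical counter definitions from above and assumes an implicit ActorSystem and Materializer are in scope:

  import akka.stream.scaladsl.{Sink, Source}

  val counterProcessor: Flow[Request, Response, _] =
    counterStage.join(provider.flow[Incremented]("counter-1"))

  // With an initially empty journal this prints
  // Response(1), Response(3), Response(3): each response is created
  // after the corresponding events have been written and applied
  Source(List[Request](Increment(1), Increment(2), GetValue))
    .via(counterProcessor)
    .runWith(Sink.foreach(println))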

In the same way as PersistentActors in Akka Persistence, request processors provide a consistency boundary around internal state but additionally provide type safety and back-pressure for the whole event sourcing message flow.

The examples presented in this section are slightly simplified for readability. See the Event logging protocols section and the tests (e.g. EventSourcingSpec) for further details.

Dynamic request processor loading

When applying domain-driven design, consistency boundaries are often around so-called aggregates. A request processor that manages a single aggregate should be dynamically loaded and recovered when the first request targeted at that aggregate arrives.

This can be achieved with a Router. A router is configured with a function that extracts the aggregate id from a request and another function that creates a request processor from an aggregate id:

  import com.github.krasserm.ases.Router

  trait Aggregate[A] {
    def aggregateId(a: A): String
  }

  def eventSourcingStage(aggregateId: String): BidiFlow[REQ, E, E, RES, _] =
    EventSourcing(aggregateId, initialState, requestHandler, eventHandler)

  def requestProcessor(aggregateId: String): Flow[REQ, RES, _] =
    eventSourcingStage(aggregateId).join(provider.flow[E](aggregateId))

  def requestRouter(implicit agg: Aggregate[REQ]): Flow[REQ, RES, _] = {
    Router(req => agg.aggregateId(req), (aggregateId: String) => requestProcessor(aggregateId))
  }
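
For example, a hypothetical counter request type that carries the id of its target aggregate could provide an Aggregate instance as follows:

  // Hypothetical request type identifying the target aggregate
  case class IncrementCounter(counterId: String, delta: Int)

  implicit val counterAggregate: Aggregate[IncrementCounter] =
    new Aggregate[IncrementCounter] {
      def aggregateId(a: IncrementCounter): String = a.counterId
    }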

A running example is in RequestRoutingSpec. Dynamic unloading of request processors (e.g. using an LRU policy) is not supported yet as this requires special support from Akka Streams.

Request processor collaboration (microservices)

In contrast to PersistentActors, EventSourcing stages can form a group by sharing an event log. Within a group, events emitted by one member are consumed by all members, i.e. stages communicate via broadcast with the ordering guarantees of the underlying event log implementation. This feature can be used to implement event-sourced microservices that collaborate via events over a shared event log.

The following example defines a requestProcessor method that creates request processors using a Kafka topic partition as a shared event log. Request processors created with this method form a group whose members collaborate over the shared kafkaTopicPartition. All members consume events in the same order because a topic partition provides total ordering:

  import com.github.krasserm.ases.log.KafkaEventLog
  import org.apache.kafka.common.TopicPartition

  def kafkaHost: String
  def kafkaPort: Int
  def kafkaTopicPartition: TopicPartition

  val provider = new KafkaEventLog(kafkaHost, kafkaPort)

  def requestProcessor(emitterId: String): Flow[REQ, RES, _] =
    EventSourcing(emitterId, initialState, requestHandler, eventHandler)
      .join(provider.flow(kafkaTopicPartition))
  
  // A group of request processors    
  val requestProcessor1 = requestProcessor("processor1")
  val requestProcessor2 = requestProcessor("processor2")
  // ...
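
Since both processors join the same topic partition, an event emitted by one is also consumed and applied by the other, so their internal states converge. A rough sketch, reusing the hypothetical counter types from above and assuming an implicit Materializer (note that the second processor consumes the event asynchronously):

  // An increment processed by requestProcessor1 is written to the shared
  // topic partition and therefore also updates requestProcessor2's state
  Source.single[Request](Increment(1)).via(requestProcessor1).runWith(Sink.head)

  // Once requestProcessor2 has consumed the event: Response(1)
  Source.single[Request](GetValue).via(requestProcessor2).runWith(Sink.head)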

You can find a running example in EventCollaborationSpec (a replicated event-sourced counter). Collaboration of request processors is comparable to collaboration of EventsourcedActors in Eventuate (see event collaboration for details).

Handler switching

Applications can switch request and event handlers as a function of current state by defining request and event handler providers:

  def emitterId: String
  def initialState: S
  def requestHandlerProvider: S => RequestHandler[S, E, REQ, RES]
  def eventHandlerProvider: S => EventHandler[S, E]

  def eventSourcingStage: BidiFlow[REQ, E, E, RES, _] =
    EventSourcing(emitterId, initialState, requestHandlerProvider, eventHandlerProvider)

A request handler provider is called with current state for each request; an event handler provider is called with current state for each written event. This feature will later be used to implement a state machine DSL on top of the current handler API.
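
As an illustration, here is a minimal sketch of handler switching for the hypothetical counter, using a made-up limit of 100:

  // Once the counter has reached 100, ignore further commands
  // and only report current state
  def requestHandlerProvider: Int => RequestHandler[Int, Incremented, Request, Response] =
    state =>
      if (state < 100) requestHandler
      else (s, _) => respond(Response(s))

  // The event handler does not depend on current state here
  def eventHandlerProvider: Int => EventHandler[Int, Incremented] =
    _ => eventHandler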

Event logging protocols

The examples so far modeled event logs as Flow[E, E, _] and EventSourcing stages as BidiFlow[REQ, E, E, RES, _] where E is the type of a domain event. This is not sufficient for real-world use cases; additional event metadata is required:

  • Events emitted by an EventSourcing stage must contain the id of the emitting stage (emitterId). This is needed, for example, by consumers on the query side of a CQRS application to distinguish emitters when consuming events from an aggregated or shared event log.
  • Events emitted by an event log must additionally contain the sequence number of the written event. Sequence numbers are required to track event processing progress, for example.
  • An event log must also signal to an EventSourcing stage when event replay has completed. Only after successful recovery is an EventSourcing stage allowed to signal demand for new requests to upstream producers.

For these reasons, the current implementation uses

  Flow[Emitted[E], Delivery[Durable[E]], _]

as type for event logs and

  BidiFlow[REQ, Emitted[E], Delivery[Durable[E]], RES, _]

as type for EventSourcing stages. Emitted, Durable and Delivery are defined in Protocol.scala. These protocol definitions are still preliminary and expected to change. For example, an extension to Emitted could support the emission of event batches for atomic batch writes.
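
For orientation, here is a strongly simplified sketch of what these types could look like, derived only from the metadata requirements listed above (the authoritative definitions in Protocol.scala differ in detail and are still evolving):

  // Simplified sketch, not the actual protocol definitions

  // Event emitted by an EventSourcing stage, tagged with the emitter id
  case class Emitted[A](event: A, emitterId: String)

  // Written event as emitted by an event log, extended with its sequence number
  case class Durable[A](event: A, emitterId: String, sequenceNr: Long)

  // Distinguishes delivered (replayed or newly written) elements
  // from the end-of-replay signal
  sealed trait Delivery[+A]
  case class Delivered[A](a: A) extends Delivery[A]
  case object Recovered extends Delivery[Nothing]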

Project status

In its current state, the project is a prototype that demonstrates the basic ideas of how event sourcing and event collaboration could be added to Akka Streams. It can be used for experiments but is not yet ready for production. The prototype should serve as a basis for further discussion and may evolve into a contribution to Akka if there is enough interest in the community.
