A library for building reliable, scalable and distributed event-sourced applications in Scala

Eventsourced

Scala version: 2.10.2
Akka version: 2.2.0

Overview

The Eventsourced library adds scalable actor state persistence and at-least-once message delivery guarantees to Akka. With Eventsourced, stateful actors

  • persist received messages by appending them to a log (journal)
  • project received messages to derive current state
  • usually hold current state in memory (memory image)
  • recover current (or past) state by replaying received messages (during normal application start or after crashes)
  • never persist current state directly (except optional state snapshots for recovery time optimization)

In other words, Eventsourced implements a write-ahead log (WAL) that is used to keep track of messages an actor receives and to recover its state by replaying logged messages. Appending messages to a log instead of persisting actor state directly allows for actor state persistence at very high transaction rates and supports efficient replication. In contrast to other WAL-based systems, Eventsourced usually keeps the whole message history in the log and makes the use of state snapshots optional.
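
The write-ahead idea can be illustrated without Akka or Eventsourced. The following is a minimal sketch in plain Scala, not the library's implementation: `Journal`, `Snapshot` and `Counter` are hypothetical names, the journal is an in-memory buffer, and a restart is simulated by creating a fresh `Counter` over the same journal.

```scala
import scala.collection.mutable.ArrayBuffer

object WalSketch {
  // Hypothetical in-memory stand-in for a journal: an append-only log of messages.
  final class Journal {
    private val log = ArrayBuffer.empty[String]
    // Appends a message and returns its 1-based sequence number.
    def append(msg: String): Long = { log += msg; log.size.toLong }
    // Returns all messages with a sequence number >= seqNr.
    def replayFrom(seqNr: Long): Seq[String] = log.drop((seqNr - 1).toInt).toList
  }

  // Optional snapshot: a state value plus the sequence number of the last message it covers.
  final case class Snapshot(count: Int, lastSeqNr: Long)

  // State is only ever derived from messages; here it simply counts them.
  final class Counter(journal: Journal) {
    var count = 0
    private def project(msg: String): Unit = count += 1

    // Write-ahead: append to the log first, then update in-memory state.
    def receive(msg: String): Unit = { journal.append(msg); project(msg) }

    // Recovery: start from a snapshot if one is given, then replay the rest of the log.
    def recover(snapshot: Option[Snapshot] = None): Unit = {
      count = snapshot.map(_.count).getOrElse(0)
      val from = snapshot.map(_.lastSeqNr + 1).getOrElse(1L)
      journal.replayFrom(from).foreach(project)
    }
  }

  def main(args: Array[String]): Unit = {
    val journal = new Journal
    val first = new Counter(journal)
    Seq("a", "b", "c").foreach(first.receive)

    val restarted = new Counter(journal) // simulated restart: fresh state, same journal
    restarted.recover()
    println(restarted.count)
  }
}
```

Recovering with `Some(Snapshot(2, 2L))` reaches the same count while replaying only the third message, which is the recovery-time optimization that snapshots are meant to provide.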

Logged messages represent intended changes to an actor's state. Logging changes instead of updating current state is one of the core concepts of event sourcing. Eventsourced can be used to implement event sourcing concepts but it is not limited to that. More details about Eventsourced and its relation to event sourcing can be found at How does Eventsourced persist actor state and how is this related to event sourcing.

Eventsourced can also be used to make message exchanges between actors reliable so that they can be resumed after crashes, for example. For that purpose, channels with at-least-once message delivery guarantees are provided. Channels also prevent output messages sent by persistent actors from being redundantly delivered during replays, which is relevant for message exchanges between these actors and other services.

Building blocks

The core building blocks provided by Eventsourced are processors, channels and journals. These are managed by an Akka extension, the EventsourcingExtension.

Processor

A processor is a stateful actor that logs (persists) messages it receives. A stateful actor is turned into a processor by modifying it with the stackable Eventsourced trait during construction. A processor can be used like any other actor.

Messages wrapped inside Message are logged by a processor, unwrapped messages are not logged. Wrapped messages are often referred to as events in this user guide. Wrapped messages can also be commands, as explained in section Application.

Logging behavior is implemented by the Eventsourced trait, so a processor's receive method doesn't need to care about it. A successful write can be acknowledged to a sender by sending a reply. A processor can also hot-swap its behavior while still keeping its logging functionality.

Processors are registered at an EventsourcingExtension. This extension provides methods to recover processor state by replaying logged messages. Processors can be registered and recovered at any time during an application run.

Eventsourced doesn't impose any restrictions on how processors maintain state. A processor can use vars, mutable data structures or STM references, for example.

Channel

Channels are used by processors for sending messages to other actors (channel destinations) and receiving replies from them. Channels

  • require their destinations to confirm the receipt of messages for providing at-least-once delivery guarantees (explicit ack-retry protocol). Receipt confirmations are written to a log.
  • prevent redundant delivery of messages to destinations during processor recovery (replay of messages). Replayed messages with matching receipt confirmations are dropped by the corresponding channels.

A channel itself is an actor that decorates a destination with the aforementioned functionality. Processors usually create channels as child actors for decorating destination actor references.

A processor may also send messages directly to another actor without using a channel. In this case that actor will redundantly receive messages during processor recovery.
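
How logged receipt confirmations suppress redundant delivery can be sketched in plain Scala. This is an illustrative model under stated assumptions, not the library's channel: `Msg`, `Channel` and the confirmation set are hypothetical, the destination confirms by returning `true`, and the set stands in for confirmations written to a journal.

```scala
import scala.collection.mutable

object ChannelSketch {
  // A logged message is identified by its journal sequence number.
  final case class Msg(seqNr: Long, event: String)

  // Hypothetical channel model: `confirmed` plays the role of the confirmation log.
  // Messages whose sequence number is already confirmed are dropped.
  final class Channel(confirmed: mutable.Set[Long], destination: Msg => Boolean) {
    def deliver(msg: Msg): Unit =
      if (!confirmed.contains(msg.seqNr)) {
        // The destination returns true to confirm receipt.
        if (destination(msg)) confirmed += msg.seqNr
      }
  }

  def main(args: Array[String]): Unit = {
    val confirmations = mutable.Set.empty[Long] // outlives the simulated restarts
    val received = mutable.ArrayBuffer.empty[String]
    val destination: Msg => Boolean = m => { received += m.event; true }

    // First run: message 1 is delivered and confirmed.
    new Channel(confirmations, destination).deliver(Msg(1, "first"))

    // Second run: message 1 is replayed, message 2 is new.
    val channel = new Channel(confirmations, destination)
    Seq(Msg(1, "first"), Msg(2, "second")).foreach(channel.deliver)

    println(received.mkString(", "))
  }
}
```

In this model the replayed message 1 is dropped in the second run because its confirmation already exists. Calling `destination` directly instead of going through the channel would deliver it a second time.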

Eventsourced provides three different channel types (more are planned).

  • Default channel
    • Does not store received messages.
    • Re-delivers unconfirmed messages only during recovery of the sending processor.
    • Order of messages as sent by a processor is not preserved in failure cases.
  • Reliable channel
    • Stores received messages.
    • Re-delivers unconfirmed messages based on a configurable re-delivery policy.
    • Order of messages as sent by a processor is preserved, even in failure cases.
    • Often used to deal with unreliable remote destinations.
    • Can recover from crashes of the JVM it is running in.
  • Reliable request-reply channel
    • Same as reliable channel but additionally guarantees at-least-once delivery of replies.
    • Order of replies not guaranteed to correspond to the order of sent request messages.
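
One simple way to combine stored messages, a re-delivery policy and order preservation is a FIFO buffer that never advances past an unconfirmed head. The sketch below is a plain-Scala illustration of that idea only. `RedeliveryPolicy` and its `maxRedeliveries` field are hypothetical and do not describe the configuration options of the library's reliable channel.

```scala
import scala.collection.mutable

object ReliableChannelSketch {
  // Hypothetical policy: how often a message is re-sent after the first attempt.
  final case class RedeliveryPolicy(maxRedeliveries: Int)

  // Buffers messages in FIFO order and only moves on once the head is confirmed.
  final class ReliableChannel(policy: RedeliveryPolicy, destination: String => Boolean) {
    private val buffer = mutable.Queue.empty[String]

    def send(msg: String): Unit = buffer.enqueue(msg)

    // Tries to drain the buffer. Returns false if the head message used up all attempts.
    def deliverPending(): Boolean = {
      var stalled = false
      while (buffer.nonEmpty && !stalled) {
        val head = buffer.head
        // One initial attempt plus up to maxRedeliveries retries; stops at the first confirmation.
        val confirmed = (0 to policy.maxRedeliveries).exists(_ => destination(head))
        if (confirmed) buffer.dequeue() else stalled = true
      }
      !stalled
    }

    // Messages that are stored but not yet confirmed, in send order.
    def pending: List[String] = buffer.toList
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // A destination that fails to confirm the first two delivery attempts.
    val flaky: String => Boolean = _ => { calls += 1; calls > 2 }
    val channel = new ReliableChannel(RedeliveryPolicy(maxRedeliveries = 3), flaky)
    channel.send("a")
    channel.send("b")
    println(channel.deliverPending())
    println(channel.pending)
  }
}
```

Because a stalled head blocks everything behind it, later messages can never overtake earlier ones. The price is that one permanently failing message holds up the whole buffer until the destination recovers.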

Eventsourced channels are not meant to replace any existing messaging system but can be used, for example, to reliably connect processors to such a system, if needed. More generally, they are useful to integrate processors with other services, as described in this article.

Journal

A journal is an actor that is used by processors and channels to log messages and receipt confirmations. The quality of service (availability, scalability, ...) provided by a journal depends on the used storage technology. The Journals section below gives an overview of existing journal implementations and their development status.

Application

The Eventsourced library doesn't impose any restrictions on the structure and semantics of Message payloads. Persistent messages can therefore be events as well as commands. Both can be seen as facts about the interaction of an application with its environment. This is demonstrated in the Eventsourced reference application which persists events as well as commands. This also simplifies the implementation of long-running, persistent business processes (sometimes referred to as sagas). These are processors that react to events by sending commands to other services, which can be other processors or external services, for example.

Eventsourced fits well into applications that implement the CQRS pattern and follow a domain-driven design (DDD) (see reference application). On the other hand, the library doesn't force applications to do so and allows them to implement event-sourcing (and/or command-sourcing) without CQRS and/or DDD.

Journals

For persisting messages, Eventsourced currently provides the following journal implementations:

  • LevelDB journal (usage: production). It can be configured either with a native LevelDB (accessed via leveldbjni) or a LevelDB Java port as storage backend. Running a native LevelDB from sbt requires special settings. All examples in this user guide use the LevelDB Java port.
  • HBase journal (usage: experimental). An HBase backed journal supporting high-availability, horizontal read and write scalability, concurrent and non-blocking reads and writes. Details here.
  • MongoDB Casbah based journal (usage: experimental). A MongoDB backed journal. Details here. Thanks to Duncan DeVore.
  • MongoDB Reactive based journal (usage: experimental). A MongoDB backed journal. Details here. Thanks to Duncan DeVore.
  • DynamoDB journal (usage: experimental). A DynamoDB backed journal. Details here. Thanks to Scott Clasen.
  • Journal.IO journal (usage: testing). Journal.IO backed journal for testing purposes. Messages are persisted.
  • In memory journal (usage: testing). An in-memory journal for testing purposes. Messages are not persisted.

Terminology

In the following, the terms persistent actor, event-sourced actor, event-sourced processor and processor are used interchangeably. Furthermore, a Message is often referred to as event message.

First steps

This section guides you through the minimum steps required to create, use and recover an event-sourced actor and demonstrates the usage of channels. Code from this section is contained in FirstSteps.scala and FirstSteps.java. It can be executed from the sbt prompt with

Scala:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.FirstSteps

Java:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.FirstSteps

The example in this section and all further examples use a journal that is backed by a LevelDB Java port. For running a native LevelDB instance from sbt, additional settings are required. A legend to the figures used in this and other sections is in Appendix A.

Step 1: EventsourcingExtension initialization

EventsourcingExtension is an Akka extension provided by the Eventsourced library. It is used by applications to

  • create and register event-sourced actors (called processors or event processors)
  • create and register channels
  • recover registered processors and channels from journaled event messages

An EventsourcingExtension is initialized with an ActorSystem and a journal ActorRef.

Scala:

import java.io.File
import akka.actor._
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb._

val system: ActorSystem = ActorSystem("example")
val journal: ActorRef = LeveldbJournalProps(new File("target/example-1"), native = false).createJournal
val extension: EventsourcingExtension = EventsourcingExtension(system, journal)

Java:

import java.io.File;
import akka.actor.*;
import org.eligosource.eventsourced.core.*;
import org.eligosource.eventsourced.journal.leveldb.*;

final ActorSystem system = ActorSystem.create("guide");
final ActorRef journal = LeveldbJournalProps.create(new File("target/guide-1-java")).withNative(false).createJournal(system);
final EventsourcingExtension extension = EventsourcingExtension.create(system, journal);

This example uses a LevelDB journal but any other journal implementation can be used as well.

Step 2: Event-sourced actor definition

With the Scala API, event-sourced actors can be defined as 'plain' actors. With the Java API, event-sourced actors need to extend the abstract UntypedEventsourcedActor class. For example,

Scala:

class Processor extends Actor {
  var counter = 0

  def receive = {
    case msg: Message => {
      counter = counter + 1
      println("[processor] event = %s (%d)" format (msg.event, counter))
    }
  }
}

Java:

public class Processor extends UntypedEventsourcedActor {
    private int counter = 0;

    @Override
    public int id() {
        return 1;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            Message msg = (Message)message;
            counter = counter + 1;
            System.out.println(String.format("[processor] event = %s (%d)", msg.event(), counter));
        }
    }
}

is an actor that counts the number of received event Messages. In Eventsourced applications, events are always communicated (transported) via event Messages.

Step 3: Event-sourced actor creation and recovery

To make the Scala Processor an event-sourced actor, it must be modified with the stackable Eventsourced trait during instantiation. The Java Processor already extends the UntypedEventsourcedActor class, so no further modification is needed.

Scala:

// create and register event-sourced processor
val processor: ActorRef = extension.processorOf(Props(new Processor with Eventsourced { val id = 1 } ))

// recover registered processors by replaying journaled events
extension.recover()

Java:

// create and register event-sourced processor
final ActorRef processor = extension.processorOf(Props.create(Processor.class), system);

// recover registered processors by replaying journaled events
extension.recover();

An actor that is modified with Eventsourced (or extends UntypedEventsourcedActor) writes event Messages to a journal before its receive method (or onReceive method, respectively) is called. The processorOf method registers that actor under a unique id. The processor id is defined by implementing the abstract Eventsourced.id member, which must be a positive integer and be consistently re-used across application runs. The recover method recovers the state of processor by replaying all event messages that processor received in previous application runs.

Step 4: Event-sourced actor usage

The event-sourced processor can be used like any other actor. Messages of type Message are written to the journal, messages of any other type are directly received by processor without being journaled.

Scala:

// send event message to processor (will be journaled)
processor ! Message("foo")

Java:

// send event message to processor (will be journaled)
processor.tell(Message.create("foo"), null);

A first application run will create an empty journal. Hence, no event messages will be replayed and the processor writes

[processor] event = foo (1)

to stdout. When the application is restarted, however, the processor's state will be recovered by replaying the previously journaled event message. Then, the application sends another event message. You will therefore see

[processor] event = foo (1)
[processor] event = foo (2)

on stdout where the first println is triggered by a replayed event message.
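
The two runs described above can be imitated in plain Scala without Akka. This is only a sketch under stated assumptions: `Journal` is a hypothetical in-memory log keyed by processor id that outlives the simulated runs, and `run` models a single application run that first recovers and then processes one new event.

```scala
import scala.collection.mutable

object TwoRunsSketch {
  // Hypothetical journal keyed by processor id; it outlives the simulated application runs.
  final class Journal {
    private val logs = mutable.Map.empty[Int, Vector[String]]
    def append(id: Int, event: String): Unit = {
      require(id > 0, "processor id must be a positive integer")
      logs(id) = logs.getOrElse(id, Vector.empty[String]) :+ event
    }
    def replay(id: Int): Vector[String] = logs.getOrElse(id, Vector.empty[String])
  }

  // One simulated application run: recover from the journal, then process a single new event.
  // Returns the lines the processor would have printed.
  def run(journal: Journal, id: Int, event: String): Vector[String] = {
    var counter = 0
    var out = Vector.empty[String]
    def process(e: String): Unit = {
      counter += 1
      out = out :+ s"[processor] event = $e ($counter)"
    }
    journal.replay(id).foreach(process) // recovery: replay events from previous runs
    journal.append(id, event)           // journal the new event before processing it
    process(event)
    out
  }

  def main(args: Array[String]): Unit = {
    val journal = new Journal
    run(journal, 1, "foo").foreach(println)
    println("-- restart --")
    run(journal, 1, "foo").foreach(println)
  }
}
```

The second call to `run` yields two lines because the event journaled under id 1 in the first run is replayed before the new one is processed. Using a different id in the second run would start from an empty history, which is why ids have to stay stable across runs.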

Step 5: Channel usage

In this step, the event-sourced processor is extended to send out new event messages to a destination. It creates another event message (by making a copy of the received event message) with an updated event field and sends the updated message to destination.

Scala:

class Processor(destination: ActorRef) extends Actor {
  var counter = 0

  def receive = {
    case msg: Message => {
      counter = counter + 1
      // …
      destination ! msg.copy(event = "processed %d event messages so far" format counter)
    }
  }
}

val destination: ActorRef = system.actorOf(Props[Destination])
// instantiate processor by passing the destination as constructor argument
val processor: ActorRef = extension.processorOf(Props(new Processor(destination) with Eventsourced { val id = 1 } ))

extension.recover()

Java:

public class Processor extends UntypedEventsourcedActor {
    private ActorRef destination;
    private int counter = 0;

    public Processor(ActorRef destination) {
        this.destination = destination;
    }

    @Override
    public int id() {
        return 1;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            Message msg = (Message)message;
            counter = counter + 1;
            // …
            destination.tell(msg.withEvent(String.format("processed %d event messages so far", counter)), getSelf());
        }
    }
}

final ActorRef destination = system.actorOf(Props.create(Destination.class));
// instantiate processor by passing the destination as constructor argument
final ActorRef processor = extension.processorOf(Props.create(Processor.class, destination), system);

extension.recover();

Without any further actions, this would also send event messages to destination during recovery (i.e. during replay of event messages). With every application restart, destination would redundantly receive the whole event message history again and again. This is not acceptable in most cases, for example when destination represents an external service.

To prevent redundant message delivery to destination we need something that remembers which messages have already been successfully delivered. This is exactly the use case for channels. A channel drops all messages that have already been successfully delivered to a destination. We therefore wrap destination by a channel and let the processor communicate with the destination via that channel. This can be done without changing the code of Processor.

Scala:

val destination: ActorRef = system.actorOf(Props[Destination])
// wrap destination by channel
val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination))
// instantiate processor by passing the channel (i.e. wrapped destination) as constructor argument
val processor: ActorRef = extension.processorOf(Props(new Processor(channel) with Eventsourced { val id = 1 } ))

Java:

final ActorRef destination = system.actorOf(Props.create(Destination.class));
// wrap destination by channel
final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system);
// instantiate processor by passing the channel (i.e. wrapped destination) as constructor argument
final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);

A channel must have a unique id (1 in our example), a positive integer that must be consistently defined across application runs. Here, we create a default channel that is configured with a DefaultChannelProps configuration object. If applications need reliable event message delivery to destinations, they should use a reliable channel that is configured with a ReliableChannelProps configuration object.

Assuming the following definition of a Destination actor

Scala:

class Destination extends Actor {
  def receive = {
    case msg: Message => {
      println("[destination] event = '%s'" format msg.event)
      // confirm receipt of event message from channel
      msg.confirm()
    }
  }
}

Java:

public class Destination extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            Message msg = (Message)message;
            System.out.println(String.format("[destination] event = '%s'", msg.event()));
            msg.confirm(true);
        }
    }
}

and that we're starting again from an empty journal, you should see

[processor] event = foo (1)
[destination] event = 'processed 1 event messages so far'

on stdout during a first application run. When running the application again, you'll see that the event-sourced processor receives the complete event message history but the destination only receives the last event message produced by processor (which corresponds to the single event message sent to processor during the current application run):

[processor] event = foo (1)
[processor] event = foo (2)
[destination] event = 'processed 2 event messages so far'

When receiving an event message from a channel, a destination must confirm its receipt by calling Message.confirm(), which asynchronously writes a confirmation (an acknowledgement) to the journal that the message has been successfully delivered. Later, you'll also see how confirmation functionality can be added to destinations with the stackable Confirm trait.

This First steps guide is a rather low-level introduction to the Eventsourced library. More advanced library features are covered in the following sections.

Stackable traits

Eventsourced

The Eventsourced trait has already been discussed in section First steps. It can be combined with the stackable Receiver, Emitter and/or Confirm traits where the Eventsourced trait must always be the last modification i.e.

Scala:

new MyActor with Receiver with Confirm with Eventsourced

Java:

public class MyActor extends UntypedEventsourcedConfirmingReceiver

The Eventsourced Java API provides some predefined combinations of stackable traits as abstract base classes. For example, UntypedEventsourcedConfirmingReceiver is defined as

abstract class UntypedEventsourcedConfirmingReceiver extends UntypedActor with Receiver with Confirm with Eventsourced

Other predefined combinations of stackable traits in the Java API are described in the following subsections. Refer to the Untyped* abstract classes in the API docs for all predefined combinations.

Receiver

An actor that receives event Messages often wants to pattern-match against the contained event directly instead of the whole event message. This can be achieved by modifying it with the Receiver trait during instantiation (Scala API) or extending the abstract UntypedReceiver class (Java API).

Scala:

class MyActor extends Actor {
  def receive = {
    case event => println("received event %s" format event)
  }
}

val myActor = system.actorOf(Props(new MyActor with Receiver))

myActor ! Message("foo")

Java:

public class MyActor extends UntypedReceiver {
    @Override
    public void onReceive(Object event) throws Exception {
        System.out.println(String.format("received event = %s", event));
    }
}

final ActorRef myActor = system.actorOf(Props.create(MyActor.class));

myActor.tell(Message.create("foo"), null);

In the above example, sending Message("foo") to myActor will write received event foo to stdout. The Receiver trait stores the received event message as current event message in a field, extracts the contained event from that message and calls the receive (or onReceive) method of MyActor with event as argument. If MyActor wants to have access to the current event message it must be defined with a Receiver self-type and call the message method (Scala API) or just call the message() method (Java API).

Scala:

class MyActor extends Actor { this: Receiver =>
  def receive = {
    case event => {
      // obtain current event message
      val currentMessage = message
      // …
      println("received event %s" format event)
    }
  }
}

Java:

public class MyActor extends UntypedReceiver {
    @Override
    public void onReceive(Object event) throws Exception {
        // obtain current event message
        Message currentMessage = message();
        // …
        System.out.println(String.format("received event = %s", event));
    }
}

The Receiver trait can also be combined with the stackable Eventsourced and/or Confirm traits where Receiver must always be the first modification. For example:

Scala:

new MyActor with Receiver with Eventsourced

Java:

public class MyActor extends UntypedEventsourcedReceiver

Refer to the API docs for further details.
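
The ordering rule for stacked modifications follows from how Scala linearizes traits: the trait mixed in last wraps all others and therefore sees a message first. The following plain-Scala sketch shows one plausible reading of why a Receiver-like trait comes first and an Eventsourced-like trait comes last. `Envelope`, `Handler`, `Unwrapping` and `Journaling` are hypothetical stand-ins, not library types.

```scala
import scala.collection.mutable.ArrayBuffer

object StackingSketch {
  // Stand-in for the library's Message: a wrapper around an event.
  final case class Envelope(event: Any)

  trait Handler { def handle(msg: Any): Unit }

  // Plays the role of a plain actor that only wants to see bare events.
  class Printer extends Handler {
    val seen = ArrayBuffer.empty[Any]
    def handle(msg: Any): Unit = { seen += msg; () }
  }

  // Receiver-like modification: remembers the current envelope and passes on the bare event.
  trait Unwrapping extends Handler {
    var current: Option[Envelope] = None
    abstract override def handle(msg: Any): Unit = msg match {
      case env: Envelope => current = Some(env); super.handle(env.event)
      case other         => super.handle(other)
    }
  }

  // Eventsourced-like modification: logs whole envelopes before passing them on unchanged.
  trait Journaling extends Handler {
    val journal = ArrayBuffer.empty[Envelope]
    abstract override def handle(msg: Any): Unit = {
      msg match {
        case env: Envelope => journal += env
        case _             => ()
      }
      super.handle(msg)
    }
  }

  def main(args: Array[String]): Unit = {
    // Journaling is mixed in last, so it runs first and still sees the whole envelope.
    val actor = new Printer with Unwrapping with Journaling
    actor.handle(Envelope("foo"))
    println(s"seen=${actor.seen.toList} journaled=${actor.journal.toList}")
  }
}
```

With the order reversed (`new Printer with Journaling with Unwrapping`), the unwrapping step runs first and the journaling step only ever sees the bare event, so nothing is logged in this model.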

Emitter

Where a Receiver modification allows actors to pattern-match against incoming events directly instead of whole event Messages, an Emitter introduces a corresponding simplification on the sending (outgoing) side. It allows actors to send (emit) events to channels without having to deal with whole event Messages. An emitter can also look up channels by name (or id, see below).

Scala:

class MyActor extends Actor { this: Emitter =>
  def receive = {
    case event => {
      // emit event to channel "myChannel"
      emitter("myChannel") sendEvent ("received: %s" format event)
    }
  }
}

// create and register channel under name "myChannel"
extension.channelOf(DefaultChannelProps(1, destination).withName("myChannel"))

val myActor = system.actorOf(Props(new MyActor with Emitter))

Java:

public class MyActor extends UntypedEmitter {
    @Override
    public void onReceive(Object event) throws Exception {
        // emit event to channel "myChannel"
        emitter("myChannel").sendEvent(String.format("received: %s", event), getSelf());
    }
}

// create and register channel under name "myChannel"
extension.channelOf(DefaultChannelProps.create(1, destination).withName("myChannel"), system);

final ActorRef myActor = extension.processorOf(Props.create(MyActor.class), system);

Event messages sent by an emitter to a channel are always derived from (i.e. are a copy of) the current event message (an Emitter is also Receiver and maintains a current event message, see also section Receiver). A call to the emitter method with a channel name as argument creates a MessageEmitter object that captures the named channel and the current event message. Calling sendEvent on that object modifies the captured event message with the specified event argument and sends the updated event message to the channel (see also channel usage hints). A MessageEmitter object can also be sent to other actors (or threads) and be used there i.e. a MessageEmitter object is thread-safe. Channels can also be referred to by id when creating a MessageEmitter i.e. there's no need to define a custom channel name:

Scala:

class MyActor extends Actor { this: Emitter =>
  def receive = {
    case event => {
      // emit event to channel with id 1
      emitter(1) sendEvent ("received: %s" format event)
    }
  }
}

// create and register channel
extension.channelOf(DefaultChannelProps(1, destination))

Java:

public class MyActor extends UntypedEmitter {
    @Override
    public void onReceive(Object event) throws Exception {
        // emit event to channel with id 1
        emitter(1).sendEvent(String.format("received: %s", event), getSelf());
    }
}

// create and register channel
extension.channelOf(DefaultChannelProps.create(1, destination), system);

The Emitter trait can also be combined with the stackable Eventsourced and/or Confirm traits where Emitter must always be the first modification. For example:

Scala:

new MyActor with Emitter with Eventsourced

Java:

public class MyActor extends UntypedEventsourcedEmitter

Refer to the API docs for further details.
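
The capture-then-copy behavior of a MessageEmitter can be modeled in a few lines of plain Scala. This is a hedged sketch rather than the library's code: `Envelope`, `MessageEmitterSketch` and `EmitterContext` are hypothetical names, and a channel is reduced to a function that accepts an envelope.

```scala
import scala.collection.mutable.ArrayBuffer

object EmitterSketch {
  // Stand-in for the library's Message: an event plus metadata that should survive copying.
  final case class Envelope(event: Any, sequenceNr: Long)

  // Captures a channel and the message that was current when the emitter was created.
  // It holds no mutable state of its own, so it can be handed to other threads.
  final class MessageEmitterSketch(channel: Envelope => Unit, current: Envelope) {
    // Sends a copy of the captured message with only the event replaced.
    def sendEvent(event: Any): Unit = channel(current.copy(event = event))
  }

  // Looks up channels by name and remembers the message currently being processed.
  final class EmitterContext(channels: Map[String, Envelope => Unit]) {
    private var current: Envelope = Envelope((), 0L)
    def receive(msg: Envelope): Unit = { current = msg }
    def emitter(channelName: String): MessageEmitterSketch =
      new MessageEmitterSketch(channels(channelName), current)
  }

  def main(args: Array[String]): Unit = {
    val sent = ArrayBuffer.empty[Envelope]
    val ctx = new EmitterContext(Map("myChannel" -> ((e: Envelope) => { sent += e; () })))
    ctx.receive(Envelope("foo", 7L))
    ctx.emitter("myChannel").sendEvent("received: foo")
    println(sent.toList)
  }
}
```

Since the emitter captures the current message at creation time, using it later (after the context has moved on to another message) still emits a copy of the message it was created for.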

Confirm

The receipt of event messages from channels must be confirmed by calling confirm() or confirm(true) on the received event Message. Applications can also negatively confirm an event message receipt by calling confirm(false). This, for example, causes a reliable channel to redeliver the event message.

Instead of calling confirm(true) or confirm(false) explicitly, actors can also be modified with the stackable Confirm trait. This trait calls confirm(true) on the received event message when the modified actor's receive (or onReceive) method returns normally and confirm(false) when it throws an exception.

This trait can either be used standalone

Scala:

new MyActor with Confirm

Java:

public class MyActor extends UntypedConfirmingActor

or in combination with the stackable Receiver, Emitter and/or Eventsourced traits where the Confirm modification must be made after a Receiver or Emitter modification but before an Eventsourced modification. For example:

Scala:

new MyActor with Receiver with Confirm with Eventsourced

Java:

public class MyActor extends UntypedEventsourcedConfirmingReceiver

Refer to the API docs for further details.
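
The behavior described for the Confirm trait (positive confirmation on normal return, negative confirmation on exception) maps naturally onto a stackable trait with a try/catch around `super`. The sketch below is plain Scala with hypothetical names (`Delivery`, `AutoConfirm`, `Strict`). Rethrowing the exception after the negative confirmation is an assumption of this sketch, not something the text above states about the library.

```scala
object ConfirmSketch {
  // Stand-in for a message whose receipt can be confirmed positively or negatively.
  final class Delivery(val event: String) {
    var confirmation: Option[Boolean] = None
    def confirm(pos: Boolean = true): Unit = { confirmation = Some(pos) }
  }

  trait Handler { def handle(d: Delivery): Unit }

  // Confirm-like modification: wraps the underlying handler.
  trait AutoConfirm extends Handler {
    abstract override def handle(d: Delivery): Unit =
      try {
        super.handle(d)
        d.confirm(true)  // handler returned normally
      } catch {
        case e: Exception =>
          d.confirm(false) // handler failed
          throw e          // assumption: the failure is still propagated
      }
  }

  // A handler that rejects empty events by throwing.
  class Strict extends Handler {
    def handle(d: Delivery): Unit =
      if (d.event.isEmpty) throw new IllegalArgumentException("empty event")
  }

  def main(args: Array[String]): Unit = {
    val handler = new Strict with AutoConfirm
    val ok = new Delivery("foo")
    handler.handle(ok)
    println(ok.confirmation)
  }
}
```

The handler itself never mentions confirmation, which mirrors how a destination modified with Confirm no longer needs to call confirm explicitly.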

Modified example

This section modifies (and simplifies) the example from section First steps by making use of the stackable Receiver, Emitter and Confirm traits. In particular, for the Scala API

  • Processor will be modified with Emitter (in addition to Eventsourced)
  • Destination will be modified with Receiver and Confirm

For the Java API

  • Processor will extend UntypedEventsourcedEmitter
  • Destination will extend UntypedConfirmingReceiver

Code from this section is contained in StackableTraits.scala and StackableTraits.java. It can be executed from the sbt prompt with

Scala:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.StackableTraits

Java:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.StackableTraits

The new Scala definition of Processor now has a self-type Emitter and pattern-matches against events directly. The Java definition of Processor extends UntypedEventsourcedEmitter and also receives events directly (instead of Messages).

Scala:

class Processor extends Actor { this: Emitter =>
  var counter = 0

  def receive = {
    case event => {
      counter = counter + 1
      println("[processor] event = %s (%d)" format (event, counter))
      emitter("destination") sendEvent ("processed %d events so far" format counter)
    }
  }
}

Java:

public class Processor extends UntypedEventsourcedEmitter {
    private int counter = 0;

    @Override
    public int id() {
        return 1;
    }

    @Override
    public void onReceive(Object event) throws Exception {
        counter = counter + 1;
        System.out.println(String.format("[processor] event = %s (%d)", event, counter));
        emitter("destination").sendEvent(String.format("processed %d event messages so far", counter), getSelf());
    }
}

Instead of passing the channel via the constructor, it is now looked up by name ("destination"). The channel name is specified during channel creation.

Scala:

extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))

Java:

extension.channelOf(DefaultChannelProps.create(1, destination).withName("destination"), system);

The Scala Processor must be instantiated with an additional Emitter modification to conform to the Processor self-type. No further modification is needed for the Java Processor.

Scala:

val processor: ActorRef = extension.processorOf(Props(new Processor with Emitter with Eventsourced { val id = 1 } ))

Java:

final ActorRef processor = extension.processorOf(Props.create(Processor.class), system);

The new definition of Destination

Scala:

class Destination extends Actor {
  def receive = {
    case event => {
      println("[destination] event = '%s'" format event)
    }
  }
}

Java:

public class Destination extends UntypedConfirmingReceiver {
    @Override
    public void onReceive(Object event) throws Exception {
        System.out.println(String.format("[destination] event = %s", event));
    }
}

pattern-matches against events directly and leaves event message receipt confirmation to the Confirm trait. The Scala Destination must be instantiated with a Receiver and a Confirm modification and again, no further modification is needed for the Java Destination.

Scala:

val destination: ActorRef = system.actorOf(Props(new Destination with Receiver with Confirm))

Java:

final ActorRef destination = system.actorOf(Props.create(Destination.class));

Sender references

The Eventsourced library preserves sender references for all

  • message exchanges with actors that are modified with Eventsourced, Receiver, Emitter and/or Confirm (or extend any of the abstract Untyped* base classes from the Java API) and
  • message exchanges with destination actors via channels

i.e. event-sourced actor applications can make use of sender references in the same way as plain actor applications. If you know how sender references work with Akka actors, the following will sound familiar to you.

Processor reply

For example, taking the code from section First steps as a starting point, Processor can be extended to reply to message senders as follows.

Scala:

class Processor(destination: ActorRef) extends Actor {
  // …

  def receive = {
    case msg: Message => {
      // …
      // reply to sender
      sender ! ("done processing event = %s" format msg.event)
    }
  }
}

Java:

public class Processor extends UntypedEventsourcedActor {
    // …

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            // …
            getSender().tell(String.format("done processing event = %s", msg.event()), getSelf());
        }
    }
}

Applications can now ask the processor and will get a response asynchronously.

Scala:

processor ? Message("foo") onSuccess {
  case response => println(response)
}

Java:

ask(processor, Message.create("foo"), 5000L).onSuccess(new OnSuccess<Object>() {
    @Override
    public void onSuccess(Object response) throws Throwable {
        System.out.println(response);
    }
}, system.dispatcher());

No surprise here. The sender reference in this example represents the future that is returned from the ? or ask method call. But what happens during a replay? During a replay, the sender reference will be deadLetters because Eventsourced processors don't store sender references in the journal. The main reason for this is that applications usually do not want to redundantly reply to senders during replays.
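
The effect of not journaling sender references can be shown with a toy model in plain Scala. Everything here is hypothetical: a sender is reduced to a reply function, `deadLetters` is a function that discards replies, and `Processor` keeps its journal in memory.

```scala
import scala.collection.mutable.ArrayBuffer

object SenderSketch {
  // A sender is modeled as a function that accepts a reply.
  type Reply = String => Unit

  // Replies sent here are silently discarded.
  val deadLetters: Reply = _ => ()

  final class Processor {
    // Only events are stored, never the senders they came from.
    private val journal = ArrayBuffer.empty[String]

    def receive(event: String, sender: Reply): Unit = {
      journal += event
      handle(event, sender)
    }

    // During replay no sender is available, so replies go to deadLetters.
    def replay(): Unit = journal.foreach(handle(_, deadLetters))

    private def handle(event: String, sender: Reply): Unit =
      sender(s"done processing event = $event")
  }

  def main(args: Array[String]): Unit = {
    val replies = ArrayBuffer.empty[String]
    val processor = new Processor
    processor.receive("foo", r => { replies += r; () })
    processor.replay()
    println(replies.toList)
  }
}
```

The original sender gets exactly one reply in this model even though the event is handled twice, once live and once during replay.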

Destination reply

Instead of replying to the sender, the processor can also forward the sender reference to a destination and let the destination reply to the sender. This even works if the destination is wrapped by a channel because a channel simply forwards sender references when delivering event messages to destinations. For that reason, a ReliableChannel needs to store sender references (in contrast to processors), so that sender references are even available after a reliable channel has been restarted. If a stored sender reference is a remote reference, it remains valid even after recovery from a JVM crash (i.e. a crash of the JVM the reliable channel is running in) provided the remote sender is still available.

Scala:

class Processor(destination: ActorRef) extends Actor {
  var counter = 0

  def receive = {
    case msg: Message => {
      // …
      // forward modified event message to destination (together with sender reference)
      destination forward msg.copy(event = "processed %d event messages so far" format counter)
    }
  }
}

class Destination extends Actor {
  def receive = {
    case msg: Message => {
      // …
      // reply to sender
      sender ! ("done processing event = %s" format msg.event)
    }
  }
}

val destination: ActorRef = system.actorOf(Props[Destination])
val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination))
val processor: ActorRef = extension.processorOf(Props(new Processor(channel) with Eventsourced { val id = 1 } ))

Java:

public class Processor extends UntypedEventsourcedActor {
    private ActorRef destination;
    private int counter = 0;

    public Processor(ActorRef destination) {
        this.destination = destination;
    }

    @Override
    public int id() {
        return 1;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            Message msg = (Message)message;
            // forward modified event message to destination (together with sender reference)
            destination.forward(msg.withEvent(String.format("processed %d event messages so far", counter)), getContext());
        }
    }
}

public class Destination extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Message) {
            Message msg = (Message)message;
            // …
            // reply to sender
            getSender().tell(String.format("done processing event = %s", msg.event()), getSelf());
        }
    }
}

final ActorRef destination = system.actorOf(Props.create(Destination.class));
final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system);
final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);

When using a MessageEmitter (see also section Emitter) applications can choose between methods sendEvent and forwardEvent where sendEvent takes an (implicit) sender reference as parameter and forwardEvent forwards the current sender reference. They work in the same way as the tell (!) and forward methods on ActorRef, respectively.

Code from this section is contained in SenderReferences.scala and SenderReferences.java. It can be executed from the sbt prompt with

Scala:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.SenderReferences

Java:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.SenderReferences

Channels

A channel is an actor that keeps track of successfully delivered event messages. Channels are used by event-sourced actors (processors) to prevent redundant message delivery to destinations during event message replay. Some channels can also be used standalone i.e. independent of event-sourced actors.

A use case for channels is described in section External Updates of Martin Fowler's Event Sourcing article. A getting-started example is given in section Channel usage of the First steps guide.

Currently, the library provides two different channel implementations, DefaultChannel and ReliableChannel, and a pattern on top of ReliableChannel, a reliable request-reply channel. These are explained in the following subsections.

DefaultChannel

A default channel is a transient channel that delivers event messages to a destination actor. When the destination confirms the delivery of an event message by calling either confirm() or confirm(true) on the received Message object, a confirmation (an acknowledgement) is asynchronously written to the journal. During a replay, event messages for which a confirmation exists won't be delivered again to the destination.

Event messages that are negatively confirmed by the destination (via a call to confirm(false) on the received event message) will be re-delivered during the next event message replay. This is also the case for event messages for which no confirmation has been made. Therefore, in cases of negative or missing confirmations, the order of event messages received by a destination from a default channel may differ from the order of event messages produced by an event-sourced processor.
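
The confirmation semantics described above can be sketched with a small in-memory model. This is not the library's implementation: ToyMessage, ToyDefaultChannel and the Set that stands in for journaled acknowledgements are hypothetical names used only for illustration, and delivery is synchronous here.

```scala
// Toy model of default-channel semantics (hypothetical names, not the Eventsourced API).
final case class ToyMessage(sequenceNr: Long, event: String)

final class ToyDefaultChannel(destination: ToyMessage => Boolean) {
  // Stands in for the acknowledgements that the real channel writes to the journal.
  private var acks = Set.empty[Long]

  // Delivers msg unless it was positively confirmed before; returns true if delivered.
  def deliver(msg: ToyMessage): Boolean =
    if (acks.contains(msg.sequenceNr)) false // already confirmed: skipped during replay
    else {
      val confirmed = destination(msg)       // true models confirm(true), false models confirm(false)
      if (confirmed) acks += msg.sequenceNr  // a negative confirmation leaves no acknowledgement
      true
    }
}
```

If a destination confirms messages 1 and 3 but negatively confirms message 2, a second pass over the same three messages delivers only message 2 again. The destination therefore sees 1, 2, 3, 2, which illustrates why ordering can differ from the order in which the processor produced the messages.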

A DefaultChannel is created and registered at an EventsourcingExtension as follows.

val extension: EventsourcingExtension = …
val destination: ActorRef = …
val channelId: Int = …

val channel: ActorRef = extension.channelOf(DefaultChannelProps(channelId, destination))

The channelId must be a positive integer and consistently defined across application runs. The map of registered channels can be obtained via the channels method of EventsourcingExtension which returns a map of type Map[Int, ActorRef] where the mapping key is the channel id. Channels can optionally be registered under a custom name (see also section Emitter).

// …
val channelId: Int = …
val channelName: String = …

val channel: ActorRef = extension.channelOf(DefaultChannelProps(channelId, destination).withName(channelName))

The map of registered named channels can be obtained via the namedChannels method which returns a map of type Map[String, ActorRef] where the mapping key is the channel name.

A default channel preserves sender references. Applications can therefore use ? and forward as well to communicate with channel destinations via channel actor refs. However, special care must be taken when using ?: replayed messages that have already been confirmed by a channel destination will be ignored by the corresponding channel and the destination cannot reply. Hence, the sender will see a reply timeout. This can be avoided by finding out in advance if the channel will ignore a message or not. Applications do that by investigating the Message.acks list. If the channel's id is contained in that list, the message will be ignored and should not be used for asking.

val channelId: Int = …
val channel: ActorRef = …

if (!message.acks.contains(channelId)) channel ? message.copy(…) onComplete {
  case result => …
}

See also usage-hints regarding message.copy(…).

ReliableChannel

A reliable channel is a persistent channel that writes event messages to a journal before delivering them to a destination actor. In contrast to a default channel, a reliable channel preserves the order of messages as produced by an event-sourced processor and attempts to re-deliver event messages on destination failures. Therefore, a reliable channel enables applications to recover from temporary destination failures without having to run an event message replay. Furthermore, a reliable channel can also recover from crashes of the JVM it is running in. This allows applications to guarantee at-least-once message delivery in case of both destination failures and sender failures.

If a destination positively confirms the receipt of an event message, the stored message is removed from the channel and the next one is delivered. If a destination negatively confirms the receipt of an event message or if no confirmation is made (i.e. a timeout occurs), a re-delivery attempt is made after a certain redelivery delay. If the maximum number of re-delivery attempts has been made, the channel restarts itself after a certain restart delay and starts again with re-deliveries. If the maximum number of restarts has been reached, the channel stops message delivery and publishes a DeliveryStopped event to the event stream of the actor system this channel belongs to. Applications can then re-activate the channel by calling the deliver(Int) method of EventsourcingExtension with the channel id as argument. Refer to the ReliableChannel API docs for details.
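
The redelivery and restart cycle can be approximated by the following sketch. It is a simplified model with no timers and no actors. ToyPolicy, Outcome and ToyReliableDelivery are hypothetical names, and the exact way attempts are counted (one initial delivery plus redeliveryMax re-deliveries per start) is an assumption, not taken from the library.

```scala
// Toy model of the redelivery/restart cycle (hypothetical names, no timers, no Akka).
final case class ToyPolicy(redeliveryMax: Int, restartMax: Int)

sealed trait Outcome
final case class Delivered(attempts: Int) extends Outcome
final case class Stopped(attempts: Int) extends Outcome // models the DeliveryStopped situation

object ToyReliableDelivery {
  // tryDeliver(n) is the n-th delivery attempt (1-based); true means positive confirmation.
  def run(policy: ToyPolicy, tryDeliver: Int => Boolean): Outcome = {
    var attempts = 0
    var restarts = 0
    while (restarts <= policy.restartMax) {
      // One initial delivery plus up to redeliveryMax re-deliveries per (re)start.
      var i = 0
      while (i <= policy.redeliveryMax) {
        attempts += 1
        if (tryDeliver(attempts)) return Delivered(attempts)
        i += 1
      }
      restarts += 1 // the real channel would wait for the restart delay here
    }
    Stopped(attempts)
  }
}
```

Under these assumptions, ToyPolicy(redeliveryMax = 2, restartMax = 1) with a destination that never confirms ends in Stopped(6), while a destination that confirms on the fourth attempt yields Delivered(4).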

A ReliableChannel is created and registered in the same way as a default channel except that a ReliableChannelProps configuration object is used.

// …
val channel: ActorRef = extension.channelOf(ReliableChannelProps(channelId, destination))

This configuration object additionally allows applications to configure a RedeliveryPolicy for the channel.

A reliable channel preserves sender references. Applications can therefore use ? and forward as well to communicate with channel destinations via channel actor refs. Details have already been described in the default channel section. A reliable channel also stores sender references along with event messages so that they can be forwarded to destinations even after the channel has been restarted. If a stored sender reference is a remote reference, it remains valid even after recovery from a JVM crash (i.e. a crash of the JVM the reliable channel is running in) provided the remote sender is still available.

For those familiar with Akka, a reliable channel is similar to an Akka reliable proxy except that it additionally can recover from sender JVM crashes (see also section Remote destinations).

Reliable request-reply channel

A reliable request-reply channel is a pattern implemented on top of a reliable channel. It mediates reliable request-reply interactions between a request sender (usually an Eventsourced processor) and a destination. This channel has the following properties in addition to a plain reliable channel. It

  • extracts requests from received Messages before sending them to the destination.
  • wraps replies from the destination into a Message before sending them back to the request sender.
  • sends a special DestinationNotResponding reply to the request sender if the destination doesn't reply within a configurable reply timeout.
  • sends a special DestinationFailure reply to the request sender if destination responds with Status.Failure.
  • guarantees at-least-once delivery of replies to the request sender (in addition to at-least-once delivery of requests to the destination).
  • requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
  • redelivers requests, and subsequently replies, on missing or negative receipt confirmations.

A reliable request-reply channel is created and registered in the same way as a reliable channel except that a ReliableRequestReplyChannelProps configuration object is used.

// …
import org.eligosource.eventsourced.patterns.reliable.requestreply._

val channel: ActorRef = extension.channelOf(ReliableRequestReplyChannelProps(channelId, destination))

This configuration object additionally allows applications to configure a replyTimeout for replies from the destination. A detailed usage example of a reliable request-reply channel is given in this article.

Usage hints

General

Channels must be activated before usage, see extension.deliver().

Eventsourced usage

For channels to work properly, event-sourced processors must copy the processorId and sequenceNr values from a received (and journaled) input event message to output event messages. This is usually done by calling copy() on the received input event message and updating only those fields that are relevant for the application such as event or ack, for example:

class Processor(channel: ActorRef) extends Actor {
  def receive = {
    case msg: Message => {
      // …
      channel ! msg.copy(event = …, ack = …)
    }
  }
}

When using a message emitter, this is done automatically.
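
Why copy() matters can be seen with a plain case class. The sketch below uses a hypothetical ToyEventMessage with only four fields. The real Message has more fields, and only the default processorId of 0 is taken from this document.

```scala
// Toy stand-in for an event message (hypothetical; the real Message has more fields).
final case class ToyEventMessage(
  event: Any,
  processorId: Int = 0,
  sequenceNr: Long = 0L,
  ack: Boolean = true)

object ToyOutput {
  // copy() keeps processorId and sequenceNr of the journaled input message.
  def derive(input: ToyEventMessage, newEvent: Any): ToyEventMessage =
    input.copy(event = newEvent)

  // Building a fresh message silently resets both fields to their defaults.
  def fresh(newEvent: Any): ToyEventMessage =
    ToyEventMessage(newEvent)
}
```

A message derived with copy() still identifies the input message it belongs to, whereas a freshly constructed one does not, so a channel would have nothing to correlate an acknowledgement with.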

Standalone usage

Reliable channels and reliable request-reply channels can also be used independently of Eventsourced processors (i.e. standalone). For standalone channel usage, senders must set the Message.processorId of the sent Message to 0 (which is the default value):

val channel = extension.channelOf(ReliableChannelProps(…))

channel ! Message("my event") // processorId == 0

This is equivalent to directly sending the Message.event:

channel ! "my event"

A reliable channel internally wraps a received event into a Message with processorId set to 0. Setting the processorId to 0 causes a reliable channel to skip writing an acknowledgement. An acknowledgement always refers to an event message received by an Eventsourced processor, so there's no need to write one in this case. Another (unrelated) use case for turning off writing acknowledgements is the emission of event message series in context of event-sourced channel usage.

Remote destinations

Applications should consider using reliable channels whenever a sender processor should deliver messages to a remote destination at-least-once and in sending order, even in cases of

  1. network errors between sender and destination (remote actor ref remains valid but remote actor is temporarily unavailable)
  2. destination JVM crashes (remote actor ref becomes invalid) and
  3. sender JVM crashes (messages already received by a sender processor but not yet delivered to the remote destination must be automatically delivered when the sender recovers from a crash)

Using a remote actor ref as channel destination would work in case 1 but not in case 2. One possible way to deal with case 2 is to use a local actor (a destination proxy) that communicates with the remote destination via an ActorSelection. An ActorSelection can be created from an actor path and is not bound to the remote destination's life-cycle.

class DestinationProxy(destinationPath: String) extends Actor {
  val destinationSelection: ActorSelection = context.actorSelection(destinationPath)

  def receive = {
    case msg => destinationSelection tell (msg, sender) // forward
  }
}

val destinationPath: String = …
val proxy = system.actorOf(Props(new DestinationProxy(destinationPath)))
val channel = extension.channelOf(ReliableChannelProps(1, proxy))

Using an ActorSelection also covers case 1, of course. Case 3 is automatically covered by the fact that a sender processor uses a reliable channel for sending messages to a destination. Here's an example:

class Sender extends Actor {
  val id = 1
  val ext = EventsourcingExtension(context.system)
  val proxy = context.actorOf(Props(new DestinationProxy("akka.tcp://<system>@<host>:2852/user/destination")))
  val channel = ext.channelOf(ReliableChannelProps(1, proxy))

  def receive = {
    case msg: Message => channel forward msg
  }
}

// create and recover sender and its channel
val sender = extension.processorOf(Props(new Sender with Eventsourced))

sender ! Message("hello")

Special care must be taken if the remote destination actor is an Eventsourced actor. In this case, the application must ensure that the destination actor can only be accessed remotely after it has been successfully recovered. This can be achieved, for example, by using an additional endpoint actor that simply forwards to the destination actor. The endpoint actor is registered under the destination path after the destination actor has been successfully recovered.

class DestinationEndpoint(destination: ActorRef) extends Actor {
  def receive = {
    case msg => destination forward msg
  }
}

class Destination extends Actor {
  val id = 2

  def receive = {
    case msg: Message => {
      println(s"received ${msg.event}")
      msg.confirm()
    }
  }
}

val destination = extension.processorOf(Props(new Destination with Eventsourced))

// wait for destination recovery to complete
extension.recover()

// make destination remotely accessible after recovery
system.actorOf(Props(new DestinationEndpoint(destination)), "destination")

This ensures that new remote messages never interleave with messages that are replayed to the destination actor during recovery.

Code from this section is contained in ReliableChannelExample.scala. The sender application can be started from sbt with

> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.Sender

The (remote) destination can be started with

> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.Destination

The sender application prompts the user to enter messages on stdin which are then reliably sent to the remote destination.

Channel alternatives

A less reliable alternative to channels is communication via sender references. This means producing event messages to destinations that have been passed to a processor via sender references (along with an input event message). These sender references will be deadLetters during a replay which also prevents redundant delivery. The main difference, however, is that the delivery guarantee changes from at-least-once to at-most-once.

Recovery

Recovery is a procedure that re-creates the state of event-sourced applications consisting of Eventsourced actors (processors) and channels. Recovery is usually done at application start, either after normal termination or after a crash (but can also be done any time later, even for individual processors and channels).

val system: ActorSystem = …
val journal: ActorRef = …

val extension = EventsourcingExtension(system, journal)

// create and register event-sourced processors
extension.processorOf(…)
// …

// create and register channels
extension.channelOf(…)
// …

// recover state of registered processors and activate channels
extension.recover()

// processors and channels are now ready to use
// …

The recover() method first replays journaled event messages to all registered processors. By replaying the event message history, processors can recover state. Processors that emit event messages to one or more channels will also do so during replay. These channels will either ignore (discard) event messages that have already been successfully delivered (i.e. acknowledged) in previous application runs or buffer them for later delivery. After replay, the recover() method triggers the delivery of buffered messages by activating channels.

If channels delivered event messages immediately instead of buffering them, delivered event messages could wrongly interleave with replayed event messages. This could lead to inconsistencies in event message ordering across application runs and therefore to inconsistencies in application state. Therefore, recovery must ensure that buffered event messages are only delivered after all replayed event messages have been added to their corresponding processors' mailboxes. This is especially important for the recovery of processors and channels that are connected to cyclic, directed graphs.
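
The buffering behavior can be pictured with a minimal, synchronous model. ToyBufferingChannel is a hypothetical name. The real channels are actors and are activated via recover() or deliver().

```scala
// Toy model of a channel that buffers until activated (hypothetical names).
final class ToyBufferingChannel[A](destination: A => Unit) {
  private var active = false
  private var buffer = Vector.empty[A]

  // Before activation, messages are held back; afterwards they go straight through.
  def send(msg: A): Unit =
    if (active) destination(msg) else buffer :+= msg

  // Models channel activation: flush buffered messages in arrival order.
  def activate(): Unit = {
    active = true
    buffer.foreach(destination)
    buffer = Vector.empty
  }
}
```

Messages sent before activate() reach the destination only once activation happens, and in the order they were sent. Messages sent afterwards are delivered directly.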

Replay parameters

Recovery can be parameterized with replay parameters using the EventsourcingExtension.recover(Seq[ReplayParams]) method (or one of its overloaded definitions). ReplayParams allow fine-grained control over state recovery of individual processors. For each processor to be recovered, an application must create a ReplayParams instance. ReplayParams specify

  • whether replay should start from scratch, from a snapshot or from a given sequence number (lower sequence number bound).
  • whether replay should end at current state or any state in the past (using an upper sequence number bound)

The following two subsections demonstrate some ReplayParams usage examples. For more details, refer to the API docs of ReplayParams and its companion object. For details about snapshot creation refer to the Snapshots section.

Recovery without snapshots

As already explained above

val extension: EventsourcingExtension = …

import extension._

recover()

recovers all processors with no lower and upper sequence number bound i.e. all event messages are replayed. This is equivalent to

recover(replayParams.allFromScratch)

or

recover(processors.keys.map(pid => ReplayParams(pid)).toSeq)

If an application only wants to recover specific processors it should create ReplayParams only for these processors. For example

recover(Seq(ReplayParams(1), ReplayParams(2)))

only recovers processors with ids 1 and 2. Upper and lower sequence number bounds can be specified as well.

recover(Seq(ReplayParams(1, toSequenceNr = 12651L), ReplayParams(2, fromSequenceNr = 10L)))

Here, processor 1 will receive replayed event messages with sequence numbers in the range 0 to 12651 (inclusive), while processor 2 will receive event messages with sequence numbers starting from 10 with no upper sequence number bound.
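
The effect of sequence number bounds can be modeled as a filter over an in-memory journal. ToyEntry, ToyReplayParams and ToyReplay are hypothetical stand-ins, and treating both bounds as inclusive is an assumption based on the description above.

```scala
// Toy model of replay bounds (hypothetical names, not the library's ReplayParams).
final case class ToyEntry(processorId: Int, sequenceNr: Long, event: String)
final case class ToyReplayParams(
  processorId: Int,
  fromSequenceNr: Long = 0L,
  toSequenceNr: Long = Long.MaxValue)

object ToyReplay {
  // Selects the sequence numbers each listed processor would receive; bounds are inclusive.
  // Processors without a ToyReplayParams entry are not recovered at all.
  def select(journal: Seq[ToyEntry], params: Seq[ToyReplayParams]): Map[Int, Seq[Long]] =
    params.map { p =>
      p.processorId -> journal.collect {
        case e if e.processorId == p.processorId &&
                  e.sequenceNr >= p.fromSequenceNr &&
                  e.sequenceNr <= p.toSequenceNr => e.sequenceNr
      }
    }.toMap
}
```

Only processors that appear in the parameter sequence show up in the result, mirroring the rule that an application creates replay parameters only for the processors it wants to recover.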

Recovery with snapshots

During snapshot based recovery, a processor receives a SnapshotOffer message before receiving the remaining event messages (if there are any). A processor uses a SnapshotOffer message to restore its state.

Scala:

class Processor extends Actor {
  var state = // …

  def receive = {
    case so: SnapshotOffer => state = so.snapshot.state
    // …
  }
}

Java:

public class Processor extends UntypedEventsourcedActor {
    private Object state = // …

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof SnapshotOffer) {
            SnapshotOffer so = (SnapshotOffer)message;
            state = so.snapshot().state();
        }
        // …
    }
}

Snapshot based recovery will only send a SnapshotOffer message to a processor if one or more snapshots have been created for that processor before and these snapshots match the criteria in the corresponding ReplayParams. Relevant criteria are toSequenceNr and snapshotFilter. If there are no snapshots for a processor or existing snapshots do not match ReplayParams criteria, event messages will be replayed from scratch i.e. from sequence number 0.

To recover all processors from their latest snapshot

recover(replayParams.allWithSnapshot)

can be used. This is equivalent to

recover(processors.keys.map(pid => ReplayParams(pid, snapshot = true)).toSeq)

Snapshot based recovery can also be made with upper sequence number bound.

recover(Seq(ReplayParams(1, snapshot = true, toSequenceNr = 12651L)))

This recovers processor 1 with the latest snapshot that has a sequence number <= 12651. Remaining event messages (if there are any) are replayed up to sequence number 12651 (inclusive). Applications may also define further constraints on snapshots. For example

import scala.concurrent.duration._

val limit = System.currentTimeMillis - 24.hours.toMillis

recover(Seq(ReplayParams(1, snapshotFilter = snapshotMetadata => snapshotMetadata.timestamp < limit)))

uses the latest snapshot of processor 1 that is older than 24 hours. This is done with a snapshotFilter that filters snapshots based on their timestamp. Snapshot filters operate on SnapshotMetadata.
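
Snapshot selection under an upper sequence number bound and a filter can be sketched as follows. ToySnapshotMeta and ToySnapshotSelection are hypothetical names. The real SnapshotMetadata and the library's selection logic may differ in detail.

```scala
// Toy model of snapshot selection (hypothetical names, not the library's SnapshotMetadata).
final case class ToySnapshotMeta(processorId: Int, sequenceNr: Long, timestamp: Long)

object ToySnapshotSelection {
  // Picks the latest snapshot at or below toSequenceNr that passes the filter.
  // None means there is no matching snapshot, so replay would start from scratch.
  def select(
      snapshots: Seq[ToySnapshotMeta],
      toSequenceNr: Long = Long.MaxValue,
      filter: ToySnapshotMeta => Boolean = _ => true): Option[ToySnapshotMeta] = {
    val candidates = snapshots.filter(s => s.sequenceNr <= toSequenceNr && filter(s))
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.sequenceNr))
  }
}
```

With snapshots at sequence numbers 100, 200 and 300, a bound of 250 selects the snapshot at 200, and a bound of 50 selects none, which corresponds to a replay from scratch.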

Await processing

The recover method waits until replayed messages have been sent to all processors (via !) but does not wait until these processors have processed them. However, any new message sent to a registered processor after recover has successfully returned will be processed after the replayed event messages. Applications that want to wait for processors to complete processing of replayed event messages should use the awaitProcessing() method of EventsourcingExtension.

val extension: EventsourcingExtension = …

extension.recover()
extension.awaitProcessing()

This can be useful in situations where event-sourced processors maintain state via STM references and the application wants to ensure that the (externally visible) state is fully recovered before accepting new read requests from client applications. By default, the awaitProcessing() method waits for all registered processors to complete processing but applications can also specify a subset of registered processors.

Non-blocking recovery

The recover and awaitProcessing methods block the calling thread. This may be convenient in scenarios where a main thread wants to recover the state of an event-sourced application before taking any further actions. In other scenarios, for example, where recovery is done for individual child processors (and channels) inside an actor (see OrderExampleReliable.scala), the non-blocking recovery API should be used:

val extension: EventsourcingExtension = …

val future = for {
  _ <- extension.replay(…)
  _ <- extension.deliver(…)            // optional
  _ <- extension.completeProcessing(…) // optional
} yield ()

future onSuccess {
  case _ => // event-sourced processors now ready to use …
}

Here, the futures returned by replay, deliver and completeProcessing are monadically composed with a for-comprehension which ensures a sequential execution of the corresponding asynchronous operations. When the composite future completes, the recovered processors and channels are ready to use. More details can be found in the API docs. The replay method can also be parameterized with a ReplayParams sequence (see section Replay parameters).
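
That a for-comprehension over futures runs its steps one after another can be checked with the standard library alone. In the sketch below, the step names are only labels standing in for replay, deliver and completeProcessing. No Eventsourced calls are made, and ToyRecovery is a hypothetical name.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ToyRecovery {
  // Runs three asynchronous steps and returns each step's name with the order it ran in.
  def run(): Vector[(String, Int)] = {
    val counter = new AtomicInteger(0)

    // Each step records the position at which it actually executed.
    def step(name: String): Future[(String, Int)] =
      Future { (name, counter.incrementAndGet()) }

    // flatMap-based composition: a step is only started after the previous one completed.
    val recovered: Future[Vector[(String, Int)]] = for {
      a <- step("replay")
      b <- step("deliver")
      c <- step("completeProcessing")
    } yield Vector(a, b, c)

    Await.result(recovered, 5.seconds) // blocking only to make the result observable here
  }
}
```

Because each generator is started only after the previous future has completed, the recorded positions are always 1, 2 and 3 in that order. Creating the three futures before the for-comprehension would instead let them run concurrently.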

Channel recovery and usage

Channels can even be used by applications immediately after creation i.e. before activating them. This is especially important when event-sourced (parent) processors create new event-sourced child processors during handling of an event:

class Parent extends Actor { this: Receiver with Eventsourced =>
  import context.dispatcher

  var child: Option[ActorRef] = None

  def receive = {
    case event => {
        // create child processor wrapped by channel
        if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) }
        // channel can be used immediately
        child.foreach(_ ! message.copy(…))
    }
  }

  def createChildProcessor(pid: Int, cid: Int): ActorRef = {
    implicit val recoveryTimeout = Timeout(10 seconds)

    val childProcessor = extension.processorOf(Props(new Child with Receiver with Eventsourced { val id = pid } ))
    val childChannel = extension.channelOf(DefaultChannelProps(cid, childProcessor))

    for { // asynchronous, non-blocking recovery
      _ <- extension.replay(Seq(ReplayParams(pid)))
      _ <- extension.deliver(cid)
    } yield ()

    childChannel
  }
}

class Child extends Actor { this: Receiver =>
  def receive = {
    case event => {
      …
      confirm()
    }
  }
}

Here, Parent lazily creates a new childProcessor (wrapped by a default channel) on receiving an event. The childChannel is used by Parent immediately after creation i.e. concurrently to childProcessor message replay and childChannel activation. This is possible because a channel internally buffers new messages before its activation and delivers them to its destination after activation. This ensures that new messages will only be delivered to childProcessor after childChannel has been activated. During Parent recovery, childChannel will ignore messages that have already been successfully delivered to childProcessor (i.e. confirmed by childProcessor).

State dependencies

The behavior of Eventsourced processors may depend on the state of other Eventsourced processors. For example, processor A sends a message to processor B and processor B replies with a message that includes (part of) processor B's state. Depending on the state value included in the reply, processor A may take different actions. To ensure a proper recovery of such a setup, any state-conveying or state-dependent messages exchanged between processors A and B must be of type Message (see also DependentStateRecoverySpec.scala). Exchanging state via non-journaled messages (i.e. messages of type other than Message) can break consistent recovery. This is also the case if an Eventsourced processor maintains state via an externally visible STM reference and another Eventsourced processor directly reads from that reference. Communication between Eventsourced processors is closely related to external queries and external updates.

Snapshots

Snapshots represent processor state at a certain point in time and can dramatically reduce recovery times. Snapshot capturing and saving is triggered by applications and does not delete entries from the event message history unless explicitly requested by an application.

Applications can create snapshots by sending an Eventsourced processor a SnapshotRequest message (Scala API) or SnapshotRequest.get() message (Java API).

Scala:

import org.eligosource.eventsourced.core._
// …

val processor: ActorRef = …

processor ! SnapshotRequest

Java:

import org.eligosource.eventsourced.core.*;
// …

final ActorRef processor = …

processor.tell(SnapshotRequest.get(), …)

This will asynchronously capture and save a snapshot of processor's state. The sender will be notified when the snapshot has been successfully saved.

Scala:

(processor ? SnapshotRequest).mapTo[SnapshotSaved].onComplete {
  case Success(SnapshotSaved(processorId, sequenceNr, timestamp)) => …
  case Failure(e)                                                 => …
}

Java:

ask(processor, SnapshotRequest.get(), 5000L).mapTo(Util.classTag(SnapshotSaved.class)).onComplete(new OnComplete<SnapshotSaved>() {
    public void onComplete(Throwable failure, SnapshotSaved result) {
        if (failure != null) { … } else { … }
    }
}, system.dispatcher());

Alternatively, applications may also use the EventsourcingExtension.snapshot method to trigger snapshot creation. For example,

Scala:

val extension: EventsourcingExtension = …

extension.snapshot(Set(1, 2)) onComplete {
  case Success(snapshotSavedSet: Set[SnapshotSaved]) => …
  case Failure(_)                                    => …
}

Java:

Set<Integer> processorIds = new HashSet<Integer>();

processorIds.add(1);
processorIds.add(2);

extension.snapshot(processorIds, new Timeout(5000L)).onComplete(new OnComplete<Set<SnapshotSaved>>() {
    public void onComplete(Throwable failure, Set<SnapshotSaved> snapshotSavedSet) {
        if (failure != null) { … } else { … }
    }
}, system.dispatcher());

creates snapshots of processors with ids 1 and 2. The returned future (of type Future[scala.immutable.Set[SnapshotSaved]] (Scala API) or Future<java.util.Set<SnapshotSaved>> (Java API)) successfully completes when the snapshots of both processors have been successfully saved.

To participate in snapshot capturing, a processor must process SnapshotRequest messages by calling their process method with its current state as argument:

Scala:

class Processor extends Actor {
  var state = …

  def receive = {
    case sr: SnapshotRequest => sr.process(state)
    // …
  }
}

Java:

public class Processor extends UntypedEventsourcedActor {
    private Object state = …

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof SnapshotRequest) {
            SnapshotRequest sr = (SnapshotRequest)message;
            sr.process(state, getContext());
        }
        // …
    }
}

Calling process will asynchronously save the state argument together with (generated) snapshot metadata. Creating a new snapshot does not delete older snapshots unless explicitly requested by an application. Hence, there can be n snapshots per processor.

An example that demonstrates snapshot creation and snapshot based recovery is contained in SnapshotExample.scala and SnapshotExample.java. It can be executed from the sbt prompt with

Scala:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.SnapshotExample

Java:

> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.japi.SnapshotExample

Configuration

Snapshotting is supported by all journals via the Hadoop FileSystem abstraction. The default FileSystem instance is the local filesystem i.e. snapshots are by default written locally unless configured otherwise by the application. Please refer to the Hadoop documentation for how to create FileSystem instances for HDFS, FTP, S3 etc. Application-defined FileSystem instances can be configured in the following way:

Scala:

// …
import org.apache.hadoop.fs.FileSystem

// …
val hdfs: FileSystem = FileSystem.get(...)
val journal: ActorRef = LeveldbJournalProps(..., snapshotFilesystem = hdfs).createJournal

Java:

// …
import org.apache.hadoop.fs.FileSystem;

// …
final FileSystem hdfs = FileSystem.get(…);
final ActorRef journal = LeveldbJournalProps.create(...).withSnapshotFilesystem(hdfs).createJournal(system);

Find out more in the HadoopFilesystemSnapshottingProps API docs.

Behavior changes

Actors that are modified with a stackable Receiver, Emitter and/or Eventsourced trait (Scala API) or extend any of the abstract Untyped* base classes (Java API) can change their behavior with the methods become() and unbecome(). These are defined on the Behavior trait from which Receiver, Emitter and Eventsourced inherit.

Actors that change their behavior with become() and unbecome() will keep the functionality introduced by a stackable Receiver, Emitter and/or Eventsourced trait. For example, an actor that is modified with the Eventsourced trait (Scala API) or extends UntypedEventsourcedActor (Java API) will continue to journal event messages after having changed its behavior with become().

On the other hand, actors that change their behavior with context.become() (Scala API) or getContext().become() (Java API) will lose the functionality introduced by the stackable Receiver, Emitter and/or Eventsourced traits (although the lost behavior can be recovered with context.unbecome() or getContext().unbecome()).

Event series

When a processor derives more than one output event message from a single input event message and emits those output messages to a single channel, it generates a series of event messages. For an event message series, the event processor should set the ack field for all but the last emitted message to false.

class Processor(channel: ActorRef) extends Actor {
  def receive = {
    case msg: Message => {
      // …
      channel ! msg.copy(event = "event 1", ack = false) // 1st message of series
      channel ! msg.copy(event = "event 2", ack = false) // 2nd message of series
      // …
      channel ! msg.copy(event = "event n") // last message of series
    }
  }
}

Processors that use an emitter do this as follows:

class Processor extends Actor { this: Emitter =>
  def receive = {
    case event => {
      // …
      emitter("channelName") send (msg => msg.copy(event = "event 1", ack = false)) // 1st message of series
      emitter("channelName") send (msg => msg.copy(event = "event 2", ack = false)) // 2nd message of series
      // …
      emitter("channelName") sendEvent "event n"
    }
  }
}

This ensures that an acknowledgement is only written to the journal after the last message of a series has been successfully delivered.

Destinations, however, should confirm the receipt of every event message, regardless of whether it belongs to a series or not.

Idempotency

Under certain failure conditions, channels may deliver event messages to destinations more than once. A typical example is that a destination positively confirms a message receipt but the application crashes shortly before that confirmation can be written to the journal. In this case, the destination will receive the event message again during recovery.

For these (but also other) reasons, channel destinations must be idempotent event message consumers which is an application-level concern. For example, an event message consumer that stores received purchase orders in a map (where the map key is the order id) is likely to be an idempotent consumer because receiving a purchase order only once or several times will lead to the same result: the purchase order is contained in the map only once. An event message consumer that counts the number of received purchase orders is not an idempotent consumer: a re-delivery will lead to a wrong counter value from a business logic perspective. In this case the event message consumer must implement some extra means to detect event message duplicates.
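The difference between the two consumers can be sketched in plain Scala, without any actors. This is only an illustration: PurchaseOrder, store and count are hypothetical names and not part of the Eventsourced API. The same order is delivered twice to simulate a re-delivery.

```scala
// Hypothetical names for illustration; not part of the Eventsourced API.
object IdempotencyDemo {
  final case class PurchaseOrder(id: Int, details: String)

  // Idempotent: storing the same order twice leaves the map unchanged.
  def store(orders: Map[Int, PurchaseOrder], order: PurchaseOrder): Map[Int, PurchaseOrder] =
    orders + (order.id -> order)

  // Not idempotent: every delivery, including a re-delivery, increments the counter.
  def count(counter: Int, order: PurchaseOrder): Int =
    counter + 1

  val order = PurchaseOrder(1, "jelly beans")

  // Simulate at-least-once delivery: the same order arrives twice.
  val deliveries = List(order, order)

  val stored: Map[Int, PurchaseOrder] =
    deliveries.foldLeft(Map.empty[Int, PurchaseOrder])(store)
  val counted: Int =
    deliveries.foldLeft(0)(count)
}
```

After both deliveries the map contains the order once, whereas the counter has been incremented twice for a single purchase order.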

For detecting duplicates, applications should use identifiers with their events. Identifier values should be set by an event-sourced processor before an event is emitted via a channel. Channel destinations (or other downstream consumers) should keep track of identifiers of successfully processed events and compare them to identifiers of newly received events. A newly received event with an already known identifier can be considered as a duplicate (assuming that the emitting processor generates unique identifiers). For generating unique identifiers, processors can use the sequence number of received event messages:

case class MyEvent(details: Any, eventId: Long)

class Processor extends Actor { this: Emitter with Eventsourced =>
  def receive = {
    case event => {
      // get sequence number of current event message
      val snr: Long = sequenceNr
      val details: Any = …
      // …
      emitter("channelName") sendEvent MyEvent(details, snr)
    }
  }
}

Using the sequence number has the advantage that consumers of emitted events only need to remember the identifier of the last successfully consumed event. If the identifier of a newly received event is less than or equal to that of the last consumed event then it is a duplicate and can therefore be ignored.

class Consumer extends Actor {
  var lastEventId = 0L

  def receive = {
    case MyEvent(details, eventId) =>
      if (eventId <= lastEventId) {
        // duplicate
      } else {
        // …
        lastEventId = eventId
      }
  }
}

Consumers that are event-sourced processors can store the event identifier as part of their state which will be recovered during an event message replay. Other consumers must store the identifier somewhere else.

Processors that emit event message series should use an event message index in addition to the sequence number to uniquely identify an emitted event:

case class MyEvent(details: Any, eventId: (Long, Int))

class Processor extends Actor { this: Emitter with Eventsourced =>
  def receive = {
    case event => {
      // get sequence number of current event message
      val snr: Long = sequenceNr
      val details: Seq[Any] = …
      // …
      emitter("channelName") send (msg => msg.copy(event = MyEvent(details(0), (snr, 0)), ack = false))
      emitter("channelName") send (msg => msg.copy(event = MyEvent(details(1), (snr, 1)), ack = false))
      // …
    }
  }
}

Consumers should then compare (sequence number, index) pairs to detect duplicates.
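A minimal sketch of such a comparison, written as a pure function rather than an actor: ConsumerState and consume are hypothetical names, and the sketch assumes that events arrive in the order in which they were emitted. It relies on the standard tuple Ordering, which compares the sequence number first and the index second.

```scala
object SeriesDedup {
  // Same shape as MyEvent above: eventId = (sequence number, index within the series).
  final case class MyEvent(details: Any, eventId: (Long, Int))

  // Hypothetical consumer state: id of the last consumed event plus the accepted details.
  final case class ConsumerState(
    lastEventId: (Long, Int) = (0L, -1),
    consumed: Vector[Any] = Vector.empty)

  // Tuple ordering: compares the sequence number first, then the index.
  private val ord = Ordering[(Long, Int)]

  def consume(state: ConsumerState, event: MyEvent): ConsumerState =
    if (ord.lteq(event.eventId, state.lastEventId)) state // duplicate: ignore
    else ConsumerState(event.eventId, state.consumed :+ event.details)
}
```

As with the single sequence number, the consumer only needs to remember the pair of the last successfully consumed event.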

Serialization

Applications can configure custom serializers for events of event Messages. Custom serializers are used both for writing the event to a journal and for remote communication. They can be configured like any other Akka serializer. For example:

akka {
  actor {
    serializers {
      custom = "example.MyEventSerializer"
    }
    serialization-bindings {
      "example.MyEvent" = custom
    }
  }
}

Here, example.MyEvent is an application-specific event type and example.MyEventSerializer is an application-specific serializer that extends akka.serialization.Serializer:

import akka.serialization.Serializer

class MyEventSerializer extends Serializer {
  def identifier = …
  def includeManifest = true

  def toBinary(o: AnyRef) = …
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]) = …
}
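What the elided toBinary and fromBinary bodies contain is up to the application. As a rough, library-independent sketch, the following codec writes a hypothetical event with a String payload and a Long identifier to a byte array and reads it back. MyEventCodec and this MyEvent shape are assumptions made for illustration. A real serializer would delegate to functions like these from its toBinary and fromBinary methods.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object MyEventCodec {
  // Hypothetical event type with a concrete payload, for illustration only.
  final case class MyEvent(details: String, eventId: Long)

  // Encoding: 8 bytes for the event id, followed by the UTF-8 encoded details.
  def toBinary(event: MyEvent): Array[Byte] = {
    val details = event.details.getBytes(UTF_8)
    ByteBuffer.allocate(8 + details.length)
      .putLong(event.eventId)
      .put(details)
      .array()
  }

  // Decoding: read the fields back in the order in which they were written.
  def fromBinary(bytes: Array[Byte]): MyEvent = {
    val buffer = ByteBuffer.wrap(bytes)
    val eventId = buffer.getLong()
    val details = new Array[Byte](buffer.remaining())
    buffer.get(details)
    MyEvent(new String(details, UTF_8), eventId)
  }
}
```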

Event Messages themselves are serialized with a pre-configured, library-specific serializer. This serializer is automatically used for event Messages when the eventsourced-journal-common-*.jar is on the classpath of an Akka application.

Further examples

Order management

The order management example in this section is taken from Martin Fowler's great LMAX article:

Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.

In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.

This can be implemented with the Eventsourced library as shown in the following diagram (legend is in Appendix A).

[Diagram: Order management]

  • We implement the mentioned Business Logic Processor as an event-sourced actor (OrderProcessor). It processes OrderSubmitted events by assigning submitted orders an id and storing them in a map (= state of OrderProcessor). For every submitted order it emits a CreditCardValidationRequested event.
  • CreditCardValidationRequested events are processed by a CreditCardValidator actor. It contacts an external credit card validation service and sends CreditCardValidated events back to the OrderProcessor for every order with a valid credit card number. To keep the example implementation below simple, we won't actually use an external service, but for real-world implementations, akka-camel would be a perfect fit here.
  • On receiving a CreditCardValidated event, the event-sourced OrderProcessor updates the status of the corresponding order to validated = true and sends an OrderAccepted event, containing the updated order, to Destination. It also replies with the updated order to the initial sender.

The Order domain object, the domain events and the OrderProcessor are defined as follows:

// domain object
case class Order(id: Int = -1, details: String, validated: Boolean = false, creditCardNumber: String)

// domain events
case class OrderSubmitted(order: Order)
case class OrderAccepted(order: Order)
case class CreditCardValidationRequested(order: Order)
case class CreditCardValidated(orderId: Int)

// event-sourced order processor
class OrderProcessor extends Actor { this: Emitter =>
  var orders = Map.empty[Int, Order] // processor state

  def receive = {
    case OrderSubmitted(order) => {
      val id = orders.size
      val upd = order.copy(id = id)
      orders = orders + (id -> upd)
      emitter("validation_requests") forwardEvent CreditCardValidationRequested(upd)
    }
    case CreditCardValidated(orderId) => {
      orders.get(orderId).foreach { order =>
        val upd = order.copy(validated = true)
        orders = orders + (orderId -> upd)
        sender ! upd
        emitter("accepted_orders") sendEvent OrderAccepted(upd)
      }
    }
  }
}

The OrderProcessor uses a message emitter to send CreditCardValidationRequested events to CreditCardValidator via the named "validation_requests" channel. The forwardEvent method not only sends the event but also forwards the initial sender reference. Upon receiving a CreditCardValidationRequested event, the CreditCardValidator runs a credit card validation in the background and sends a CreditCardValidated event back to the OrderProcessor:

class CreditCardValidator(orderProcessor: ActorRef) extends Actor { this: Receiver =>
  def receive = {
    case CreditCardValidationRequested(order) => {
      val sdr = sender  // initial sender
      val msg = message // current event message
      Future {
        // do some credit card validation
        // …

        // and send back a successful validation result (preserving the initial sender)
        orderProcessor tell (msg.copy(event = CreditCardValidated(order.id)), sdr)
      }
    }
  }
}

The CreditCardValidator again forwards the initial sender reference which finally enables the OrderProcessor to reply to the initial sender when it receives the CreditCardValidated event. The OrderProcessor also sends an OrderAccepted event to Destination via the named "accepted_orders" channel.

class Destination extends Actor {
  def receive = {
    case event => println("received event %s" format event)
  }
}

The next step is to wire the collaborators and recover them:

val extension: EventsourcingExtension = …

val processor = extension.processorOf(Props(new OrderProcessor with Emitter with Confirm with Eventsourced { val id = 1 }))
val validator = system.actorOf(Props(new CreditCardValidator(processor) with Receiver))
val destination = system.actorOf(Props(new Destination with Receiver with Confirm))

extension.channelOf(ReliableChannelProps(1, validator).withName("validation_requests"))
extension.channelOf(DefaultChannelProps(2, destination).withName("accepted_orders"))

extension.recover()

The named "validation_requests" channel is a reliable channel that re-delivers CreditCardValidationRequested events in case of CreditCardValidator failures (for example, when the external credit card validation service is temporarily unavailable). Furthermore, it should be noted that the CreditCardValidator does not confirm event message deliveries (it neither calls confirm() explicitly nor is it modified with the Confirm trait during instantiation). Delivery confirmation takes place once the OrderProcessor has successfully processed the CreditCardValidated event.

The OrderProcessor is now ready to receive OrderSubmitted events.

processor ? Message(OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))) onSuccess {
  case order: Order => println("received response %s" format order)
}

Running this example with an empty journal will write

received response Order(0,jelly beans,true,1234-5678-1234-5678)
received event OrderAccepted(Order(0,jelly beans,true,1234-5678-1234-5678))

to stdout. You may observe a different line ordering when running the example. The submitted order was assigned an id of 0 which corresponds to the initial size of the OrderProcessor's orders map. A second application run will first recover the previous application state, so that another order submission will generate an order id of 1.

received response Order(1,jelly beans,true,1234-5678-1234-5678)
received event OrderAccepted(Order(1,jelly beans,true,1234-5678-1234-5678))
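Why the second run yields an id of 1 can be sketched without actors: recovery replays the journaled events through the same state transitions, so the orders map already contains the first order when the next OrderSubmitted event arrives. The sketch below is a simplification under stated assumptions. The update and recover functions are hypothetical, and emitting events and replying to the sender are left out.

```scala
object OrderReplay {
  final case class Order(id: Int = -1, details: String, validated: Boolean = false, creditCardNumber: String)

  sealed trait Event
  final case class OrderSubmitted(order: Order) extends Event
  final case class CreditCardValidated(orderId: Int) extends Event

  type State = Map[Int, Order]

  // Same state transitions as OrderProcessor, without emitting or replying.
  def update(orders: State, event: Event): State = event match {
    case OrderSubmitted(order) =>
      val id = orders.size
      orders + (id -> order.copy(id = id))
    case CreditCardValidated(orderId) =>
      orders.get(orderId).fold(orders)(o => orders + (orderId -> o.copy(validated = true)))
  }

  // Recovery: fold the journaled events over the empty initial state.
  def recover(journal: Seq[Event]): State =
    journal.foldLeft(Map.empty[Int, Order])(update)
}
```

Replaying the two events journaled during the first run restores a map of size 1, so the next submitted order is stored under id 1.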

The example code is contained in OrderExample.scala and can be executed from the sbt prompt with

> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderExample

An advanced version of this example, using a reliable request-reply channel, is discussed in Event sourcing and external service integration.

State machines

Since a change introduced in Akka 2.1, event-sourcing Akka FSMs is straightforward. The following state machine example is a Door which can be in one of two states: Open and Closed.

sealed trait DoorState

case object Open extends DoorState
case object Closed extends DoorState

case class DoorMoved(state: DoorState, times: Int)
case class DoorNotMoved(state: DoorState, cmd: String)
case class NotSupported(cmd: Any)

class Door extends Actor with FSM[DoorState, Int] { this: Emitter =>
  startWith(Closed, 0)

  when(Closed) {
    case Event("open", counter) => {
      emit(DoorMoved(Open, counter + 1))
      goto(Open) using(counter + 1)
    }
  }

  when(Open) {
    case Event("close", counter) => {
      emit(DoorMoved(Closed, counter + 1))
      goto(Closed) using(counter + 1)
    }
  }

  whenUnhandled {
    case Event(cmd @ ("open" | "close"), counter) => {
      emit(DoorNotMoved(stateName, "cannot %s door" format cmd))
      stay
    }
    case Event(cmd, counter) => {
      emit(NotSupported(cmd))
      stay
    }
  }

  def emit(event: Any) = emitter("destination") forwardEvent event
}

On state changes, a door emits DoorMoved events to the named "destination" channel. DoorMoved events contain the door's current state and the number of moves so far. On invalid attempts to move a door (for example, trying to open a door that is already open), a DoorNotMoved event is emitted. The channel destination is an actor that simply prints received events to stdout.

class Destination extends Actor {
  def receive = { case event => println("received event %s" format event) }
}

After configuring the application

val system: ActorSystem = …
val extension: EventsourcingExtension = …

val destination = system.actorOf(Props(new Destination with Receiver with Confirm))

extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))
extension.processorOf(Props(new Door with Emitter with Eventsourced { val id = 1 } ))
extension.recover()

val door = extension.processors(1)

we can start sending event messages to door:

door ! Message("open")
door ! Message("close")

This will write

received event DoorMoved(Open,1)
received event DoorMoved(Closed,2)

to stdout. When attempting an invalid state change with

door ! Message("close")

the destination will receive a DoorNotMoved event:

received event DoorNotMoved(Closed,cannot close door)

Restarting the example application will recover the door's state so that

door ! Message("open")
door ! Message("close")

will produce

received event DoorMoved(Open,3)
received event DoorMoved(Closed,4)

The code from this section is contained in slightly modified form in FsmExample.scala.
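The recovery behavior shown above can be reproduced with a pure transition function. This is a simplified model and not the FSM from FsmExample.scala: step and recover are hypothetical names, and emitting events is omitted. Replaying the three journaled commands from the first run restores the state in which the door is closed and has moved twice.

```scala
object DoorReplay {
  sealed trait DoorState
  case object Open extends DoorState
  case object Closed extends DoorState

  // Pure version of the Door transitions: (state, move counter) plus a command.
  def step(current: (DoorState, Int), cmd: String): (DoorState, Int) =
    (current, cmd) match {
      case ((Closed, n), "open") => (Open, n + 1)
      case ((Open, n), "close")  => (Closed, n + 1)
      case (unchanged, _)        => unchanged // invalid or unsupported command: stay
    }

  // Replaying the journaled commands from (Closed, 0) restores the door.
  def recover(commands: Seq[String]): (DoorState, Int) =
    commands.foldLeft((Closed: DoorState, 0))(step)
}
```

Applying "open" to the recovered state then produces the third move, which matches the DoorMoved(Open,3) output after the restart.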

Clustering

This section makes the Door state machine from the previous example highly available in an Akka cluster. The Door state machine is a cluster-wide singleton that is managed by NodeActors. There's one NodeActor per cluster node listening to cluster events. If a NodeActor becomes the master (= leader), it creates and recovers a Door instance. The other NodeActors obtain a remote reference to the Door instance on the master.

[Diagram: Clustering]

Clients interact with the Door singleton via NodeActors by sending them door commands ("open" or "close"). NodeActors accept commands on any cluster node, not only on master. A NodeActor forwards these commands to the Door as command Messages. Event Messages emitted by the Door are sent to a remote Destination actor via the named "destination" channel. The Destination creates a response from the received events and sends that response back to the initial sender. The application that runs the Destination actor is not part of the cluster but a standalone remote application. It also hosts the journal that is used by the cluster nodes (which is a SPOF in this example but later versions will use a distributed journal).

When the master crashes, another node in the cluster becomes the master and recovers the Door state machine. The remaining slave node renews its remote reference to the Door instance on the new master.

[Diagram: Clustering]

Code from this section is contained in ClusterExample.scala; the configuration files used are journal.conf and cluster.conf. For a more detailed description of the example code, refer to the code comments. To run the distributed example application from sbt, first start the application that hosts the Destination actor and the journal:

> run-main org.eligosource.eventsourced.example.DestinationApp

Then start the first seed node of the cluster

> run-main org.eligosource.eventsourced.example.NodeApp 2561

then the second seed node

> run-main org.eligosource.eventsourced.example.NodeApp 2562

and finally a third cluster node

> run-main org.eligosource.eventsourced.example.NodeApp

The above commands require that you're in the eventsourced-examples project. You can switch to it via

> project eventsourced-examples

Most likely the first seed node will become the master which writes

MASTER: recovered door at akka://[email protected]:2561

to stdout. The other nodes become slaves that write

SLAVE: referenced door at akka://[email protected]:2561

to stdout. All nodes prompt the user to enter a door command:

command (open|close):

We will now enter commands on the last started cluster node (a slave node).

The Door singleton is initially in closed state. Entering open will open it:

command (open|close): open
moved 1 times: door now open

Then close it again:

command (open|close): close
moved 2 times: door now closed

Trying to close a closed door will result in an error:

command (open|close): close
cannot close door: door is closed

Now kill the master node with Ctrl+C. This will also destroy the Door singleton. After 1-2 seconds, the cluster has determined a new master. The new master then recovers the event-sourced Door singleton, and the slave renews its remote reference to the Door. To verify that the Door has been properly recovered, open the door again:

command (open|close): open
moved 3 times: door now open

You can see that the Door state (which contains the number of past moves) has been properly failed over.

Miscellaneous

Multicast processor

[Diagram: Multicast]

The Multicast processor is a predefined Eventsourced processor that forwards received event messages to multiple targets. Using a Multicast processor with n targets is an optimization of having n Eventsourced processors that receive the same event Messages. Using a multicast processor, a received event message is journaled only once whereas with n Eventsourced processors that message would be journaled n times (once for each processor). Using a Multicast processor for a large number of targets can therefore significantly save disk space and increase throughput.

Applications can create a Multicast processor with the multicast factory method which is defined in package core.

// …
import org.eligosource.eventsourced.core._

val extension: EventsourcingExtension = …

val processorId: Int = …
val target1: ActorRef = …
val target2: ActorRef = …

// named multicastProcessor so that the val does not shadow the multicast factory method
val multicastProcessor = extension.processorOf(Props(multicast(processorId, List(target1, target2))))

This is equivalent to

val multicast = extension.processorOf(Props(new Multicast(List(target1, target2), identity) with Eventsourced { val id = processorId } ))

Applications that want to modify received event Messages, before they are forwarded to targets, can specify a transformer function.

val transformer: Message => Any = msg => msg.event
val multicastProcessor = extension.processorOf(Props(multicast(1, List(target1, target2), transformer)))

In the above example, the transformer function extracts the event from a received event Message. If the transformer function is not specified, it defaults to the identity function. Another Multicast factory method is the decorator method for creating a multicast processor with a single target.
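The journaling saving and the role of the transformer can be illustrated with a small model that does not use Akka. Everything in it is a hypothetical stand-in: Message is a simplified case class rather than the library's Message, Journal only counts writes, and targets are plain functions instead of actors.

```scala
object MulticastSketch {
  // Simplified stand-in for an event message; not the library's Message class.
  final case class Message(event: Any, sequenceNr: Long = 0L)

  // Hypothetical stand-in for the journal: it only counts how often a message is written.
  final class Journal {
    var writes = 0
    def write(msg: Message): Unit = writes += 1
  }

  // Journal the message once, then hand the transformed message to every target.
  def multicast(
      journal: Journal,
      targets: Seq[Any => Unit],
      transformer: Message => Any = (m: Message) => m)(msg: Message): Unit = {
    journal.write(msg)
    val out = transformer(msg)
    targets.foreach(target => target(out))
  }
}
```

With two targets and a transformer that extracts the event, a single received message causes one journal write and two deliveries of the extracted event.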

Retroactive changes

TODO

Appendix A: Legend

[Diagram: Legend]

Appendix B: Project

Appendix C: Articles

Appendix D: Support