• Stars
    star
    274
  • Rank 150,274 (Top 3 %)
  • Language
    Scala
  • License
    Apache License 2.0
  • Created over 10 years ago
  • Updated over 8 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Large-scale event processing with Akka Persistence and Apache Spark

Akka Analytics

Large-scale event processing with Akka Persistence and Apache Spark. At the moment you can

Dependencies

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies ++= Seq(
  "com.github.krasserm" %% "akka-analytics-cassandra" % “0.3.1”,
  "com.github.krasserm" %% "akka-analytics-kafka" % “0.3.1”
)

Event batch processing

With akka-analytics-cassandra you can expose and process events written by all persistent actors as resilient distributed dataset (RDD). It uses the Spark Cassandra Connector to fetch data from the Cassandra journal. Here's a primitive example (details here):

import akka.actor.ActorSystem

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import akka.analytics.cassandra._

val conf = new SparkConf()
 .setAppName("CassandraExample")
 .setMaster("local[4]")
 .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)

// expose journaled Akka Persistence events as RDD
val rdd: RDD[(JournalKey, Any)] = sc.eventTable().cache()

// and do some processing ... 
rdd.sortByKey().map(...).filter(...).collect().foreach(println)

The dataset generated by eventTable() is of type RDD[(JournalKey, Any)] where Any represents the persisted event (see also Akka Persistence API) and JournalKey is defined as

package akka.analytics.cassandra

case class JournalKey(persistenceId: String, partition: Long, sequenceNr: Long)

Events for a given persistenceId are partitioned across nodes in the Cassandra cluster where the partition is represented by the partition field in the key. The eventTable() method returns an RDD in which events with the same persistenceId - partition combination (= cluster partition) are ordered by increasing sequenceNr but the ordering across cluster partitions is not defined. If needed the RDD can be sorted with sortByKey() by persistenceId, partition and sequenceNr in that order of significance. Btw, the default size of a cluster partition in the Cassandra journal is 5000000 events (see akka-persistence-cassandra).

Event stream processing

With akka-analytics-kafka you can expose and process events written by all persistent actors (more specific, from any user-defined topic) as discretized stream (DStream). Here's a primitive example (details here):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

import akka.analytics.kafka._
import akka.persistence.kafka.Event

val sparkConf = new SparkConf()
  .setAppName("events-consumer")
  .setMaster("local[4]")

// read from user-defined events topic 
// with 2 threads (see also Kafka API) 
val topics = Map("events" -> 2)
val params = Map[String, String](
  "group.id" -> "events-consumer",
  "auto.commit.enable" -> "false",  
  "auto.offset.reset" -> "smallest",
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000")

val ssc = new StreamingContext(sparkConf, Seconds(1))
val es: DStream[Event] = ssc.eventStream(params, topics)

es.foreachRDD(rdd => rdd.map(...).filter(...).collect().foreach(println))

ssc.start()
ssc.awaitTermination()

The stream generated by eventStream(...) is of type DStream[Event] where Event is defined in akka-persistence-kafka as

package akka.persistence.kafka

/**
 * Event published to user-defined topics.
 *
 * @param persistenceId Id of the persistent actor that generates event `data`.
 * @param sequenceNr Sequence number of the event.
 * @param data Event data generated by a persistent actor.
 */
case class Event(persistenceId: String, sequenceNr: Long, data: Any)

The stream of events (written by all persistent actors) is partially ordered i.e. events with the same persistenceId are ordered by sequenceNr whereas the ordering of events with different persistenceId is not defined. Details about Kafka consumer params are described here.

Custom serialization

If events have been persisted with a custom serializer, the corresponding Akka serializer configuration must be specified for event processing. For event batch processing this is done as follows:

val system: ActorSystem = ...
val jsc: JournalSparkContext = 
  new SparkContext(sparkConfig).withSerializerConfig(system.settings.config)

val rdd: RDD[(JournalKey, Any)] = jsc.eventTable()
// ...

jsc.context.stop()

For event stream processing this is done in a similar way:

val system: ActorSystem = ...
val jsc: JournalStreamingContext = 
  new StreamingContext(sparkConfig, Seconds(1)).withSerializerConfig(system.settings.config)

val es: DStream[Event] = jsc.eventStream(kafkaParams, kafkaTopics)
// ...

jsc.context.start()
// ...

jsc.context.stop()

Running examples are akka.analytics.cassandra.CustomSerializationSpec and akka.analytics.kafka.CustomSerializationSpec.

More Repositories

1

bayesian-machine-learning

Notebooks about Bayesian methods for machine learning
Jupyter Notebook
1,808
star
2

super-resolution

Tensorflow 2.x based implementation of EDSR, WDSR and SRGAN for single image super-resolution
Python
1,496
star
3

perceiver-io

A PyTorch implementation of Perceiver, Perceiver IO and Perceiver AR with PyTorch Lightning scripts for distributed training
Python
432
star
4

face-recognition

Deep face recognition with Keras, Dlib and OpenCV
Jupyter Notebook
377
star
5

machine-learning-notebooks

Stanford Machine Learning course exercises implemented with scikit-learn
Jupyter Notebook
341
star
6

fairseq-image-captioning

Transformer-based image captioning extension for pytorch/fairseq
Python
313
star
7

streamz

A combinator library for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel
Scala
283
star
8

akka-persistence-cassandra

A replicated Akka Persistence journal backed by Apache Cassandra
Scala
224
star
9

akka-persistence-kafka

A replicated Akka Persistence journal backed by Apache Kafka
Scala
201
star
10

akka-stream-eventsourcing

Event sourcing for Akka Streams
Scala
101
star
11

grails-jaxrs

JAX-RS Plugin for Grails
Groovy
50
star
12

scalaz-camel

A Scala(z)-based DSL for Apache Camel
Scala
50
star
13

ipf

Open eHealth Integration Platform
Java
35
star
14

akka-persistence-testkit

Compatibility testkit for Akka Persistence storage plugins
Scala
21
star
15

bot-with-plan

Separation of planning concerns in ReAct-style LLM agents. Planner fine-tuning on synthetic trajectories.
Python
10
star
16

krasserm.github.io

Jupyter Notebook
9
star
17

camelinaction-appendix-e

akka-camel examples from book Camel in Action - Appendix E (adjusted to the most recent Akka release or development snapshot)
Scala
8
star
18

machine-learning-minis

Minimalistic example code for various machine learning and deep learning topics
Jupyter Notebook
8
star
19

ipf-labs

eHealth Integration Framework Labs
Java
7
star
20

ipf-runtime

OSGi-based runtime environment for IPF applications
Shell
6
star
21

ipf-tools

eHealth Integration Framework Tools
Java
6
star
22

sagemaker-tutorial

Multi-node, multi-GPU training with PyTorch Lightning on SageMaker
Python
5
star
23

eventuate-crdt-example

Example application that uses Eventuate's operation-based CRDTs
Scala
3
star
24

safr

Security Annotation Framework
Java
1
star