GCP Datastore Akka Persistence Plugin
akka-persistence-gcp-datastore is a journal and snapshot store plugin for akka-persistence using Google Cloud Datastore. It uses the official Google Java Dependency to talk with the datastore.
Scala 2.12 & 2.13, Java 8 & Java 11, akka 2.6.X are supported.
The plugin supports the following functionality:
- serialization of events and snapshots with play-json
- peristence-query api
- custom serialization
Related Blogposts
- Using Google Datastore with akka-persistence
- Was ist Event Sourcing? (de)
- Wie lรคsst sich das Lesen beim Event Sourcing durch CQRS optimieren? (de)
Usage & Setup
Versions: The table below lists the versions and their main dependencies
Version to use | Scala 2.12 | Scala 2.13 | Scala 3 / Dotty | Akka | play-json | google-cloud-datastore |
---|---|---|---|---|---|---|
1.0.2 | โ | โ | ? | 2.6.x | 2.8.x | 1.102.x |
Dependency
You just need to add the following dependency to you sbt dependencies
libraryDependencies += "de.innfactory" %% "akka-persistence-gcp-datastore" % "X.Y.Z"
Configuration
Take a look at reference.conf under src/main/resources We forked the cqrs cassandra lightbend example with necessary changes for gcp-datastore (@ Demo Test Project available based on the CQRS Example from Lightbend https://github.com/innFactory/akka-persistence-gcp-datastore-example)
Add the following to your application.conf for a basic configuration:
akka {
# use google cloud datastore as journal and snapshot store
persistence {
journal {
plugin = "gcp-datastore-journal"
auto-start-journals = ["gcp-datastore-journal"]
}
snapshot-store {
plugin = "gcp-datastore-snapshot"
auto-start-snapshot-stores = ["gcp-datastore-snapshot"]
}
}
}
Datastore Configuration
-
Google Cloud Project with Datastore or FireStore in Datastore mode enabled
-
Create a index.yml file with content bolow in the project that will use this plugin:
indexes: - kind: journal properties: - name: persistenceId - name: sequenceNr direction: desc - kind: snapshot properties: - name: persistenceId - name: timestamp direction: desc - kind: snapshot properties: - name: persistenceId - name: timestamp - kind: journal properties: - name: tagsKey - name: timestamp - kind: journal properties: - name: persistenceId - name: sequenceNr
index.yml
-
Open terminal and execute
gcloud app deploy index.yaml
This is telling the GCP Datastore to build indexes for the plugin based on the yaml file
-
Create a service account for read and write to datastore. Download the json and add it to the project
src/main/resources/datastore.json
Persistence query API
The plugin supports the Persistence query APi, mostly used in CQRS applications to transform/migrate the events from the write side to the read side.
The ReadJournal is retrieved via the akka.persistence.datastore.journal.read.DatastoreScaladslReadJournal
and akka.persistence.datastore.journal.read.DatastoreJavadslReadJournal
. There is also a DatastoreReadJournalProvider.
import akka.persistence.datastore.journal.read.DatastoreScaladslReadJournal
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
val system = ??? //ActorSystem akka-classic or akka-typed then system.toClassic is needed. see the example.
val readJournal =
PersistenceQuery(system).readJournalFor[DatastoreScaladslReadJournal]("gcp-datastore-query")
Supported Queries
All queries are live streams and they are not completed when they reaches the end of the currently stored events, but continue to push new events when new events are persisted.
eventsByTag
eventsByTags is used for retrieving events that were marked with a given tag.
eventsByPersistenceId
eventsByPersistenceId is used for retrieving events for a specific PersistentActor identified by its persistenceId
Testing
To test this plugin
(Source: https://cloud.google.com/datastore/docs/tools/datastore-emulator)
-
gcloud components install cloud-datastore-emulator
-
gcloud beta emulators datastore start --no-store-on-disk --consistency=1.0
-
Set Env Variable
DATASTORE_TESTHOST=http://<host>:<port>
of datastore emulator -
Execute
sbt run
-
Before executing test reset datastore data:
curl -X POST http://<host>:<port>/reset
There is a shell script under .circle ci which runs all of these tests. cqrs tests are outsourced in the example project.
Contribution policy
Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.
Credits
innFactory GmbH is a lightbend partner from germany. We are experts for Apps, BigData & Cloud Computing. If you need help with your next project, feel free to ask for our support.