CATS-SAGA
Purely Functional Transaction Management In Scala With Cats
CI | Coverage | Release | |
---|---|---|---|
Disclaimer
This library was inspired by goedverhaal, but it's implementation and semantics differs, it tries to be semantically consistent with zio-saga and provide you with flexible and powerful functions for building Sagas of different complexities.
For whom this library?
This library is designed for those who want to apply Saga pattern on their codebase and to encode long-running transactions.
Moreover if you use tagless final encoding this library is a perfect fit.
Also consider looking at zio-saga, it's designed specifically for
ZIO users. Although you could use this library with ZIO
as well.
Getting started
Add zio-saga dependency to your build.sbt
:
libraryDependencies += "com.vladkopanev" %% "cats-saga" % "0.3.0"
Example of usage:
Consider the following case, we have built our food delivery system in microservices fashion, so
we have Order
service, Payment
service, LoyaltyProgram
service, etc.
And now we need to implement a closing order method, that collects payment, assigns loyalty points
and closes the order. This method should run transactionally so if e.g. closing order fails we will
rollback the state for user and refund payments, cancel loyalty points.
Applying Saga pattern we need a compensating action for each call to particular microservice, those actions needs to be run for each completed request in case some of the requests fails.
Let's think for a moment about how we could implement this pattern without any specific libraries.
The naive implementation could look like this:
def orderSaga(): IO[Unit] = {
for {
_ <- collectPayments(2d, 2) handleErrorWith (_ => refundPayments(2d, 2))
_ <- assignLoyaltyPoints(1d, 1) handleErrorWith (_ => cancelLoyaltyPoints(1d, 1))
_ <- closeOrder(1) handleErrorWith (_ => reopenOrder(1))
} yield ()
}
Looks pretty simple and straightforward, handleErrorWith
function tries to recover the original request if it fails.
We have covered every request with a compensating action. But what if last request fails? We know for sure that corresponding
compensation reopenOrder
will be executed, but when other compensations would be run? Right, they would not be triggered,
because the error would not be propagated higher, thus not triggering compensating actions. That is not what we want, we want
full rollback logic to be triggered in Saga, whatever error occurred.
Second try, this time let's somehow trigger all compensating actions.
def orderSaga: IO[Unit] = {
collectPayments(2d, 2).flatMap { _ =>
assignLoyaltyPoints(1d, 1).flatMap { _ =>
closeOrder(1) handleErrorWith(e => reopenOrder(1) *> IO.raiseError(e))
} handleErrorWith (e => cancelLoyaltyPoints(1d, 1) *> IO.raiseError(e))
} handleErrorWith(e => refundPayments(2d, 2) *> IO.raiseError(e))
}
This works, we trigger all rollback actions by failing after each. But the implementation itself looks awful, we lost expressiveness in the call-back hell, imagine 15 saga steps implemented in such manner.
You can solve this problems in different ways, but you will encounter a number of difficulties, and your code still would look pretty much the same as we did in our last try.
Achieve a generic solution is not that simple, so you will end up repeating the same boilerplate code from service to service.
cats-saga
tries to address this concerns and provide you with simple syntax to compose your Sagas.
With cats-saga
we could do it like so:
def orderSaga(): IO[Unit] = {
import com.vladkopanev.cats.saga.Saga._
(for {
_ <- collectPayments(2d, 2) compensate refundPayments(2d, 2)
_ <- assignLoyaltyPoints(1d, 1) compensate cancelLoyaltyPoints(1d, 1)
_ <- closeOrder(1) compensate reopenOrder(1)
} yield ()).transact
}
compensate
pairs request IO with compensating action IO and returns a new Saga
object which then you can compose
with other Sagas
.
To materialize Saga
object to IO
when it's complete it is required to use transact
method.
Because Saga
is effect polymorphic you could use whatever effect type you want in tagless final style:
def orderSaga[F[_]: Concurrent](): F[Unit] = {
import com.vladkopanev.cats.saga.Saga._
(for {
_ <- collectPayments(2d, 2) compensate refundPayments(2d, 2)
_ <- assignLoyaltyPoints(1d, 1) compensate cancelLoyaltyPoints(1d, 1)
_ <- closeOrder(1) compensate reopenOrder(1)
} yield ()).transact
}
As you can see with cats-saga
the process of building your Sagas is greatly simplified comparably to ad-hoc solutions.
cats-sagas
are composable, boilerplate-free and intuitively understandable for people that aware of Saga pattern.
This library lets you compose transaction steps both in sequence and in parallel,
this feature gives you more powerful control over transaction execution.
Advanced
Advanced example of working application that stores saga state in DB (journaling) could be found here examples.
Retrying
cats-saga
provides you with functions for retrying your compensating actions, so you could write:
collectPayments(2d, 2) retryableCompensate (refundPayments(2d, 2), RetryPolicies.exponentialBackoff(1.second))
In this example your Saga will retry compensating action refundPayments
after exponentially
increasing timeouts (based on cats-retry).
Parallel execution
Saga pattern does not limit transactional requests to run only in sequence.
Because of that cats-sagas
contains methods for parallel execution of requests.
val flight = bookFlight compensate cancelFlight
val hotel = bookHotel compensate cancelHotel
val bookingSaga = flight zipPar hotel
Note that in this case two compensations would run in sequence, one after another by default.
If you need to execute compensations in parallel consider using Saga#zipWithParAll
function, it allows arbitrary
combinations of compensating actions.
Result dependent compensations
Depending on the result of compensable effect you may want to execute specific compensation, for such cases cats-saga
contains specific functions:
compensate(compensation: Either[E, A] => F[Unit])
this function makes compensation dependent on the result of corresponding effect that either fails or succeeds.compensateIfFail(compensation: E => F[Unit])
this function makes compensation dependent only on error type hence compensation will only be triggered if corresponding effect fails.compensateIfSuccess(compensation: A => F[Unit])
this function makes compensation dependent only on successful result type hence compensation can only occur if corresponding effect succeeds.
Notes on compensation action failures
By default, if some compensation action fails no other compensation would run and therefore user has the ability to choose what to do: stop compensation (by default), retry failed compensation step until it succeeds or proceed to next compensation steps ignoring the failure.