superv.async
Let it crash. The Erlang approach to build reliable systems.
Errors happen, and proper error handling cannot be bolted on top of subsystems. This library draws on the Erlang philosophy of error handling to help building robust distributed systems that deal with errors by default. A detailed description of Erlang supervisors can also be found here.
This is a Clojure(Script) library that
extends core.async with error handling
and includes a number of convenience functions and macros. This library is a
fork of full.async. The original
attempt to merge this work with it
failed
due to the limitations of a dynamic binding approach.
The fork became reasonable, because full.async mostly deals with convenience
functions, but it is not as radically focused on proper error handling. Since
the error handling cannot happen through a transparent dynamic binding, some
convenience is lost in superv.async
as you need to carry around the supervisor
lexically. If binding support comes to ClojureScript for asynchronous contexts
the projects might merge again. The binding approach also has a performance
penalty though. Since the boundary of both error handling libraries is the
exception mechanism and core.async channels, they still compose, but supervision
will not compose with full.async
contexts.
Usage
Exception Handling
Exception handling is an area for which core.async
doesn't have support out of
the box. If something within a go
block throws an exception, it will be logged
in the thread pool or logged to the cljs console, but the block will simply
return nil
, hiding any information about the actual exception. You could wrap
the body of each go
block within an exception handler but that's not very
convenient. superv.async
does this internally for you and provides a set of
helper functions and macros for dealing with exceptions in a simple and robust
manner:
go-try
: equivalent ofgo
but catches any exception thrown and returns via the resulting channel<?
,alts?
,<??
: equivalents of<!
,alts!
and<!!
but if the value is an exception, it will get rethrown. This also ensures a proper stacktrace between async contexts.
Supervision
go-try
and the ?
expressions work well when you have a sequential execution
of goroutines because you can just rethrow the exceptions on the higher levels
of the call-stack. This fails for concurrent goroutines like go-loops or
background tasks, which are often the long-term purpose of the introduction of
asynchronous programming in the first place. To handle these concurrent
situations we have introduced an Erlang inspired supervision concept.
Two requirements for robust systems in Erlang are:
- All exceptions in distributed processes are caught in a unifying supervision context
- All concurrently acquired resources are freed on an exception
We decided to form a supervision context in which exceptions thrown in concurrent parts get reported to a supervisor:
(require '[superv.async :refer [S go-try restarting-supervisor on-abort]])
(let [try-fn (fn [S] (go-try S (throw (ex-info "stale" {}))))
start-fn (fn [rs] ;; will be called again on retries
(go-try rs
(on-abort rs ;; you must ensure the freeing of resources for proper restarts
"do cleanup here")
(try-fn rs) ;; triggers restart after stale-timeout
42))]
(<?? S (restarting-supervisor start-fn :retries 3 :stale-timeout 1000)))
The restarting supervisor tracks all nested goroutines and waits until all are
finished and have hence freed all resources (through on-abort
cleanup
routines) before it tries to restart or finally returns itself either the result
or the exception. This allows composition of supervised contexts. There is a
default simple-supervisor S
in the superv.async
namespace for convenience
during development. We recommend to carry it as S
along in your call stack.
While go-try
, thread-try
and go-loop-try
propagate errors to their
supervisor, their errors, if they run concurrently, will only be caught after
they become stale. For the cases of concurrent go-routines go-super
,
go-super-loop
and thread-super
exist. They will immediately report
exceptions to the supervisor and not propagate them through the call stack,
yielding an immediate restart or failure if necessary and shortening the restart
cycle.
To really free resources you just use the typical stack-based try-catch-finally
mechanism. To support this, channel operations <?
, >?
, alt?
, put?
...
trigger an "abort" exception when the supervisor detects an exception somewhere.
This at least eventually guarantees the termination of all blocking contexts,
since we cannot use preemption on errors like the Erlang VM does. In cases where
you have long blocking IO without core.async, you can insert some dummy blocking
operation to speed up restarts.
The supervisor tracks all thrown exceptions, so whenever they are not taken off some channel and become stale, it will timeout and restart, depending on the exception type it is tracking.
Sequences & Collections
Channels by themselves are quite similar to sequences however converting between
them may sometimes be cumbersome. superv.async
provides a set of convenience
methods derived from full.async
for this:
-
<<!
,<<?
: takes all items from the channel and returns them as a collection. Must be used within ago
block. -
<<!!
,<<??
: takes all items from the channel and returns as a lazy sequence. Returns immediately. -
<!*
,<?*
,<!!*
,<??*
takes one item from each input channel and returns them in a single collection. Results have the same ordering as input channels. -
go-for
is an adapted for-comprehension with channels instead of lazy-seqs. It allows complex control flow across function boundaries where a single transduction context would be not enough. For example:
(let [query-fn #(go (* % 2))
goroutine #(go [%1 %2])]
(<<?? S (go-for S [a [1 2 3]
:let [b (<? S (query-fn a))]
:when (> b 5)]
(<? S (goroutine a b)))))
Note that channel operations are side-effects, so this is best used to realize read-only operations like queries. Nested exceptions are propagated according to go-try semantics.
Without go-for
you needed to form some boilerplate around function
boundaries like this, which is much less succinct:
(<<?? (->> [1 2 3]
(map #(go [% (<? (query-fn %))]))
async/merge
(async/into [])
(filter #(> (second %) 5))
(map (apply goroutine))
async/merge))
Parallel Processing
pmap>>
lets you apply a function to channel's output in parallel,
returning a new channel with results.
Conventions
For readability of code, superv.async
follows these conventions from
full.async
:
- Async operators that throw exceptions use
?
in place of!
, for example throwing counterpart of<!
is<?
. - Functions that return channel that will contain zero or one value (typically
result of
go
blocks) are sufixed with>
. Similarly operators that expect zero/single value channel as input are prefixed with<
(for example<?
). - Functions that return channel that will contain zero to many values are
sufixed with
>>
. Similarly operators that expect zero to many value channel as input are prefixed with<<
(for example<<?
).
Change Log
See CHANGES.
TODO
- add spec signatures
License
Copyright (C) 2015-2016 Christian Weilbach, 2016 FullContact. Distributed under the Eclipse Public License, the same as Clojure.