# FSharp.Core.Extensions
This library contains a set of utilities to support building efficient, concurrent programs in F# using the functional paradigm, such as:

- `atom` - an equivalent of F# `ref` cells, whose major difference is that all operations (reads and updates) are thread-safe, yet executed without expensive OS-level locks, relying fully on lock-free data structures instead.
- `RWLock` - a user-space reentrant reader/writer lock, with support for async/await and cancellation (the latter still in progress).
- `Vec` - an immutable, efficient array-like data structure with fast random reads, element traversal and append-to-tail operations. Operations like `Vec.add` and `Vec.item` are O(log32(n)), which for an in-memory data structure is close to O(1). Performance is usually better than that of F# list or the System.Collections.Immutable data structures for the supported operations.
- `Hlc` - an implementation of the hybrid logical clock protocol, which works as a middle ground between wall-clock UTC time (with precision up to a millisecond) and monotonicity guarantees (`DateTime.UtcNow` doesn't guarantee that a returned value will always be greater than the one previously obtained, due to the nature of NTP and phenomena like leap seconds).
- `Random` - a thread-safe module for generating random values. It also contains a few extra functions not supplied by the `System.Random` class.
- `LazyTask` - works like a `lazy (factory)` data structure, but the factory method can return a `Task`. In order to make this data structure non-blocking, the `lazyTask.Value` call itself returns a task.
    - TODO: make a `lazyTask { ... }` computation expression.
- In progress: an implementation of a Persistent Adaptive Radix Tree.
- `Enum` module, which provides `Seq`-like module functions (`map`, `filter` etc.) that are optimized to work directly on enumerators. The difference is that `Enum` is aware of the generic nature of enumerators, so it can inline and devirtualize certain calls.
- A bunch of utility functions:
    - Helper functions to work with `Task` and `Channel` (including a multi-channel `select` operator).
    - `Array` and `Array.Simd` modules, which provide vectorized variants of some operations (like `Array.Simd.contains`) that can be much faster.
    - Utility functions in the form of `Span`, `ReadOnlySpan`, `Memory`, `ReadOnlyMemory` and `ReadOnlySequence` modules.
    - Extra functions for the `Map` type.
    - Other functions I missed from time to time: the `~%` operator as an equivalent of an `op_Implicit` cast, `=>` which creates a `KeyValuePair` out of its parameters, and a `*` multiply operator for `TimeSpan` (see the sketch after this list).
- `AsyncSeq` module, which enables dozens of operators over the `IAsyncEnumerable<'a>` interface, including element transformation, adding time dimensions, stream joining and splitting.
- F# Async extensions:
    - Functional `TaskCompletionSource<>` equivalents: `IVar` (completable once) and `MVar` (completable multiple times), both using thread-safe methods.
    - `AsyncBatch`, which enables convenient batch processing of isolated parallel requests. Useful e.g. in GraphQL resolvers.
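
As a tiny illustration of two of the utility operators mentioned above (a hedged sketch based only on the descriptions in this list, so the exact namespaces may differ):

```fsharp
open System
open FSharp.Core // assumed to expose the operators, as in the examples below

// `=>` creates a KeyValuePair out of its parameters
let pair = "answer" => 42 // KeyValuePair<string, int>

// `*` multiplies a TimeSpan by a factor
let timeout = TimeSpan.FromSeconds 1. * 3.0 // 3 seconds
```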
## API

### Atomic

`atom` is meant to be an almost drop-in replacement for F# `ref` cells. It offers read and update operations that use efficient interlocked semantics to perform thread-safe read/modify access in all supported operations.
```fsharp
open FSharp.Core
open FSharp.Core.Atomic.Operators

// Create an atomic cell. Currently it's limited to work only with:
//
// - Reference types (classes, non-struct records, discriminated unions, tuples or anonymous objects)
// - 32- and 64-bit integers (NOTE: interlocked 64-bit operations on 32-bit OSes are downgraded by the .NET runtime)
// - 32- and 64-bit floating point numbers
// - booleans
let a = atom "hello"

// Read the stored value
!a // returns: "hello"

// Replace the stored value with a new one and return the previously stored one
a := "Hello" // returns: "hello"

// These operations can be chained together to get swap mechanics with no temporary variables necessary
let a = atom 1
let b = atom 2
a := b := !a |> ignore // swap contents of `a` and `b`. Now !a = 2 and !b = 1

// You can also increment/decrement atomic values, effectively turning them into thread-safe counters
let a = atom 1
Atomic.incr a // returns 2

// You can also perform conditional updates, a.k.a. Compare-And-Swap or Compare-Exchange.
// NOTE: for reference types it uses reference equality (pointer check) instead of structural equality
let value = "hello"
let a = atom value
if a |> Atomic.cas value "new-value" then
    printfn "Successfully updated value of `a` to %s" !a
else
    printfn "Value of `a` has been updated in the meantime to %s" !a

// If you want to perform a thread-safe update no matter whether the
// atomic cell is concurrently modified by other threads, you can still do that:
let users = atom Set.empty
// keep in mind that the update function should be pure and lightweight, as it may be called multiple times
users |> Atomic.update (fun s -> Set.add "Alice" s) // returns: the state of the Set prior to the update
```
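
Atomic increments stay consistent under parallel mutation. A small usage sketch (the `Parallel.For` harness is just for demonstration):

```fsharp
open System.Threading.Tasks

let counter = atom 0
// 1000 concurrent increments; interlocked semantics guarantee no lost updates
Parallel.For(0, 1000, fun _ -> Atomic.incr counter |> ignore) |> ignore
printfn "count: %i" !counter // always prints 1000
```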
#### Atomic update performance

Here's a performance check of the update operation: reading a stored value, modifying it, storing it back and returning the updated result. All of this must happen in a thread-safe manner. We're comparing:

- InterlockedLoop as a baseline, which uses `Interlocked.CompareExchange` in a loop until the value has been safely updated (see the sketch after this list).
- AtomicUpdate, which is the variant above wrapped into the generic functionality of the `Atomic.update` function.
- ObjectLock, which uses the `lock` function over an object sync root.
- Mutex, which uses `Mutex` as the tool of control.
- SemaphoreSlim, which uses `SemaphoreSlim` - an optimized variant of mutex/semaphore.
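
For reference, the pattern behind the InterlockedLoop baseline looks roughly like this (a minimal sketch, not the library's actual code):

```fsharp
open System.Threading

type CasCell(initial: int) =
    let mutable value = initial
    /// Read the current value, compute its replacement, and commit it only if no
    /// other thread modified the cell in between; otherwise retry with a fresh read.
    member _.Update(f: int -> int) =
        let mutable old = Volatile.Read(&value)
        let mutable next = f old
        while Interlocked.CompareExchange(&value, next, old) <> old do
            old <- Volatile.Read(&value)
            next <- f old
        next
```

This is also why the function passed to `Atomic.update` should be pure and cheap: under contention it may be re-executed several times before the compare-exchange succeeds.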
```
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19043
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.301
  [Host]     : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT DEBUG
  DefaultJob : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT
```
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
InterlockedLoop | 3.111 ns | 0.0719 ns | 0.0673 ns | 1.00 | 0.00 | - | - | - | - |
AtomicUpdate | 3.192 ns | 0.0487 ns | 0.0456 ns | 1.03 | 0.03 | - | - | - | - |
ObjectLock | 8.830 ns | 0.0790 ns | 0.0739 ns | 2.84 | 0.07 | - | - | - | - |
Mutex | 438.290 ns | 1.9435 ns | 1.8180 ns | 140.92 | 2.88 | - | - | - | - |
SemaphoreSlim | 23.639 ns | 0.0370 ns | 0.0328 ns | 7.60 | 0.17 | - | - | - | - |
### RWLock

An implementation of a reentrant reader/writer lock, living fully in user space (no syscalls). It's async/await compatible, supports cancellation, and offers a safe and ergonomic API:
```fsharp
open System.Threading
open FSharp.Control.Tasks.Affine
open FSharp.Core

// The lock guards access to a wrapped value; it's not possible to access it
// without acquiring read or write access first.
use lock = RWLock.reentrant 100

let doSomething (cancel: CancellationToken) = vtask {
    // In order to read the value, you must first obtain a read or write lock handle.
    // Handles implement IDisposable, so they can be released safely.
    // Multiple readers are allowed to hold the lock at the same time.
    use! reader = lock.Read(cancel)
    printfn "R: %i" reader.Value

    // Readers can be upgraded to write handles. While a write handle is held,
    // no other task is allowed to hold writers OR readers
    // (however the same task is still allowed to have them).
    // Another way to obtain a write handle: `lock.Write(cancel)`
    use! writer = reader.Upgrade(cancel)
    let mutable w = writer // this is required, since F# doesn't have `use! mutable` syntax
    w.Value <- w.Value + 1 // only the writer has a Value setter

    // since they're combined with `use!`, both handles will be automatically
    // released at the end of the scope
}
```
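
A write handle can also be acquired directly, without upgrading a reader first. A minimal sketch reusing the `lock` value from above:

```fsharp
let increment (cancel: CancellationToken) = vtask {
    // acquire an exclusive write handle right away
    use! writer = lock.Write(cancel)
    let mutable w = writer // same workaround as above: F# has no `use! mutable`
    w.Value <- w.Value + 1
}
```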
### Vec

`Vec<'a>` (or `'a vec`) is a persistent, immutable data type which provides fairly fast add/remove operations (appending to the tail of the collection, as opposed to list prepend). These are O(log32(n)), which for an in-memory data structure is effectively close to O(1). It's also optimized for `for` loop traversals, which are close in speed to a native array.
```fsharp
// Create an empty vector.
let a = Vec.empty

// Initialize a vector with a function mapping indexes (0-based) to elements.
let a = Vec.init 100 (fun i -> i * i)

// ... or initialize it from another collection
let a = Vec.ofArray [|1;2;3|]
let b = Vec.ofSeq [1;2;3]

// Check if it's empty
if Vec.isEmpty a then printfn "Vector is empty"

// ... or get the number of stored elements
let len = Vec.length a

// Add a new element to the tail
let a = Vec.ofArray [|1;2|]
let b = a |> Vec.add 3 // result: b = [|1;2;3|]

// ... or add an entire collection of elements at once
let c = a |> Vec.append [3;4;5] // result: c = [|1;2;3;4;5|]

// You can also replace an element at a given index (without changing the original structure)
let a = Vec.ofSeq ["Hello"; "world"; "!"]
let d = a |> Vec.replace 1 "F#" // result: d = [|"Hello"; "F#"; "!"|]

// You can treat Vec as an immutable stack
let a = Vec.ofSeq [1;2;3]
let (b, last) = Vec.pop a // result: b = [|1;2|], last = 3

// ... or just drop the last value
let b = Vec.initial a

// Vectors offer fast random access by index
let a = Vec.init 100 (fun i -> i * i)
let b = a |> Vec.item 50 // returns: ValueSome 2500

// You can also try to find the index of a value (if it exists), starting from either the beginning or the end of a vector
let x = a |> Vec.indexOf (-199) // in case the value is not found, -1 is returned
let y = a |> Vec.lastIndexOf 100

// There are also helper functions for the first and last elements
let (ValueSome first) = a |> Vec.head
let (ValueSome last) = a |> Vec.last

// Just like array, vector also has a `contains` function
if Vec.contains 100 a then printfn "Value 100 exists within the vector"

// ... or you can use more generic checkers
let exists = a |> Vec.exists (fun x -> x % 2 = 0)
let (ValueSome x) = a |> Vec.find (fun x -> x % 2 = 0)
```
... and other common collection operations like `iter`, `iteri`, `map`, `mapi`, `filter`, `choose`, `forall`, `fold`, `foldBack`, `reduce` and `rev` (reverse).
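
These combinators compose just like their `Seq`/`List` counterparts. A small usage sketch (assuming the same argument order as the standard F# collection modules):

```fsharp
let v = Vec.ofSeq [1..5]
let doubled = v |> Vec.map (fun x -> x * 2)        // [|2;4;6;8;10|]
let evens   = v |> Vec.filter (fun x -> x % 2 = 0) // [|2;4|]
let sum     = v |> Vec.fold (+) 0                  // 15
```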
#### `Vec<'a>` benchmarks

Below are some benchmarks for this implementation:
##### Appending

Here, we're appending elements one by one in a loop of 1, 100, 1000 and 10 000 items (starting with an empty collection). The compared implementations are:

- MutableListAppend, used as a baseline - it represents `System.Collections.Generic.List<'a>.Add(x)`.
- ImmutableListAppend represents `System.Collections.Immutable.ImmutableList<'a>.Add(x)`.
- ImmutableArrayAppend represents `System.Collections.Immutable.ImmutableArray<'a>.Add(x)`.
- VecAppend, which is the immutable vector implemented by this library.
- FSharpxVectorAppend represents `PersistentVector<'a>` from FSharpx.Collections, which is another implementation of the same data structure as the one implemented here.
```
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19043
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.301
  [Host]     : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT DEBUG
  DefaultJob : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT
```
Method | to_append | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|---|
MutableListAppend | 1 | 24.01 ns | 0.234 ns | 0.219 ns | 1.00 | 0.00 | 0.0105 | - | - | 88 B |
ImmutableListAppend | 1 | 31.81 ns | 0.165 ns | 0.147 ns | 1.32 | 0.01 | 0.0086 | - | - | 72 B |
ImmutableArrayAppend | 1 | 10.82 ns | 0.050 ns | 0.044 ns | 0.45 | 0.00 | 0.0038 | - | - | 32 B |
FSharpxVectorAppend | 1 | 138.81 ns | 0.491 ns | 0.460 ns | 5.78 | 0.06 | 0.0640 | 0.0001 | - | 536 B |
VecAppend | 1 | 31.74 ns | 0.286 ns | 0.267 ns | 1.32 | 0.02 | 0.0086 | - | - | 72 B |
MutableListAppend | 100 | 582.76 ns | 6.361 ns | 5.312 ns | 1.00 | 0.00 | 0.2618 | 0.0024 | - | 2192 B |
ImmutableListAppend | 100 | 16,342.72 ns | 117.443 ns | 98.070 ns | 28.05 | 0.36 | 4.1351 | 0.1068 | - | 34704 B |
ImmutableArrayAppend | 100 | 6,002.86 ns | 118.309 ns | 165.853 ns | 10.39 | 0.25 | 5.1155 | 0.0114 | - | 42800 B |
FSharpxVectorAppend | 100 | 5,926.07 ns | 117.629 ns | 130.744 ns | 10.24 | 0.30 | 3.0212 | 0.0229 | - | 25304 B |
VecAppend | 100 | 4,855.78 ns | 13.802 ns | 12.910 ns | 8.33 | 0.08 | 2.4033 | 0.0153 | - | 20136 B |
MutableListAppend | 1000 | 4,483.57 ns | 13.701 ns | 12.146 ns | 1.00 | 0.00 | 1.9836 | 0.1221 | - | 16600 B |
ImmutableListAppend | 1000 | 253,531.53 ns | 795.365 ns | 705.070 ns | 56.55 | 0.28 | 60.0586 | 11.7188 | - | 502896 B |
ImmutableArrayAppend | 1000 | 445,113.89 ns | 734.748 ns | 687.284 ns | 99.28 | 0.36 | 481.2012 | 9.2773 | - | 4028000 B |
FSharpxVectorAppend | 1000 | 56,181.22 ns | 570.053 ns | 533.228 ns | 12.54 | 0.13 | 30.2734 | 1.5869 | - | 253320 B |
VecAppend | 1000 | 49,773.56 ns | 277.159 ns | 259.255 ns | 11.10 | 0.07 | 24.5361 | 1.2817 | - | 205400 B |
MutableListAppend | 10000 | 78,526.62 ns | 487.892 ns | 456.375 ns | 1.00 | 0.00 | 41.6260 | 41.6260 | 41.6260 | 262456 B |
ImmutableListAppend | 10000 | 4,097,281.84 ns | 18,994.301 ns | 16,837.953 ns | 52.17 | 0.41 | 792.9688 | 296.8750 | 31.2500 | 6653616 B |
ImmutableArrayAppend | 10000 | 34,045,578.54 ns | 109,326.205 ns | 102,263.800 ns | 433.57 | 2.48 | 47593.7500 | 7281.2500 | - | 400280002 B |
FSharpxVectorAppend | 10000 | 617,216.00 ns | 4,030.169 ns | 3,769.823 ns | 7.86 | 0.08 | 313.4766 | 97.6563 | - | 2624120 B |
VecAppend | 10000 | 561,254.52 ns | 8,496.611 ns | 7,947.735 ns | 7.15 | 0.13 | 256.3477 | 79.1016 | - | 2146432 B |
Note: since the FSharpx implementation internally represents stored elements as `obj`, in the case of value types there's an additional cost related to boxing.
##### Iterating

Here, we're iterating over 10, 1000 and 200 000 elements of the collections in a `for item in collection do` loop to check the data structure's overhead on that operation:

- MutableListEnumerate, used as a baseline - it represents enumeration over `System.Collections.Generic.List<'a>`.
- ImmutableListEnumerate represents enumeration over `System.Collections.Immutable.ImmutableList<'a>`.
- ImmutableArrayEnumerate represents enumeration over `System.Collections.Immutable.ImmutableArray<'a>`.
- VecEnumerate, which is the immutable vector implemented by this library.
- FSharpxVectorEnumerate represents `PersistentVector<'a>` from FSharpx.Collections, which is another implementation of the same data structure as the one implemented here.
```
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19043
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.301
  [Host]     : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT DEBUG
  DefaultJob : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT
```
Method | count | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|---|
MutableListEnumerate | 10 | 39.276 ns | 0.0803 ns | 0.0670 ns | 1.00 | 0.00 | - | - | - | - |
ImmutableListEnumerate | 10 | 559.366 ns | 3.2115 ns | 2.8469 ns | 14.24 | 0.09 | - | - | - | - |
ImmutableArrayEnumerate | 10 | 3.944 ns | 0.0590 ns | 0.0552 ns | 0.10 | 0.00 | - | - | - | - |
FSharpxVectorEnumerate | 10 | 197.010 ns | 0.8751 ns | 0.8186 ns | 5.02 | 0.02 | 0.0277 | - | - | 232 B |
VecEnumerate | 10 | 43.809 ns | 0.4378 ns | 0.4095 ns | 1.12 | 0.01 | - | - | - | - |
MutableListEnumerate | 1000 | 3,204.809 ns | 7.2374 ns | 6.7698 ns | 1.00 | 0.00 | - | - | - | - |
ImmutableListEnumerate | 1000 | 43,366.618 ns | 861.5367 ns | 846.1443 ns | 13.55 | 0.26 | - | - | - | - |
ImmutableArrayEnumerate | 1000 | 346.527 ns | 1.1570 ns | 0.9662 ns | 0.11 | 0.00 | - | - | - | - |
FSharpxVectorEnumerate | 1000 | 12,570.358 ns | 12.8635 ns | 12.0325 ns | 3.92 | 0.01 | 0.0153 | - | - | 232 B |
VecEnumerate | 1000 | 4,117.957 ns | 17.7939 ns | 16.6444 ns | 1.28 | 0.00 | - | - | - | - |
MutableListEnumerate | 200000 | 729,758.190 ns | 2,451.2115 ns | 2,292.8647 ns | 1.00 | 0.00 | - | - | - | - |
ImmutableListEnumerate | 200000 | 8,112,826.228 ns | 33,897.5019 ns | 30,049.2525 ns | 11.12 | 0.05 | - | - | - | 2 B |
ImmutableArrayEnumerate | 200000 | 68,341.672 ns | 99.3552 ns | 77.5700 ns | 0.09 | 0.00 | - | - | - | - |
FSharpxVectorEnumerate | 200000 | 3,812,772.604 ns | 12,600.5878 ns | 11,786.5976 ns | 5.22 | 0.02 | - | - | - | 233 B |
VecEnumerate | 200000 | 814,029.893 ns | 2,160.2270 ns | 2,020.6776 ns | 1.12 | 0.00 | - | - | - | - |
##### Building a collection out of an array

Here, we are converting an array of 1000 elements into the final representation of a given collection:

- ResizeArrayOfArray represents construction of `new System.Collections.Generic.List<'a>(items)`.
- ImmutableListOfArray represents construction via `ImmutableList.Create<User>(items)`.
- FSharpListOfArray represents `List.ofArray items`.
- VecOfArray represents `Vec.ofArray items` - the immutable vector implemented in this repository.
- FSharpxVecOfArray represents `PersistentVector.ofSeq items` from FSharpx.Collections, which is another implementation of the same data structure as the one implemented here.
```
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19043
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.301
  [Host]     : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT DEBUG
  DefaultJob : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT
```
Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|
ResizeArrayOfArray | 921.8 ns | 1.39 ns | 1.30 ns | 0.9623 | 0.0534 | - | 7.87 KB |
ImmutableListOfArray | 22,494.3 ns | 141.69 ns | 132.54 ns | 5.7373 | 1.0986 | - | 46.92 KB |
FSharpListOfArray | 9,403.1 ns | 54.43 ns | 50.91 ns | 3.8223 | 0.5417 | - | 31.25 KB |
FSharpxVecOfArray | 42,502.8 ns | 501.70 ns | 469.29 ns | 1.2207 | 0.0610 | - | 10.26 KB |
VecOfArray | 5,047.4 ns | 49.37 ns | 43.77 ns | 2.3346 | 0.1144 | - | 19.1 KB |
Note: since the FSharpx implementation internally represents stored elements as `obj`, in the case of value types there's an additional cost related to boxing.
### Channel

The `Channel` module allows you to operate over `System.Threading.Channels.Channel<'a>` in a more F#-idiomatic way.
```fsharp
// create a writer/reader pair for a multi-producer/single-consumer bounded channel
let (writer, reader) = Channel.boundedMpsc 10
for item in [|1..9|] do
    do! writer.WriteAsync(item)

// you can try to pull all items currently stored inside of the channel and write them into a span
let buffer = Array.zeroCreate 6
let span = Span buffer
let itemsRead = Channel.readTo span reader

// select can be used to await output from multiple channels, returning the first pulled value
// (in case multiple channels have values ready, the order of channels in select decides)
let (otherWriter, otherReader) = Channel.boundedMpsc 10
do! otherWriter.WriteAsync("hello")
let! value = Channel.select [|
    otherReader
    // Channel.map can be used to turn a `ChannelReader<'a>` into a `ChannelReader<'b>`
    reader |> Channel.map (fun item -> string item)
|]
```
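
Since the `AsyncSeq` module (next section) provides `ofChannel`, a reader can also be drained as an async sequence. A small sketch combining the two modules:

```fsharp
open System.Threading.Tasks

do!
    AsyncSeq.ofChannel reader
    |> AsyncSeq.iter (fun item ->
        printfn "received: %i" item
        ValueTask()) // AsyncSeq.iter expects a ValueTask-returning function
```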
### AsyncSeq

`AsyncSeq<'a>` alone is an alias for the `IAsyncEnumerable<'a>` interface available in modern .NET. The `AsyncSeq` module provides the following operators:
- `AsyncSeq.withCancellation: CancellationToken -> AsyncSeq<'a> -> AsyncSeq<'a>` explicitly assigns a `CancellationToken` to async enumerators created from an async sequence.
- `timer: TimeSpan -> AsyncSeq<TimeSpan>` produces an async sequence that asynchronously "ticks" at the given interval, returning the time passed from one `MoveNextAsync` call to another.
- `into: bool -> ChannelWriter<'a> -> AsyncSeq<'a> -> ValueTask` flushes all contents of an async sequence into the given channel, returning a value task that can be awaited to completion. The flag determines whether the channel should be closed once the async sequence completes sending all elements.
- `tryHead: AsyncSeq<'a> -> ValueTask<'a option>` tries to (asynchronously) return the first element of the async sequence, if it's not empty.
- `tryLast: AsyncSeq<'a> -> ValueTask<'a option>` tries to (asynchronously) return the last element of the async sequence, if it's not empty.
- `collect: AsyncSeq<'a> -> ValueTask<'a[]>` materializes all elements of an async sequence into an array, returning a value task that completes once the async sequence has no more elements to pull.
- `fold: ('s->'a->ValueTask<'s>) -> 's -> AsyncSeq<'a> -> ValueTask<'s>` executes the given function over the elements of an async sequence, incrementally building an accumulated value and eventually returning it once the sequence completes.
- `reduce: ('a->'a->'a) -> AsyncSeq<'a> -> ValueTask<'a option>` works like fold, but instead of taking an initial state it uses the first element of the async sequence. If the sequence was empty, None will eventually be returned.
- `scan: ('s->'a->'s) -> 's -> AsyncSeq<'a> -> AsyncSeq<'s>` works like `fold`, but with every incremental step it emits the new (possibly altered) accumulated value as a subsequent async sequence.
- `iteri: (int64->'a->ValueTask) -> AsyncSeq<'a> -> ValueTask`, `iter: ('a->ValueTask) -> AsyncSeq<'a> -> ValueTask` and `ignore: AsyncSeq<'a> -> ValueTask` all let you iterate over the elements of an async sequence, returning a value task that finishes once the sequence has been emptied.
- `mapParallel: int -> ('a->ValueTask<'b>) -> AsyncSeq<'a> -> AsyncSeq<'b>` lets you (potentially asynchronously) map async sequence elements in parallel. Due to its parallel nature, the resulting sequence may output mapped elements in a different order than the input.
- `mapi: (int64->'a->'b) -> AsyncSeq<'a> -> AsyncSeq<'b>`, `map: ('a->'b) -> AsyncSeq<'a> -> AsyncSeq<'b>`, `mapAsynci: (int64->'a->ValueTask<'b>) -> AsyncSeq<'a> -> AsyncSeq<'b>` and `mapAsync: ('a->ValueTask<'b>) -> AsyncSeq<'a> -> AsyncSeq<'b>` are all variants of applying a mapping function over the values (and potentially indexes) produced by the upstream, yielding a new async sequence of mapped elements pushed downstream.
- `filter: ('a->bool) -> AsyncSeq<'a> -> AsyncSeq<'a>` simply lets you pick (using the given predicate function) which of the upstream elements should be pushed downstream.
- `choose: ('a->ValueTask<'b option>) -> AsyncSeq<'a> -> AsyncSeq<'b>` works like a combination of `mapAsync` and `filter`: it allows you to both map and filter out (by making the mapping function return None) elements from the upstream.
- `recover: (exn->ValueTask<'a option>) -> AsyncSeq<'a> -> AsyncSeq<'a>` allows you to react to potential exceptions that may happen while pulling the upstream, producing a new element out of the exception or completing the async sequence (if the error-mapping function returned None).
- `onError: (exn->AsyncSeq<'a>) -> AsyncSeq<'a> -> AsyncSeq<'a>` allows you to react to potential exceptions that may happen while pulling the upstream, by providing a new stream to replace the failing one. This function works recursively.
- `merge: AsyncSeq<'a> seq -> AsyncSeq<'a>` lets you merge many async sequences into one. Sequences are pulled sequentially: one must complete before the next one starts being pulled.
- `mergeParallel: AsyncSeq<'a> seq -> AsyncSeq<'a>` works like `merge` but in a parallel fashion: sequences are pulled at the same time, without waiting for any of them to complete.
- `bind: ('a->AsyncSeq<'b>) -> AsyncSeq<'a> -> AsyncSeq<'b>` lets you create a new async sequence for each element pulled from the upstream: sequences produced this way are pulled to completion before the next element from the upstream is pulled.
- `skip: int64 -> AsyncSeq<'a> -> AsyncSeq<'a>`, `skipWhile: ('a->bool) -> AsyncSeq<'a> -> AsyncSeq<'a>` and `skipWithin: TimeSpan -> AsyncSeq<'a> -> AsyncSeq<'a>` all let you skip elements pulled from the upstream: either a given number of them, while a predicate function is satisfied, or for a given time window.
- `take: int64 -> AsyncSeq<'a> -> AsyncSeq<'a>`, `takeWhile: ('a->bool) -> AsyncSeq<'a> -> AsyncSeq<'a>` and `takeWithin: TimeSpan -> AsyncSeq<'a> -> AsyncSeq<'a>` all let you take elements pulled from the upstream: either a given number of them, while a predicate function is satisfied, or for a given time window. Once the take requirement is met, the async sequence completes.
- `split: ('a->bool) -> AsyncSeq<'a> -> AsyncSeq<'a>*AsyncSeq<'a>` lets you split an upstream async sequence in two. The first is pulled until the given predicate function is satisfied. Once that happens it completes, and the second sequence (starting with the element that caused the split) is pulled for all remaining elements coming from the upstream. That means the predicate is only checked until it is first satisfied.
- `delay: ('a->TimeSpan) -> AsyncSeq<'a> -> AsyncSeq<'a>` delays every element pulled from the upstream by the time returned from the function parameter. The function parameter can be used e.g. for creating exponential backoff delays if necessary.
- `throttle: int -> TimeSpan -> AsyncSeq<'a> -> AsyncSeq<'a>` applies rate limiting to an upstream async sequence, limiting the number of elements returned downstream so that it doesn't exceed the given number within the specified time span.
- `keepAlive: TimeSpan -> ('a->'a) -> AsyncSeq<'a> -> AsyncSeq<'a>` monitors the rate at which elements arrive from the upstream. If none arrives within the specified time span, the last received element is applied to the generator function, which produces a new element to emit instead, keeping the rate of the upstream within the specified time bounds.
- `grouped: int -> AsyncSeq<'a> -> AsyncSeq<'a[]>` groups incoming elements into buckets of the given size and pulls them downstream as one. The final bucket (sent right before sequence completion) can be smaller than the requested size.
- `buffered: int -> AsyncSeq<'a> -> AsyncSeq<'a[]>` lets you decouple the rate of elements pulled downstream from the upstream, which is especially useful when the downstream pulls elements at a slower rate. The upstream is continuously (and asynchronously) pulled for elements until the given buffer size is reached. If the downstream pulls this sequence in the meantime, it receives an array with all elements pulled so far.
- `zipWith: ('a->'b->'c) -> AsyncSeq<'a> -> AsyncSeq<'b> -> AsyncSeq<'c>` and `zip: AsyncSeq<'a> -> AsyncSeq<'b> -> AsyncSeq<'a*'b>` merge two async sequences together, combining their elements pairwise before sending them downstream.
- `ofSeq: 'a seq -> AsyncSeq<'a>` creates an async sequence out of the given sequence.
- `unfold: ('s->ValueTask<('s * 'a) option>) -> 's -> AsyncSeq<'a>` creates an async sequence out of the given generator function. The function takes an initial state and is continuously applied, producing a new state and an element to be emitted as output of the async sequence. If None is returned, the async sequence completes.
- `ofFunc: (int64->ValueTask<'a option>) -> AsyncSeq<'a>` creates an async sequence out of the given generator function. The function gets a counter value (incremented with each call) and produces an element to be emitted as the sequence's output. If None is returned, the sequence completes.
- `ofAsync: Async<'a> -> AsyncSeq<'a>`, `ofTask: Task<'a> -> AsyncSeq<'a>` and `ofTaskCancellable: (CancellationToken->Task<'a>) -> AsyncSeq<'a>` all allow turning F# Async or TPL Tasks into async enumerables. The cancellable version receives the cancellation token passed when `GetAsyncEnumerator` is called, and is invoked on every such call.
- `empty: AsyncSeq<'a>` returns an async sequence that has no elements and finishes immediately once pulled.
- `repeat: 'a -> AsyncSeq<'a>` produces an infinite sequence of the given element.
- `singleton: 'a -> AsyncSeq<'a>` wraps the given value in an async sequence that produces that value only once.
- `failed: exn -> AsyncSeq<'a>` returns a malfunctioning async sequence, which fails with the given exception when its enumerator is pulled.
- `ofOption: 'a option -> AsyncSeq<'a>` wraps the given option in an async sequence that produces the option's value, or completes immediately if the option was None.
- `ofChannel: ChannelReader<'a> -> AsyncSeq<'a>` turns the given channel into an async sequence, which completes once the writer side of the channel is explicitly completed.
- `onDispose: (unit->ValueTask) -> AsyncSeq<'a> -> AsyncSeq<'a>` attaches the given function to an upstream async sequence; it's called together with that sequence's enumerator dispose call.
- `deduplicate: ('a->'a->bool) -> AsyncSeq<'a> -> AsyncSeq<'a>` skips over consecutive elements of the upstream async sequence if they are considered equal according to the given equality function. Unlike "distinct" operators, it doesn't produce globally unique values (as this would require buffering all values); it only deduplicates neighboring elements.
- `interleave: ('a->'a->'a) -> AsyncSeq<'a> -> AsyncSeq<'a>` produces an extra element (generated using the specified function) in between two consecutive elements pulled from the upstream.
- `groupBy: int -> int -> ('a->'b) -> AsyncSeq<'a> -> AsyncSeq<('b * AsyncSeq<'a>)>` groups the elements of an upstream async sequence by the given key function, producing an async sequence of keys and subsequences of grouped elements. Because of their nature, pull-based sequences are not ideal for grouping operators; for this reason only up to the specified number of subsequences can be active at any time. Additionally, all active subsequences must be periodically pulled in order to let the upstream be pulled on behalf of other concurrent subsequences, to avoid deadlocking (which is amortized by every group having an internal buffer of a given capacity).
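
A few of these operators composed together (a small sketch, assuming the `vtask` builder used elsewhere in this README):

```fsharp
open System.Threading.Tasks
open FSharp.Control.Tasks.Affine

let pipeline () = vtask {
    do!
        AsyncSeq.ofSeq [1..10]
        |> AsyncSeq.filter (fun x -> x % 2 = 0) // keep even numbers only
        |> AsyncSeq.map string                  // 2 -> "2", 4 -> "4", ...
        |> AsyncSeq.iter (fun s ->
            printfn "got %s" s
            ValueTask())                        // iter expects a ValueTask-returning function
}
```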
### Actors

This is a simple actor implementation based on TPL and System.Threading.Channels. These actors don't offer location transparency mechanisms. By default an actor's mailbox is unbounded, but this may be configured.
```fsharp
type CounterMessage =
    | Add of int
    | Done

[<Sealed>]
type Counter(init: int) =
    inherit UnboundedActor<CounterMessage>()
    let mutable state = init
    override this.Receive msg = unitVtask {
        match msg with
        | Add v -> state <- state + v
        | Done -> this.Complete() // don't use `do! this.DisposeAsync(false)` in this context, as it may deadlock the actor
    }

use actor = new Counter(0)
// send a message to an actor
do! actor.Send (Add 1) // for unbounded actors you don't need to await, as send will never block

// each actor contains a CancellationToken that may be used to bind method execution to the lifecycle of the actor
let! file = File.ReadAllTextAsync("file.txt", actor.CancellationToken)

// actor extends the ChannelWriter class, so it can be used in that context
do! AsyncSeq.ofSeq [Add 1; Add 2; Add 3] |> AsyncSeq.into false actor

// an actor may be closed gracefully (letting it process all stashed messages first) or abruptly
do! actor.DisposeAsync(true)  // abrupt close
do! actor.DisposeAsync(false) // graceful close

// you can also await the actor's termination - this task will complete once the actor gets disposed; it may also
// complete with failure, as the actor doesn't handle exceptions by itself (use try/with if that's necessary)
do! actor.Terminated
```
Current performance benchmarks compared to the F# MailboxProcessor - an example of an actor-based counter, processing 1 000 000 messages:
```
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19043
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.301
  [Host]     : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT DEBUG
  DefaultJob : .NET Core 5.0.7 (CoreCLR 5.0.721.25508, CoreFX 5.0.721.25508), X64 RyuJIT
```
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
MailboxProcessorPost | 243.63 ms | 4.840 ms | 10.209 ms | 1.00 | 0.00 | 58500.0000 | 2000.0000 | 1500.0000 | 477855.98 KB |
UnboundedActorSend | 46.46 ms | 0.829 ms | 0.776 ms | 0.19 | 0.01 | - | - | - | 1.03 KB |
UnboundedActorSendAsync | 64.40 ms | 2.205 ms | 6.502 ms | 0.27 | 0.03 | 285.7143 | 285.7143 | 285.7143 | 1408.84 KB |
FSharpActorBounded | 686.02 ms | 13.542 ms | 12.005 ms | 2.79 | 0.13 | 36000.0000 | - | - | 296217.38 KB |
- FSharpAsyncActor is the F# `MailboxProcessor` implementation.
- FSharpActorUnbounded is the actor implementation from this lib using default (possibly synchronous) message passing.
- FSharpActorUnboundedAsync is the actor implementation from this lib using fully asynchronous message passing.
### Schedules

While `Task.Delay()` is a great way to introduce delays of various kinds, sometimes we may need more advanced delay mechanisms: exponential backoffs, jittered (randomized) delays etc.

For these cases the `Schedule` module provides a composable API, which can be used as a multi-step time-spacing mechanism:
```fsharp
open System
open System.Diagnostics
open System.Threading

// create a policy that will execute up to 4 times, with exponential backoff starting from 1 second (1s, 2s, 4s, 8s),
// with each tick slightly randomized within a 10% margin
let policy =
    Schedule.exponential 2.0 (TimeSpan.FromSeconds 1.)
    |> Schedule.jittered 0.9 1.1
    |> Schedule.times 4

// now execute something up to 5 times, gathering the result and all potential exceptions that happened along the way
let! (result, failures) =
    policy
    |> Schedule.retry (fun _ -> uvtask {
        return! someHttpCall ()
    }) CancellationToken.None

// you can also execute some action N+1 times, with each execution delayed from the previous one
let sw = Stopwatch.StartNew()
let! results =
    Schedule.after (TimeSpan.FromMilliseconds 100.)
    |> Schedule.times 3
    |> Schedule.spaced (fun _ -> uvtask {
        printfn "Time passed: %ims" sw.ElapsedMilliseconds
        return 1
    }) CancellationToken.None

// Results should look more or less like:
// Time passed: 0ms
// Time passed: 100ms
// Time passed: 200ms
// Time passed: 300ms
```
Schedules provide a simple composable API and can be serialized/deserialized by any serializer able to handle discriminated unions (keep in mind that `Schedule.ofSeq` will most likely eagerly evaluate the provided sequence in order to serialize it).

Schedules are immutable (advancing a schedule simply produces another, updated schedule), so they can be safely shared between threads and persisted if necessary. Schedules can be converted into ordinary sequences using the `Schedule.toSeq` function.
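
For example, a schedule's upcoming delays can be inspected by materializing it (a small sketch built only from the functions shown above, assuming `Schedule.toSeq` yields the schedule's delays in order):

```fsharp
let delays =
    Schedule.exponential 2.0 (TimeSpan.FromSeconds 1.)
    |> Schedule.times 4
    |> Schedule.toSeq
    |> Seq.toList
// expected: delays of roughly 1s, 2s, 4s and 8s
```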