What is it and why should I care?
AsyncCollections is a library that contains a bunch of fast lock-free thread-safe producer-consumer collections tailored for asynchronous usage. Think System.Collections.Concurrent, but fully async-ready.
Nuget package
https://www.nuget.org/packages/AsyncCollections/
API
IAsyncCollection
Most of the collections here implement this interface (I'd say it pretty much describes what this library is all about):
public interface IAsyncCollection<T> : IReadOnlyCollection<T>
{
    void Add( T item );
    ValueTask<T> TakeAsync( CancellationToken cancellationToken );
    int AwaiterCount { get; } // The number of pending item requests
}
A copy of the corefx ValueTask<T> implementation is used at the moment; it's going to be replaced by the original one when 2.0 is released.
AsyncQueue and AsyncStack
These classes provide queue- and stack-based implementations of IAsyncCollection<T>.
AsyncQueue<int> queue = new AsyncQueue<int>();
ValueTask<int> itemTask = queue.TakeAsync();
queue.Add( 42 ); // at this point itemTask completes with Result = 42
AsyncStack<int> stack = new AsyncStack<int>();
stack.Add( 64 );
stack.Add( 128 );
int first = await stack.TakeAsync(); // 128
int second = await stack.TakeAsync(); // 64
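To make the "take first, add later" behaviour above more concrete, here is a minimal sketch of how a pending take can be completed by a later Add. It is not the library's implementation: SketchAsyncQueue<T> is a hypothetical name, it uses a plain lock instead of being lock-free, it returns Task<T> rather than ValueTask<T>, and it ignores cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, lock-based illustration only; the real AsyncQueue<T> is lock-free.
public sealed class SketchAsyncQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _awaiters = new Queue<TaskCompletionSource<T>>();

    // Number of items waiting to be taken.
    public int Count { get { lock ( _gate ) return _items.Count; } }

    // Number of TakeAsync() calls that are still waiting for an item.
    public int AwaiterCount { get { lock ( _gate ) return _awaiters.Count; } }

    public void Add( T item )
    {
        TaskCompletionSource<T> awaiter;
        lock ( _gate )
        {
            if ( _awaiters.Count == 0 )
            {
                // Nobody is waiting: just store the item.
                _items.Enqueue( item );
                return;
            }
            awaiter = _awaiters.Dequeue();
        }

        // Hand the item straight to the oldest pending take, outside the lock.
        awaiter.SetResult( item );
    }

    public Task<T> TakeAsync()
    {
        lock ( _gate )
        {
            // An item is already available: complete synchronously.
            if ( _items.Count > 0 )
                return Task.FromResult( _items.Dequeue() );

            // Otherwise register a pending request that a future Add() will complete.
            var awaiter = new TaskCompletionSource<T>( TaskCreationOptions.RunContinuationsAsynchronously );
            _awaiters.Enqueue( awaiter );
            return awaiter.Task;
        }
    }
}
```

With this sketch, calling TakeAsync() on an empty queue bumps AwaiterCount to 1, and the following Add( 42 ) completes that task with 42 instead of storing the item.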
AsyncCollection
Think System.Collections.Concurrent.BlockingCollection for async usage. AsyncCollection<T> wraps anything that implements IProducerConsumerCollection<T> and turns it into an IAsyncCollection<T>:
var asyncBag = new AsyncCollection<int>( new ConcurrentBag<int>() );
asyncBag.Add( 984 );
int item = await asyncBag.TakeAsync();
Another trait it shares with BlockingCollection is a static TakeFromAny method (which is actually called TakeFromAnyAsync here):
var bag1 = new AsyncCollection<int>( new ConcurrentBag<int>() );
var bag2 = new AsyncCollection<int>( new ConcurrentBag<int>() );
AsyncCollection<int>[] collections = new [] { bag1, bag2 };
// Returns asynchronously when one of the collections gets an item.
AnyResult<int> result = await AsyncCollection<int>.TakeFromAnyAsync( collections );
// Index of the collection that returned the item.
int index = result.CollectionIndex;
// The actual item that has been returned.
int value = result.Value;
AsyncBoundedPriorityQueue
This is a priority queue with a limited number of priority levels:
var queue = new AsyncBoundedPriorityQueue<int>( priorityLevels: 3 );
queue.Add( 1000 ); // item is added at the lowest priority by default
queue.AddTopPriority( 42 );
queue.AddLowPriority( 999 );
queue.Add( 16, priority: 1 );
// 42 16 1000 999
while ( true )
{
    Debug.Write( await queue.TakeAsync() );
    Debug.Write( " " );
}
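The ordering in the example above follows naturally if you picture one FIFO queue per priority level and a take that always scans from the top level down. The sketch below shows that idea in a synchronous, non-thread-safe form. SketchBoundedPriorityQueue<T> and TryTake are hypothetical names, and the convention that priority 0 is the highest is an assumption inferred from the example, not something taken from the library's source.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical synchronous sketch; not the library's implementation.
public sealed class SketchBoundedPriorityQueue<T>
{
    // One FIFO queue per priority level. Assumption: index 0 is the highest priority.
    private readonly Queue<T>[] _levels;

    public SketchBoundedPriorityQueue( int priorityLevels )
    {
        _levels = new Queue<T>[ priorityLevels ];
        for ( int i = 0; i < priorityLevels; i++ )
            _levels[ i ] = new Queue<T>();
    }

    // Adds an item at an explicit priority level.
    public void Add( T item, int priority ) => _levels[ priority ].Enqueue( item );

    // Adds an item at the lowest priority level.
    public void Add( T item ) => Add( item, _levels.Length - 1 );

    // Takes the oldest item from the highest non-empty priority level.
    public bool TryTake( out T item )
    {
        foreach ( Queue<T> level in _levels )
        {
            if ( level.Count > 0 )
            {
                item = level.Dequeue();
                return true;
            }
        }

        item = default( T );
        return false;
    }
}
```

Because the number of levels is fixed up front, adding an item is a single enqueue and taking one costs at most one check per level, with no heap or sorting involved.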
AsyncBatchQueue
This one doesn't implement IAsyncCollection<T>, because it provides a slightly different experience. Just like AsyncQueue<T>, AsyncBatchQueue<T> allows you to add items synchronously and retrieve them asynchronously, but the difference is that you consume them in batches of the specified size:
AsyncBatchQueue<int> queue = new AsyncBatchQueue<int>( batchSize: 2 );
queue.Add( 42 );
queue.Add( 64 );
queue.Add( 128 );
// Immediately returns a batch of items 42 and 64.
IReadOnlyList<int> batch1 = await queue.TakeAsync();
// Asynchronously returns when the next batch is full
// (128 and whatever the next item will be)
IReadOnlyList<int> batch2 = await queue.TakeAsync();
There are a number of scenarios where you might want to make items available for consumption even if the specified batch size is not reached yet. First of all, you can do it manually by calling Flush:
AsyncBatchQueue<int> queue = new AsyncBatchQueue<int>( 9999 );
queue.Add( 42 );
queue.Flush();
// This will return a batch of 1 item.
IReadOnlyList<int> batch = await queue.TakeAsync();
Second, a typical case is flushing pending items once a certain amount of time has passed. You can use the WithFlushEvery extension method to achieve this:
AsyncBatchQueue<int> queue = new AsyncBatchQueue<int>( 10 );
using ( TimerAsyncBatchQueue<int> timerQueue = queue.WithFlushEvery( TimeSpan.FromSeconds( 5 ) ) )
{
    timerQueue.Add( 42 );
    // This will asynchronously return a batch of 1 item in 5 seconds.
    IReadOnlyList<int> batch = await timerQueue.TakeAsync();
}
// Disposing the TimerAsyncBatchQueue simply disposes the inner timer.
// You can continue using the original queue though.
If the queue has no pending items, Flush won't do anything, so you don't have to worry about creating a lot of empty batches when doing manual/timer flushing.
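The batching and flushing rules described above can be summed up in a few lines of bookkeeping. The following is a simplified, synchronous sketch of those rules only: SketchBatcher<T>, ReadyBatchCount and TryTake are hypothetical names, the class is not thread-safe, and it says nothing about how AsyncBatchQueue<T> is actually implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, non-thread-safe sketch of the batching rules; not the library's code.
public sealed class SketchBatcher<T>
{
    private readonly int _batchSize;
    private List<T> _current = new List<T>();
    private readonly Queue<IReadOnlyList<T>> _ready = new Queue<IReadOnlyList<T>>();

    public SketchBatcher( int batchSize )
    {
        _batchSize = batchSize;
    }

    // Number of batches that are ready to be taken.
    public int ReadyBatchCount => _ready.Count;

    public void Add( T item )
    {
        _current.Add( item );

        // A full batch becomes available immediately.
        if ( _current.Count == _batchSize )
            Flush();
    }

    public void Flush()
    {
        // No pending items: do nothing, so no empty batches are ever produced.
        if ( _current.Count == 0 )
            return;

        _ready.Enqueue( _current );
        _current = new List<T>();
    }

    public bool TryTake( out IReadOnlyList<T> batch )
    {
        if ( _ready.Count > 0 )
        {
            batch = _ready.Dequeue();
            return true;
        }

        batch = null;
        return false;
    }
}
```

With a batch size of 2, adding 42, 64 and 128 leaves one ready batch and one pending item; the first Flush() turns the pending item into a second batch, and any further Flush() calls are no-ops until something new is added.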
Benchmarks
The idea behind all the benchmarks is to measure how long it takes to add a fixed number of items to the collection and then take them back, using different numbers of producer and consumer tasks. The results below were measured using this configuration:
BenchmarkDotNet=v0.9.6.0
OS=Microsoft Windows NT 6.2.9200.0
Processor=Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz, ProcessorCount=4
Frequency=3239558 ticks, Resolution=308.6841 ns, Timer=TSC
HostCLR=MS.NET 4.0.30319.42000, Arch=64-bit RELEASE [RyuJIT]
JitModules=clrjit-v4.6.1080.0
Type=AsyncQueueBenchmark Mode=Throughput Platform=X64
Jit=RyuJit LaunchCount=1
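For readers who want a feel for the shape of such a measurement, here is a rough sketch of the "N producers add, M consumers take everything back" pattern. It is not the benchmark code behind the numbers in this document (those come from BenchmarkDotNet). SketchBenchmark and RunAsync are hypothetical names, BlockingCollection<int> is used only because it ships with .NET, and a single Stopwatch run is far too crude to produce comparable results.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical illustration of the workload shape; not the actual benchmark code.
public static class SketchBenchmark
{
    public static async Task<(TimeSpan Elapsed, int Taken)> RunAsync( int producerTasks, int consumerTasks, int itemsPerProducer )
    {
        using ( var collection = new BlockingCollection<int>() )
        {
            var stopwatch = Stopwatch.StartNew();

            // Each producer adds a fixed number of items.
            Task[] producers = Enumerable.Range( 0, producerTasks )
                .Select( _ => Task.Run( () =>
                {
                    for ( int i = 0; i < itemsPerProducer; i++ )
                        collection.Add( i );
                } ) )
                .ToArray();

            // Each consumer takes items until the collection is completed and drained.
            // These consumers block their threads, which is why this is not truly async.
            Task<int>[] consumers = Enumerable.Range( 0, consumerTasks )
                .Select( _ => Task.Run( () => collection.GetConsumingEnumerable().Count() ) )
                .ToArray();

            await Task.WhenAll( producers );
            collection.CompleteAdding();
            int[] counts = await Task.WhenAll( consumers );

            stopwatch.Stop();
            return ( stopwatch.Elapsed, counts.Sum() );
        }
    }
}
```

Calling SketchBenchmark.RunAsync( 3, 3, 1000 ) should report 3000 taken items; the elapsed time will vary from run to run and machine to machine.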
AsyncQueue benchmark
There are multiple ways to achieve the functionality of AsyncQueue:
- AsyncQueue itself
- AsyncCollection that wraps ConcurrentQueue
- Nito.AsyncEx.AsyncCollection (https://github.com/StephenCleary/AsyncEx) that wraps ConcurrentQueue
- BlockingCollection (this isn't really asynchronous and will starve the thread pool on large consumer count, but still is an interesting thing to compare against)
- System.Threading.Tasks.Dataflow.BufferBlock
Method                                      | ConsumerTasks | ProducerTasks |          Median |        StdDev |    Gen 0 | Gen 1 | Gen 2 | Bytes Allocated/Op |
------------------------------------------- | ------------- | ------------- | --------------- | ------------- | -------- | ----- | ----- | ------------------ |
AsyncQueue                                  |             1 |             1 |     914.6110 us |     4.6121 us |     3.73 |  0.39 |     - |          57,889.00 |
AsyncCollection( ConcurrentQueue )          |             1 |             1 |     969.8161 us |     4.2769 us |     2.37 |     - |     - |          31,965.97 |
Nito.AsyncEx.AsyncCollection                |             1 |             1 |  67,751.7260 us |   104.9323 us |   580.78 |  0.98 |     - |       7,446,934.72 |
System.Concurrent.BlockingCollection        |             1 |             1 |   2,108.3728 us |    17.0404 us |     2.25 |     - |     - |          35,318.88 |
System.Threading.Tasks.Dataflow.BufferBlock |             1 |             1 |   1,673.1337 us |     7.6434 us |    19.88 |     - |     - |         263,489.31 |
------------------------------------------- | ------------- | ------------- | --------------- | ------------- | -------- | ----- | ----- | ------------------ |
AsyncQueue                                  |             1 |             3 |   2,527.1599 us |     8.7526 us |     3.16 |  1.13 |     - |          71,401.25 |
AsyncCollection( ConcurrentQueue )          |             1 |             3 |   2,904.8245 us |    14.8086 us |     5.02 |  0.94 |     - |          91,164.24 |
Nito.AsyncEx.AsyncCollection                |             1 |             3 |  82,357.5946 us | 2,624.8432 us |   704.33 | 36.10 |     - |      10,253,908.07 |
System.Concurrent.BlockingCollection        |             1 |             3 |   6,337.2439 us |    64.5049 us |     4.56 |     - |     - |          78,099.77 |
System.Threading.Tasks.Dataflow.BufferBlock |             1 |             3 |   3,738.7718 us |    48.2398 us |    90.86 |     - |     - |       1,192,165.45 |
------------------------------------------- | ------------- | ------------- | --------------- | ------------- | -------- | ----- | ----- | ------------------ |
AsyncQueue                                  |             3 |             1 |   1,393.2974 us |    89.1313 us |     5.84 |  0.90 |     - |         100,325.39 |
AsyncCollection( ConcurrentQueue )          |             3 |             1 |   5,931.3962 us |   655.4277 us |    98.69 |  6.88 |  1.18 |       1,662,971.75 |
Nito.AsyncEx.AsyncCollection                |             3 |             1 |  66,368.2408 us |    83.2923 us |   559.00 |  1.00 |     - |       7,175,669.84 |
System.Concurrent.BlockingCollection        |             3 |             1 |   2,506.8905 us |    15.4637 us |     2.09 |     - |     - |          29,234.91 |
System.Threading.Tasks.Dataflow.BufferBlock |             3 |             1 |   1,680.4107 us |    13.3545 us |    19.07 |     - |     - |         273,553.48 |
------------------------------------------- | ------------- | ------------- | --------------- | ------------- | -------- | ----- | ----- | ------------------ |
AsyncQueue                                  |             3 |             3 |   2,673.4342 us |    18.6929 us |     4.28 |  1.17 |     - |          89,555.79 |
AsyncCollection( ConcurrentQueue )          |             3 |             3 |   3,304.1851 us |    40.6244 us |     6.69 |  0.37 |     - |         113,443.76 |
Nito.AsyncEx.AsyncCollection                |             3 |             3 | 249,474.1560 us | 1,340.5204 us | 1,275.00 |  1.00 |     - |      16,685,767.14 |
System.Concurrent.BlockingCollection        |             3 |             3 |   6,826.4026 us |    99.6510 us |     4.37 |     - |     - |          80,584.81 |
System.Threading.Tasks.Dataflow.BufferBlock |             3 |             3 |   3,901.0241 us |    55.7299 us |    90.86 |     - |     - |       1,243,432.93 |
------------------------------------------- | ------------- | ------------- | --------------- | ------------- | -------- | ----- | ----- | ------------------ |
AsyncBatchQueue benchmark
There are fewer alternatives to AsyncBatchQueue in the wild that I know of. As a matter of fact, the only thing I can come up with is System.Threading.Tasks.Dataflow.BatchBlock, so here it is:
Method                                     | ConsumerTasks | ProducerTasks |        Median |     StdDev |  Gen 0 | Gen 1 | Gen 2 | Bytes Allocated/Op |
------------------------------------------ | ------------- | ------------- | ------------- | ---------- | ------ | ----- | ----- | ------------------ |
HellBrick.AsyncCollections.AsyncBatchQueue |             1 |             1 |   581.7463 us |  3.2790 us | 125.68 |  4.50 |     - |          77,281.41 |
System.Threading.Tasks.Dataflow.BatchBlock |             1 |             1 |   891.3451 us |  1.4635 us | 150.34 |     - |     - |          91,860.30 |
------------------------------------------ | ------------- | ------------- | ------------- | ---------- | ------ | ----- | ----- | ------------------ |
HellBrick.AsyncCollections.AsyncBatchQueue |             1 |             3 | 1,891.6112 us | 10.6826 us | 278.83 | 12.86 |     - |         178,598.12 |
System.Threading.Tasks.Dataflow.BatchBlock |             1 |             3 | 2,437.1000 us |  4.6214 us | 381.67 |     - |     - |         230,144.44 |
------------------------------------------ | ------------- | ------------- | ------------- | ---------- | ------ | ----- | ----- | ------------------ |
HellBrick.AsyncCollections.AsyncBatchQueue |             3 |             1 |   891.6061 us |  2.7588 us | 145.75 |  6.90 |     - |          98,127.34 |
System.Threading.Tasks.Dataflow.BatchBlock |             3 |             1 |   900.5719 us |  5.4585 us | 173.48 |     - |     - |         110,865.66 |
------------------------------------------ | ------------- | ------------- | ------------- | ---------- | ------ | ----- | ----- | ------------------ |
HellBrick.AsyncCollections.AsyncBatchQueue |             3 |             3 | 2,126.5695 us | 19.0822 us | 485.00 | 18.00 |     - |         316,228.75 |
System.Threading.Tasks.Dataflow.BatchBlock |             3 |             3 | 2,470.9311 us | 19.1620 us | 395.91 |     - |     - |         242,047.11 |
------------------------------------------ | ------------- | ------------- | ------------- | ---------- | ------ | ----- | ----- | ------------------ |