Open.ChannelExtensions
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
Click here for detailed documentation.
Highlights
Read & Write
With optional concurrency levels.
- Reading all entries in a channel.
- Writing all entries from a source to a channel.
- Piping (consuming) all entries to a buffer (channel).
.AsAsyncEnumerable()
(IAsyncEnumerable
) support for .NET Standard 2.1+ and .NET Core 3+
ChannelReader
Operations
Special Filter
Transform
Batch
Join
Installation
Install-Package Open.ChannelExtensions
Examples
Being able to define an asynchronous pipeline with best practice usage using simple expressive syntax:
await Channel
.CreateBounded<T>(10)
.SourceAsync(source /* IEnumerable<Task<T>> */)
.PipeAsync(
maxConcurrency: 2,
capacity: 5,
transform: asyncTransform01)
.Pipe(transform02, /* capacity */ 3)
.ReadAllAsync(finalTransformedValue => {
// Do something async with each final value.
});
await source /* IEnumerable<T> */
.ToChannel(boundedSize: 10, singleReader: true)
.PipeAsync(asyncTransform01, /* capacity */ 5)
.Pipe(
maxConcurrency: 2,
capacity: 3,
transform: transform02)
.ReadAll(finalTransformedValue => {
// Do something with each final value.
});
Reading (until the channel is closed)
One by one read each entry from the channel
await channel.ReadAll(
entry => { /* Processing Code */ });
await channel.ReadAll(
(entry, index) => { /* Processing Code */ });
await channel.ReadAllAsync(
async entry => { await /* Processing Code */ });
await channel.ReadAllAsync(
async (entry, index) => { await /* Processing Code */ });
Read concurrently each entry from the channel
await channel.ReadAllConcurrently(
maxConcurrency,
entry => { /* Processing Code */ });
await channel.ReadAllConcurrentlyAsync(
maxConcurrency,
async entry => { await /* Processing Code */ });
Writing
If complete
is true
, the channel will be closed when the source is empty.
Dump a source enumeration into the channel
// source can be any IEnumerable<T>.
await channel.WriteAll(source, complete: true);
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllAsync(source, complete: true);
Synchronize reading from the source and process the results concurrently
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllConcurrentlyAsync(
maxConcurrency, source, complete: true);
Filter & Transform
// Filter and transform when reading.
channel.Reader
.Filter(predicate) // .Where()
.Transform(selector) // .Select()
.ReadAllAsync(async value => {/*...*/});
Batching
values.Reader
.Batch(10 /*batch size*/)
.WithTimeout(1000) // Any non-empty batches are flushed every second.
.ReadAllAsync(async batch => {/*...*/});
Joining
batches.Reader
.Join()
.ReadAllAsync(async value => {/*...*/});
Pipelining / Transforming
Transform and buffer entries
// Transform values in a source channel to new unbounded channel.
var transformed = channel.Pipe(
async value => /* transformation */);
// Transform values in a source channel to new unbounded channel with a max concurrency of X.
const int X = 4;
var transformed = channel.Pipe(
X, async value => /* transformation */);
// Transform values in a source channel to new bounded channel bound of N entries.
const int N = 5;
var transformed = channel.Pipe(
async value => /* transformation */, N);
// Transform values in a source channel to new bounded channel bound of N entries with a max concurrency of X.
const int X = 4;
const int N = 5;
var transformed = channel.Pipe(
X, async value => /* transformation */, N);
// or
transformed = channel.Pipe(
maxConcurrency: X,
capacity: N,
transform: async value => /* transformation */);