• Stars
    star
    372
  • Rank 114,319 (Top 3 %)
  • Language
    C#
  • License
    MIT License
  • Created almost 6 years ago
  • Updated 3 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A set of extensions for optimizing/simplifying System.Threading.Channels usage.

Open.ChannelExtensions

NuGet

A set of extensions for optimizing/simplifying System.Threading.Channels usage.

Click here for detailed documentation.

Highlights

Read & Write

With optional concurrency levels.

  • Reading all entries in a channel.
  • Writing all entries from a source to a channel.
  • Piping (consuming) all entries to a buffer (channel).
  • .AsAsyncEnumerable() (IAsyncEnumerable) support for .NET Standard 2.1+ and .NET Core 3+

Special ChannelReader Operations

  • Filter
  • Transform
  • Batch
  • Join

Installation

Install-Package Open.ChannelExtensions

Examples

Being able to define an asynchronous pipeline with best practice usage using simple expressive syntax:

await Channel
    .CreateBounded<T>(10)
    .SourceAsync(source /* IEnumerable<Task<T>> */)
    .PipeAsync(
        maxConcurrency: 2,
        capacity: 5,
        transform: asyncTransform01)
    .Pipe(transform02, /* capacity */ 3)
    .ReadAllAsync(finalTransformedValue => {
        // Do something async with each final value.
    });
await source /* IEnumerable<T> */
    .ToChannel(boundedSize: 10, singleReader: true)
    .PipeAsync(asyncTransform01, /* capacity */ 5)
    .Pipe(
        maxConcurrency: 2,
        capacity: 3,
        transform: transform02)
    .ReadAll(finalTransformedValue => {
        // Do something with each final value.
    });

Reading (until the channel is closed)

One by one read each entry from the channel

await channel.ReadAll(
    entry => { /* Processing Code */ });
await channel.ReadAll(
    (entry, index) => { /* Processing Code */ });
await channel.ReadAllAsync(
    async entry => { await /* Processing Code */ });
await channel.ReadAllAsync(
    async (entry, index) => { await /* Processing Code */ });

Read concurrently each entry from the channel

await channel.ReadAllConcurrently(
    maxConcurrency,
    entry => { /* Processing Code */ });
await channel.ReadAllConcurrentlyAsync(
    maxConcurrency,
    async entry => { await /* Processing Code */ });

Writing

If complete is true, the channel will be closed when the source is empty.

Dump a source enumeration into the channel

// source can be any IEnumerable<T>.
await channel.WriteAll(source, complete: true);
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllAsync(source, complete: true);

Synchronize reading from the source and process the results concurrently

// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllConcurrentlyAsync(
    maxConcurrency, source, complete: true);

Filter & Transform

// Filter and transform when reading.
channel.Reader
    .Filter(predicate) // .Where()
    .Transform(selector) // .Select()
    .ReadAllAsync(async value => {/*...*/});

Batching

values.Reader
    .Batch(10 /*batch size*/)
    .WithTimeout(1000) // Any non-empty batches are flushed every second.
    .ReadAllAsync(async batch => {/*...*/});

Joining

batches.Reader
    .Join()
    .ReadAllAsync(async value => {/*...*/});

Pipelining / Transforming

Transform and buffer entries

// Transform values in a source channel to new unbounded channel.
var transformed = channel.Pipe(
    async value => /* transformation */);
// Transform values in a source channel to new unbounded channel with a max concurrency of X.
const int X = 4;
var transformed = channel.Pipe(
    X, async value => /* transformation */);
// Transform values in a source channel to new bounded channel bound of N entries.
const int N = 5;
var transformed = channel.Pipe(
    async value => /* transformation */, N);
// Transform values in a source channel to new bounded channel bound of N entries with a max concurrency of X.
const int X = 4;
const int N = 5;
var transformed = channel.Pipe(
    X, async value => /* transformation */, N);

// or
transformed = channel.Pipe(
    maxConcurrency: X,
    capacity: N,
    transform: async value => /* transformation */);

More Repositories

1

Open.Database.Extensions

Useful set of utilities and abstractions for simplifying modern data-access operations and ensuring DI compatibility.
C#
22
star
2

Open.Collections

Useful set of collections, and extensions for collections with thread-safe read-write access options.
C#
18
star
3

Open.Serialization

DI/IoC agnostic interfaces for injecting any serialization library.
C#
15
star
4

Open.Disposable.ObjectPools

A set of variations on ObjectPool implementations with differing underlying collections.
C#
14
star
5

Open.Numeric.Primes

Methods and extensions for prime number detection and discovery.
C#
12
star
6

Open.Text

A set of useful extensions for working with strings, string-segments, spans, enums, and value formatting.
C#
12
star
7

Open.Caching

Useful set of utilites and extensions for simplifying cache usage.
C#
12
star
8

Open.Disposable

Provides a set of useful classes when implementing a disposable.
C#
9
star
9

Open.MemoryExtensions

Useful set of extensions for working with Array, Memory, Span. Primarily for ordering/sorting vectors.
C#
7
star
10

Open.Linq.AsyncExtensions

Exposes Linq methods any Task<IEnumerable<T>>.
C#
3
star
11

Open.Range

Range<T> implementation with useful extensions. Useful for ranged value data-sets.
C#
3
star
12

Open.Hierarchy

Interfaces and classes helful in managing tree-like data structures.
C#
3
star
13

Open.Threading.Dataflow

Useful set of extensions and classes for simplifying Dataflow implementations.
C#
3
star
14

Open.Threading.Tasks

A set of utilities and extensions for working with Tasks.
C#
3
star
15

Open.Text.CSV

A set of utilities for reading and writing CSV data in C#.
C#
3
star
16

Open.Threading

Useful set of extensions and classes for simplifying and optimizing thread safe operations and synchronization.
C#
2
star
17

Open.Numeric

Extensions for simplifying working with numbers.
C#
2
star
18

Open.DateTime.Extensions

Simple set of DateTime extensions extensions.
C#
2
star
19

Open.IO.Extensions

IAsyncEnumerable extensions for handling streams.
C#
2
star
20

Open.Compression

Useful set of static methods and extensions for compression including GZip.
C#
2
star
21

Open.Collections.Numeric

Useful set of extensions for processing collections of numerical data.
C#
2
star
22

Open.TaskManager

Server and client classes for managing remote tasks.
C#
2
star
23

Open.Evaluation

This set of interfaces and classes can be used to create serializable functions for use with other systems that require a reproducable structure.
C#
2
star
24

Open.TokenProvider

A simple interface and implementation for properly requesting and managing bearer tokens.
C#
1
star
25

Open.Diagnostics

Diagnostic extensions and utilities.
C#
1
star
26

Open.Arithmetic

Simple set of arithmetic extensions.
C#
1
star
27

Open.XmlReaderExtensions

Extensions for simplifying use of an XmlReader including the use of XPath to acquire data.
1
star
28

Open.Threading.ReadWrite

Useful set of extensions and classes for simplifying and optimizing read-write synchronization.
C#
1
star
29

Open.Cloneable

Provides a common interface for ICloneable with Generic variant.
C#
1
star
30

Open.Measuring

Simple library for defining dimensions and converting units of measure.
TypeScript
1
star
31

Open.Lazy.Extensions

Simple set of Lazy extensions.
C#
1
star
32

Open.Coercion

A useful set of classes to facilitate validation and implicit coercion of types. Commonly strings, and enums.
C#
1
star
33

Open.MongoDB.Extensions

Useful set of utilities and abstractions for simplifying MongoDB operations and ensuring dependency injection compatibility.
C#
1
star
34

Open.RandomizationExtensions

A useful set of extensions for selecting random number from sets.
C#
1
star
35

Open.Data

Utilities and extensions for working with the System.Data namespace.
C#
1
star