MessagePipe
MessagePipe is a high-performance in-memory/distributed messaging pipeline for .NET and Unity. It supports all cases of Pub/Sub usage, mediator pattern for CQRS, EventAggregator of Prism(V-VM decoupling), IPC(Interprocess Communication)-RPC, etc.
- Dependency-injection first
- Filter pipeline
- better event
- sync/async
- keyed/keyless
- buffered/bufferless
- singleton/scoped
- broadcast/response(+many)
- in-memory/interprocess/distributed
MessagePipe is faster than standard C# event and 78 times faster than Prism's EventAggregator.
Of course, memory allocation per publish operation is less(zero).
Also providing roslyn-analyzer to prevent subscription leak.
Getting Started
For .NET, use NuGet. For Unity, please read Unity section.
PM> Install-Package MessagePipe
MessagePipe is built on top of a Microsoft.Extensions.DependencyInjection
(for Unity, VContainer
or Zenject
or Builtin Tiny DI
) so set up via ConfigureServices
in .NET Generic Host. Generic Host is widely used in .NET such as ASP.NET Core, MagicOnion, ConsoleAppFramework, MAUI, WPF(with external support), etc so easy to setup.
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe(); // AddMessagePipe(options => { }) for configure options
})
Get the IPublisher<T>
for publisher, Get the ISubscribe<T>
for subscriber, like a Logger<T>
. T
can be any type, primitive(int, string, etc...), struct, class, enum, etc.
using MessagePipe;
public struct MyEvent { }
public class SceneA
{
readonly IPublisher<MyEvent> publisher;
public SceneA(IPublisher<MyEvent> publisher)
{
this.publisher = publisher;
}
void Send()
{
this.publisher.Publish(new MyEvent());
}
}
public class SceneB
{
readonly ISubscriber<MyEvent> subscriber;
readonly IDisposable disposable;
public SceneB(ISubscriber<MyEvent> subscriber)
{
var bag = DisposableBag.CreateBuilder(); // composite disposable for manage subscription
subscriber.Subscribe(x => Console.WriteLine("here")).AddTo(bag);
disposable = bag.Build();
}
void Close()
{
disposable.Dispose(); // unsubscribe event, all subscription **must** Dispose when completed
}
}
It is similar to event, but decoupled by type as key. The return value of Subscribe is IDisposable
, which makes it easier to unsubscribe than event. You can release many subscriptions at once by DisposableBag
(CompositeDisposable
). See the Managing Subscription and Diagnostics section for more details.
The publisher/subscriber(internally we called MessageBroker) is managed by DI, it is possible to have different broker for each scope. Also, all subscriptions are unsubscribed when the scope is disposed, which prevents subscription leaks.
Default is singleton, you can configure
MessagePipeOptions.InstanceLifetime
toSingleton
orScoped
.
IPublisher<T>/ISubscriber<T>
is keyless(type only) however MessagePipe has similar interface IPublisher<TKey, TMessage>/ISubscriber<TKey, TMessage>
that is keyed(topic) interface.
For example, our real usecase, There is an application that connects Unity and MagicOnion (a real-time communication framework like SignalR) and delivers it via a browser by Blazor. At that time, we needed something to connect Blazor's page (Browser lifecycle) and MagicOnion's Hub (Connection lifecycle) to transmit data. We also need to distribute the connections by their IDs.
Browser <-> Blazor <- [MessagePipe] -> MagicOnion <-> Unity
We solved this with the following code.
// MagicOnion(similar as SignalR, realtime event framework for .NET and Unity)
public class UnityConnectionHub : StreamingHubBase<IUnityConnectionHub, IUnityConnectionHubReceiver>, IUnityConnectionHub
{
readonly IPublisher<Guid, UnitEventData> eventPublisher;
readonly IPublisher<Guid, ConnectionClose> closePublisher;
Guid id;
public UnityConnectionHub(IPublisher<Guid, UnitEventData> eventPublisher, IPublisher<Guid, ConnectionClose> closePublisher)
{
this.eventPublisher = eventPublisher;
this.closePublisher = closePublisher;
}
override async ValueTask OnConnected()
{
this.id = Guid.Parse(Context.Headers["id"]);
}
override async ValueTask OnDisconnected()
{
this.closePublisher.Publish(id, new ConnectionClose()); // publish to browser(Blazor)
}
// called from Client(Unity)
public Task<UnityEventData> SendEventAsync(UnityEventData data)
{
this.eventPublisher.Publish(id, data); // publish to browser(Blazor)
}
}
// Blazor
public partial class BlazorPage : ComponentBase, IDisposable
{
[Parameter]
public Guid ID { get; set; }
[Inject]
ISubscriber<Guid, UnitEventData> UnityEventSubscriber { get; set; }
[Inject]
ISubscriber<Guid, ConnectionClose> ConnectionCloseSubscriber { get; set; }
IDisposable subscription;
protected override void OnInitialized()
{
// receive event from MagicOnion(that is from Unity)
var d1 = UnityEventSubscriber.Subscribe(ID, x =>
{
// do anything...
});
var d2 = ConnectionCloseSubscriber.Subscribe(ID, _ =>
{
// show disconnected thing to view...
subscription?.Dispose(); // and unsubscribe events.
});
subscription = DisposableBag.Create(d1, d2); // combine disposable.
}
public void Dispose()
{
// unsubscribe event when browser is closed.
subscription?.Dispose();
}
}
The main difference of Reactive Extensions' Subject is has no
OnCompleted
. OnCompleted may or may not be used, making it very difficult to determine the intent to the observer(subscriber). Also, we usually subscribe to multiple events from the same (different event type)publisher, and it is difficult to handle duplicate OnCompleted in that case. For this reason, MessagePipe only provides a simple Publish(OnNext). If you want to convey completion, please receive a separate event and perform dedicated processing there.
In other words, this is the equivalent of Relay in RxSwift.
In addition to standard Pub/Sub, MessagePipe supports async handlers, mediator patterns with handlers that accept return values, and filters for pre-and-post execution customization.
This image is a visualization of the connection between all those interfaces.
You may be confused by the number of interfaces, but many functions can be written with a similar, unified API.
Publish/Subscribe
Publish/Subscribe interface has keyed(topic) and keyless, sync and async interface.
// keyless-sync
public interface IPublisher<TMessage>
{
void Publish(TMessage message);
}
public interface ISubscriber<TMessage>
{
IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}
// keyless-async
public interface IAsyncPublisher<TMessage>
{
// async interface's publish is fire-and-forget
void Publish(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}
public interface IAsyncSubscriber<TMessage>
{
IDisposable Subscribe(IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}
// keyed-sync
public interface IPublisher<TKey, TMessage>
where TKey : notnull
{
void Publish(TKey key, TMessage message);
}
public interface ISubscriber<TKey, TMessage>
where TKey : notnull
{
IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}
// keyed-async
public interface IAsyncPublisher<TKey, TMessage>
where TKey : notnull
{
void Publish(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}
public interface IAsyncSubscriber<TKey, TMessage>
where TKey : notnull
{
IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}
All are available in the form of IPublisher/Subscribe<T>
in the DI. async handler can await all subscribers completed by await PublishAsync
. Asynchronous methods can work sequentially or in parallel, depending on AsyncPublishStrategy
(defaults is Parallel
, can be changed by MessagePipeOptions
or by specifying at publish time). If you don't need to wait, you can call void Publish
to act as fire-and-forget.
The before and after of execution can be changed by passing a custom filter. See the Filter section for details.
If an error occurs, it will be propagated to the caller and subsequent subscribers will be stopped. This behavior can be changed by writing a filter to ignore errors.
ISingleton***, IScoped***
I(Async)Publisher(Subscriber)'s lifetime belongs to MessagePipeOptions.InstanceLifetime
. However if declare with ISingletonPublisher<TMessage>
/ISingletonSubscriber<TKey, TMessage>
, ISingletonAsyncPublisher<TMessage>
/ISingletonAsyncSubscriber<TKey, TMessage>
then used singleton lifetime. Also IScopedPublisher<TMessage>
/IScopedSubscriber<TKey, TMessage>
, IScopedAsyncPublisher<TMessage>
/IScopedAsyncSubscriber<TKey, TMessage>
uses scoped lifetime.
Buffered
IBufferedPublisher<TMessage>/IBufferedSubscriber<TMessage>
pair is similar as BehaviorSubject
or Reactive Extensions(More equal is RxSwift's BehaviorRelay
). It returns latest value on Subscribe
.
var p = provider.GetRequiredService<IBufferedPublisher<int>>();
var s = provider.GetRequiredService<IBufferedSubscriber<int>>();
p.Publish(999);
var d1 = s.Subscribe(x => Console.WriteLine(x)); // 999
p.Publish(1000); // 1000
var d2 = s.Subscribe(x => Console.WriteLine(x)); // 1000
p.Publish(9999); // 9999, 9999
DisposableBag.Create(d1, d2).Dispose();
If
TMessage
is class and does not have latest value(null), does not send value on Subscribe.
Keyed buffered publisher/subscriber does not exist because difficult to avoid memory leak of (unused)key and keep latest value.
EventFactory
Using EventFactory
, you can create generic IPublisher/ISubscriber
, IAsyncPublisher/IAsyncSubscriber
, IBufferedPublisher/IBufferedSubscriber
, IBufferedAsyncPublisher/IBufferedAsyncSubscriber
like C# events, with a Subscriber tied to each instance, not grouped by type.
MessagePipe has better properties than a normal C# event
- Using Subscribe/Dispose instead of
+=
,-=
, easy to management subscription - Both sync and async support
- Both bufferless and buffered support
- Enable unsubscribe all subscription from publisher.dispose
- Attaches invocation pipeline behaviour by Filter
- To monitor subscription leak by
MessagePipeDiagnosticsInfo
- TO prevent subscription leak by
MessagePipe.Analyzer
public class BetterEvent : IDisposable
{
// using MessagePipe instead of C# event/Rx.Subject
// store Publisher to private field(declare IDisposablePublisher/IDisposableAsyncPublisher)
IDisposablePublisher<int> tickPublisher;
// Subscriber is used from outside so public property
public ISubscriber<int> OnTick { get; }
public BetterEvent(EventFactory eventFactory)
{
// CreateEvent can deconstruct by tuple and set together
(tickPublisher, OnTick) = eventFactory.CreateEvent<int>();
// also create async event(IAsyncSubscriber) by `CreateAsyncEvent`
// eventFactory.CreateAsyncEvent
}
int count;
void Tick()
{
tickPublisher.Publish(count++);
}
public void Dispose()
{
// You can unsubscribe all from Publisher.
tickPublisher.Dispose();
}
}
If you want to create event outside of DI, see Global Provider section.
IDisposablePublisher<int> tickPublisher;
public ISubscriber<int> OnTick { get; }
ctor()
{
(tickPublisher, OnTick) = GlobalMessagePipe.CreateEvent<int>();
}
Request/Response/All
Similar as MediatR, implement support of mediator pattern.
public interface IRequestHandler<in TRequest, out TResponse>
{
TResponse Invoke(TRequest request);
}
public interface IAsyncRequestHandler<in TRequest, TResponse>
{
ValueTask<TResponse> InvokeAsync(TRequest request, CancellationToken cancellationToken = default);
}
For example, declare handler for Ping type.
public readonly struct Ping { }
public readonly struct Pong { }
public class PingPongHandler : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping called.");
return new Pong();
}
}
You can get handler like this.
class FooController
{
IRequestHandler<Ping, Pong> requestHandler;
// automatically instantiate PingPongHandler.
public FooController(IRequestHandler<Ping, Pong> requestHandler)
{
this.requestHandler = requestHandler;
}
public void Run()
{
var pong = this.requestHandler.Invoke(new Ping());
Console.WriteLine("PONG");
}
}
For more complex implementation patterns, this Microsoft documentation is applicable.
Declare many request handlers, you can use IRequestAllHandler
, IAsyncRequestAllHandler
instead of single handler.
public interface IRequestAllHandler<in TRequest, out TResponse>
{
TResponse[] InvokeAll(TRequest request);
IEnumerable<TResponse> InvokeAllLazy(TRequest request);
}
public interface IAsyncRequestAllHandler<in TRequest, TResponse>
{
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, CancellationToken cancellationToken = default);
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default);
IAsyncEnumerable<TResponse> InvokeAllLazyAsync(TRequest request, CancellationToken cancellationToken = default);
}
public class PingPongHandler1 : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping1 called.");
return new Pong();
}
}
public class PingPongHandler2 : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping1 called.");
return new Pong();
}
}
class BarController
{
IRequestAllHandler<Ping, Pong> requestAllHandler;
public FooController(IRequestAllHandler<Ping, Pong> requestAllHandler)
{
this.requestAllHandler = requestAllHandler;
}
public void Run()
{
var pongs = this.requestAllHandler.InvokeAll(new Ping());
Console.WriteLine("PONG COUNT:" + pongs.Length);
}
}
Subscribe Extensions
ISubscriber
(IAsyncSubscriber
) interface requires IMessageHandler<T>
to handle message.
public interface ISubscriber<TMessage>
{
IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}
However, the extension method allows you to write Action<T>
directly.
public static IDisposable Subscribe<TMessage>(this ISubscriber<TMessage> subscriber, Action<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters)
public static IDisposable Subscribe<TMessage>(this ISubscriber<TMessage> subscriber, Action<TMessage> handler, Func<TMessage, bool> predicate, params MessageHandlerFilter<TMessage>[] filters)
public static IObservable<TMessage> AsObservable<TMessage>(this ISubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
public static IAsyncEnumerable<TMessage> AsAsyncEnumerable<TMessage>(this IAsyncSubscriber<TMessage> subscriber, params AsyncMessageHandlerFilter<TMessage>[] filters)
public static ValueTask<TMessage> FirstAsync<TMessage>(this ISubscriber<TMessage> subscriber, CancellationToken cancellationToken, params MessageHandlerFilter<TMessage>[] filters)
public static ValueTask<TMessage> FirstAsync<TMessage>(this ISubscriber<TMessage> subscriber, CancellationToken cancellationToken, Func<TMessage, bool> predicate, params MessageHandlerFilter<TMessage>[] filters)
Also, the Func<TMessage, bool>
overload can filter messages by predicate (internally implemented with PredicateFilter, where Order is int.MinValue and is always checked first).
AsObservable
can convert message pipeline to IObservable<T>
, it can handle by Reactive Extensions(in Unity, you can use UniRx
). AsObservable
exists in sync subscriber(keyless, keyed, buffered).
AsAsyncEnumerable
can convert message pipeline to IAsyncEnumerable<T>
, it can handle by async LINQ and async foreach. AsAsyncEnumerable
exists in async subscriber(keyless, keyed, buffered).
FirstAsync
gets the first value of message. It is similar as AsObservable().FirstAsync()
, AsObservable().Where().FirstAsync()
. If uses CancellationTokenSource(TimeSpan)
then similar as AsObservable().Timeout().FirstAsync()
. Argument of CancellationToken
is required to avoid task leak.
// for Unity, use cts.CancelAfterSlim(TIimeSpan) instead.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
var value = await subscriber.FirstAsync(cts.Token);
FirstAsync
exists in both sync and async subscriber(keyless, keyed, buffered).
Filter
Filter system can hook before and after method invocation. It is implemented with the Middleware pattern, which allows you to write synchronous and asynchronous code with similar syntax. MessagePipe provides different filter types - sync (MessageHandlerFilter<T>
), async (AsyncMessageHandlerFilter<T>
), request (RequestHandlerFilter<TReq, TRes>
) and async request (AsyncRequestHandlerFilter<TReq, TRes>
). To implement other concerete filters the above filter types can be extended.
Filters can be specified in three places - global(by MessagePipeOptions.AddGlobalFilter
), per handler type, and per subscription. These filters are sorted according to the Order specified in each of them, and are generated when subscribing.
Since the filter is generated on a per subscription basis, the filter can have a state.
public class ChangedValueFilter<T> : MessageHandlerFilter<T>
{
T lastValue;
public override void Handle(T message, Action<T> next)
{
if (EqualityComparer<T>.Default.Equals(message, lastValue))
{
return;
}
lastValue = message;
next(message);
}
}
// uses(per subscribe)
subscribe.Subscribe(x => Console.WriteLine(x), new ChangedValueFilter<int>(){ Order = 100 });
// add per handler type(use generics filter, write open generics)
[MessageHandlerFilter(typeof(ChangedValueFilter<>), 100)]
public class WriteLineHandler<T> : IMessageHandler<T>
{
public void Handle(T message) => Console.WriteLine(message);
}
// add per global
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe(options =>
{
options.AddGlobalMessageHandlerFilter(typeof(ChangedValueFilter<>), 100);
});
});
use the filter by attribute, you can use these attributes: [MessageHandlerFilter(type, order)]
, [AsyncMessageHandlerFilter(type, order)]
, [RequestHandlerFilter(type, order)]
, [AsyncRequestHandlerFilter(type, order)]
.
These are idea showcase of filter.
public class PredicateFilter<T> : MessageHandlerFilter<T>
{
private readonly Func<T, bool> predicate;
public PredicateFilter(Func<T, bool> predicate)
{
this.predicate = predicate;
}
public override void Handle(T message, Action<T> next)
{
if (predicate(message))
{
next(message);
}
}
}
public class LockFilter<T> : MessageHandlerFilter<T>
{
readonly object gate = new object();
public override void Handle(T message, Action<T> next)
{
lock (gate)
{
next(message);
}
}
}
public class IgnoreErrorFilter<T> : MessageHandlerFilter<T>
{
readonly ILogger<IgnoreErrorFilter<T>> logger;
public IgnoreErrorFilter(ILogger<IgnoreErrorFilter<T>> logger)
{
this.logger = logger;
}
public override void Handle(T message, Action<T> next)
{
try
{
next(message);
}
catch (Exception ex)
{
logger.LogError(ex, ""); // error logged, but do not propagate
}
}
}
public class DispatcherFilter<T> : MessageHandlerFilter<T>
{
readonly Dispatcher dispatcher;
public DispatcherFilter(Dispatcher dispatcher)
{
this.dispatcher = dispatcher;
}
public override void Handle(T message, Action<T> next)
{
dispatcher.BeginInvoke(() =>
{
next(message);
});
}
}
public class DelayRequestFilter : AsyncRequestHandlerFilter<int, int>
{
public override async ValueTask<int> InvokeAsync(int request, CancellationToken cancellationToken, Func<int, CancellationToken, ValueTask<int>> next)
{
await Task.Delay(TimeSpan.FromSeconds(request));
var response = await next(request, cancellationToken);
return response;
}
}
Managing Subscription and Diagnostics
Subscribe returns IDisposable
; when call Dispose
then unsubscribe. A better reason than event is that it is easy to Unsubscribe. To manage multiple IDisposable, you can use CompositeDisposable
in Rx(UniRx) or DisposableBag
included in MessagePipe.
IDisposable disposable;
void OnInitialize(ISubscriber<int> subscriber)
{
var d1 = subscriber.Subscribe(_ => { });
var d2 = subscriber.Subscribe(_ => { });
var d3 = subscriber.Subscribe(_ => { });
// static DisposableBag: DisposableBag.Create(1~7(optimized) or N);
disposable = DisposableBag.Create(d1, d2, d3);
}
void Close()
{
// dispose all subscription
disposable?.Dispose();
}
IDisposable disposable;
void OnInitialize(ISubscriber<int> subscriber)
{
// use builder pattern, you can use subscription.AddTo(bag)
var bag = DisposableBag.CreateBuilder();
subscriber.Subscribe(_ => { }).AddTo(bag);
subscriber.Subscribe(_ => { }).AddTo(bag);
subscriber.Subscribe(_ => { }).AddTo(bag);
disposable = bag.Build(); // create final composite IDisposable
}
void Close()
{
// dispose all subscription
disposable?.Dispose();
}
IDisposable disposable;
void OnInitialize(ISubscriber<int> subscriber)
{
var bag = DisposableBag.CreateBuilder();
// calling once(or x count), you can use DisposableBag.CreateSingleAssignment to hold subscription reference.
var d = DisposableBag.CreateSingleAssignment();
// you can invoke Dispose in handler action.
// assign disposable, you can use `SetTo` and `AddTo` bag.
// or you can use d.Disposable = subscriber.Subscribe();
subscriber.Subscribe(_ => { d.Dispose(); }).SetTo(d).AddTo(bag);
disposable = bag.Build();
}
void Close()
{
disposable?.Dispose();
}
The returned IDisposable
value must be handled. If it is ignored, it will leak. However Weak reference, which is widely used in WPF, is an anti-pattern. All subscriptions should be managed explicitly.
You can monitor subscription count by MessagePipeDiagnosticsInfo
. It can get from service provider(or DI).
public sealed class MessagePipeDiagnosticsInfo
{
/// <summary>Get current subscribed count.</summary>
public int SubscribeCount { get; }
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, list all stacktrace on subscribe.
/// </summary>
public StackTraceInfo[] GetCapturedStackTraces(bool ascending = true);
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, groped by caller of subscribe.
/// </summary>
public ILookup<string, StackTraceInfo> GetGroupedByCaller(bool ascending = true)
}
If you monitor SubscribeCount, you can check leak of subscription.
public class MonitorTimer : IDisposable
{
CancellationTokenSource cts = new CancellationTokenSource();
public MonitorTimer(MessagePipeDiagnosticsInfo diagnosticsInfo)
{
RunTimer(diagnosticsInfo);
}
async void RunTimer(MessagePipeDiagnosticsInfo diagnosticsInfo)
{
while (!cts.IsCancellationRequested)
{
// show SubscribeCount
Console.WriteLine("SubscribeCount:" + diagnosticsInfo.SubscribeCount);
await Task.Delay(TimeSpan.FromSeconds(5), cts.Token);
}
}
public void Dispose()
{
cts.Cancel();
}
}
Also, by enabling MessagePipeOptions.EnableCaptureStackTrace (disabled by default), the location of the subscribed location can be displayed, making it easier to find the location of the leak if it exists.
Check the Count of GroupedByCaller, and if any of them show abnormal values, then the stack trace is where it occurs, and you probably ignore Subscription.
for Unity, Window -> MessagePipe Diagnostics
window is useful for monitoring subscritpion. It visualizes MessagePipeDianogsticsInfo
.
To Enable use of the MessagePipeDiagnostics window, require to set up GlobalMessagePipe
.
// VContainer
public class MessagePipeDemo : VContainer.Unity.IStartable
{
public MessagePipeDemo(IObjectResolver resolver)
{
// require this line.
GlobalMessagePipe.SetProvider(resolver.AsServiceProvider());
}
}
// Zenject
void Configure(DiContainer container)
{
GlobalMessagePipe.SetProvider(container.AsServiceProvider());
}
// builtin
var prodiver = builder.BuildServiceProvider();
GlobalMessagePipe.SetProvider(provider);
Analyzer
In previous section, we anounce The returned IDisposable value **must** be handled
. To prevent subscription leak, we provide roslyn analyzer.
PM> Install-Package MessagePipe.Analyzer
This will raise an error for unhandled Subscribe
.
This analyzer can use after Unity 2020.2(see: Roslyn analyzers and ruleset files document). MessagePipe.Analyzer.dll
exists in releases page.
Currently Unity's analyzer support is incomplete. We are complementing analyzer support with editor extension, please check the Cysharp/CsprojModifier.
IDistributedPubSub / MessagePipe.Redis
For the distributed(networked) Pub/Sub, you can use IDistributedPublisher<TKey, TMessage>
, IDistributedSubscriber<TKey, TMessage>
instead of IAsyncPublisher
.
public interface IDistributedPublisher<TKey, TMessage>
{
ValueTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default);
}
public interface IDistributedSubscriber<TKey, TMessage>
{
// and also without filter overload.
public ValueTask<IAsyncDisposable> SubscribeAsync(TKey key, IMessageHandler<TMessage> handler, MessageHandlerFilter<TMessage>[] filters, CancellationToken cancellationToken = default);
public ValueTask<IAsyncDisposable> SubscribeAsync(TKey key, IAsyncMessageHandler<TMessage> handler, AsyncMessageHandlerFilter<TMessage>[] filters, CancellationToken cancellationToken = default);
}
IAsyncPublisher
means in-memory Pub/Sub. Since processing over the network is fundamentally different, you need to use a different interface to avoid confusion.
Redis is available as a standard network provider.
PM> Install-Package MessagePipe.Redis
use AddMessagePipeRedis
to enable redis provider.
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe();
services.AddMessagePipeRedis(IConnectionMultiplexer | IConnectionMultiplexerFactory, configure);
})
IConnectionMultiplexer
overload, you can pass StackExchange.Redis's ConnectionMultiplexer
directly. Implement own IConnectionMultiplexerFactory
to allow for per-key distribution and use from connection pools.
MessagePipeRedisOptions
, you can configure serialization.
public sealed class MessagePipeRedisOptions
{
public IRedisSerializer RedisSerializer { get; set; }
}
public interface IRedisSerializer
{
byte[] Serialize<T>(T value);
T Deserialize<T>(byte[] value);
}
In default uses MessagePack for C#'s ContractlessStandardResolver
. You can change to use other MessagePackSerializerOptions
by new MessagePackRedisSerializer(options)
or implement own serializer wrapper.
MessagePipe has in-memory IDistributedPublisher/Subscriber for local test usage.
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
var config = ctx.Configuration.Get<MyConfig>();
services.AddMessagePipe();
if (config.IsLocal)
{
// use in-memory IDistributedPublisher/Subscriber in local.
services.AddInMemoryDistributedMessageBroker();
}
else
{
// use Redis IDistributedPublisher/Subscriber
services.AddMessagePipeRedis();
}
});
InterprocessPubSub, IRemoteAsyncRequest / MessagePipe.Interprocess
For the interprocess(NamedPipe/UDP/TCP) Pub/Sub(IPC), you can use IDistributedPublisher<TKey, TMessage>
, IDistributedSubscriber<TKey, TMessage>
similar as MessagePipe.Redis
.
PM> Install-Package MessagePipe.Interprocess
MessagePipe.Interprocess is also exsits on Unity(except NamedPipe).
use AddMessagePipeUdpInterprocess
, AddMessagePipeTcpInterprocess
, AddMessagePipeNamedPipeInterprocess
, AddMessagePipeUdpInterprocessUds
, AddMessagePipeTcpInterprocessUds
to enable interprocess provider(Uds is Unix domain socket, most performant option).
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe();
services.AddMessagePipeUdpInterprocess("127.0.0.1", 3215, configure); // setup host and port.
// services.AddMessagePipeTcpInterprocess("127.0.0.1", 3215, configure);
// services.AddMessagePipeNamedPipeInterprocess("messagepipe-namedpipe", configure);
// services.AddMessagePipeUdpInterprocessUds("domainSocketPath")
// services.AddMessagePipeTcpInterprocessUds("domainSocketPath")
})
public async P(IDistributedPublisher<string, int> publisher)
{
// publish value to remote process.
await publisher.PublishAsync("foobar", 100);
}
public async S(IDistributedSubscriber<string, int> subscriber)
{
// subscribe remote-message with "foobar" key.
await subscriber.SubscribeAsync("foobar", x =>
{
Console.WriteLine(x);
});
}
when injected IDistributedPublisher
, process will be server
, start to listen client. when injected IDistributedSubscriber
, process will be client
, start to connect to server. when DI scope is closed, server/client connection is closed.
Udp is connectionless protocol so does not require server is started before client connect. However protocol limitation, does not send over 64K message. We're recommend to use this if message is not large.
Namedpipe is 1:1 connection, can not connect multiple subscribers.
Tcp has no such restrictions and is the most flexible of all the options.
In default uses MessagePack for C#'s ContractlessStandardResolver
for message serialization. You can change to use other MessagePackSerializerOptions
by MessagePipeInterprocessOptions.MessagePackSerializerOptions.
services.AddMessagePipeUdpInterprocess("127.0.0.1", 3215, options =>
{
// You can configure other options, `InstanceLifetime` and `UnhandledErrorHandler`.
options.MessagePackSerializerOptions = StandardResolver.Options;
});
For IPC-RPC, you can use IRemoteRequestHandler<in TRequest, TResponse>
that invoke remote IAsyncRequestHandler<TRequest, TResponse>
. using TcpInterprocess
or NamedPipeInterprocess
enabled it.
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe();
services.AddMessagePipeTcpInterprocess("127.0.0.1", 3215, x =>
{
x.HostAsServer = true; // if remote process as server, set true(otherwise false(default)).
});
});
// example: server handler
public class MyAsyncHandler : IAsyncRequestHandler<int, string>
{
public async ValueTask<string> InvokeAsync(int request, CancellationToken cancellationToken = default)
{
await Task.Delay(1);
if (request == -1)
{
throw new Exception("NO -1");
}
else
{
return "ECHO:" + request.ToString();
}
}
}
// client
async void A(IRemoteRequestHandler<int, string> remoteHandler)
{
var v = await remoteHandler.InvokeAsync(9999);
Console.WriteLine(v); // ECHO:9999
}
For Unity, requires to import MessagePack-CSharp package and needs slightly different configuration.
// example of VContainer
var builder = new ContainerBuilder();
var options = builder.RegisterMessagePipe(configure);
var sc = builder.AsServiceCollection(); // require to convert ServiceCollection to enable Intereprocess
var interprocessOptions = sc.AddMessagePipeTcpInterprocess();
// register manually.
// IDistributedPublisher/Subscriber
sc.RegisterTcpInterprocessMessageBroker<int, int>(interprocessOptions);
// RemoteHandler
builder.RegisterAsyncRequestHandler<int, string, MyAsyncHandler>(options); // for server
sc.RegisterTcpRemoteRequestHandler<int, string>(interprocessOptions); // for client
MessagePipeOptions
You can configure MessagePipe behaviour by MessagePipeOptions
in AddMessagePipe(Action<MMessagePipeOptions> configure)
.
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
// var config = ctx.Configuration.Get<MyConfig>(); // optional: get settings from configuration(use it for options configure)
services.AddMessagePipe(options =>
{
options.InstanceLifetime = InstanceLifetime.Scoped;
#if DEBUG
// EnableCaptureStackTrace slows performance, so recommended to use only in DEBUG and in profiling, disable it.
options.EnableCaptureStackTrace = true;
#endif
});
})
Option has these properties(and method).
public sealed class MessagePipeOptions
{
AsyncPublishStrategy DefaultAsyncPublishStrategy; // default is Parallel
HandlingSubscribeDisposedPolicy HandlingSubscribeDisposedPolic; // default is Ignore
InstanceLifetime InstanceLifetime; // default is Singleton
InstanceLifetime RequestHandlerLifetime; // default is Scoped
bool EnableAutoRegistration; // default is true
bool EnableCaptureStackTrace; // default is false
void SetAutoRegistrationSearchAssemblies(params Assembly[] assemblies);
void SetAutoRegistrationSearchTypes(params Type[] types);
void AddGlobal***Filter<T>();
}
public enum AsyncPublishStrategy
{
Parallel, Sequential
}
public enum InstanceLifetime
{
Singleton, Scoped, Transient
}
public enum HandlingSubscribeDisposedPolicy
{
Ignore, Throw
}
DefaultAsyncPublishStrategy
IAsyncPublisher
has PublishAsync
method. If AsyncPublishStrategy.Sequential, await each subscribers. If Parallel, uses WhenAll.
public interface IAsyncPublisher<TMessage>
{
// using Default AsyncPublishStrategy
ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken = default);
ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default);
// snip others...
}
public interface IAsyncPublisher<TKey, TMessage>
where TKey : notnull
{
// using Default AsyncPublishStrategy
ValueTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default);
ValueTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default);
// snip others...
}
public interface IAsyncRequestAllHandler<in TRequest, TResponse>
{
// using Default AsyncPublishStrategy
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, CancellationToken cancellationToken = default);
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default);
// snip others...
}
MessagePipeOptions.DefaultAsyncPublishStrategy
's default is Parallel
.
HandlingSubscribeDisposedPolicy
When ISubscriber.Subscribe
after MessageBroker(publisher/subscriber manager) is disposed(for example, scope is disposed), choose Ignore
(returns empty IDisposable
) or Throw
exception. Default is Ignore
.
InstanceLifetime
Configure MessageBroker(publisher/subscriber manager)'s lifetime of DI cotainer. You can choose Singleton
or Scoped
. Default is Singleton
. When choose Scoped
, each messagebrokers manage different subscribers and when scope is disposed, unsubscribe all managing subscribers.
RequestHandlerLifetime
Configure IRequestHandler/IAsyncRequestHandler's lifetime of DI container. You can choose Singleton
or Scoped
or Transient
. Default is Scoped
.
EnableAutoRegistration/SetAutoRegistrationSearchAssemblies/SetAutoRegistrationSearchTypes
Register IRequestHandler
, IAsyncHandler
and filters to DI container automatically on startup. Default is true
and default search target is CurrentDomain's all assemblies and types. However, this sometimes fails to detect the assembly being stripped. In that case, you can enable the search by explicitly adding it to SetAutoRegistrationSearchAssemblies
or SetAutoRegistrationSearchTypes
.
[IgnoreAutoRegistration]
attribute allows to disable auto registration which attribute attached.
EnableCaptureStackTrace
See the details Managing Subscription and Diagnostics section, if true
then capture stacktrace on Subscribe. It is useful for debugging but performance will be degraded. Default is false
and recommended to enable only debug.
AddGlobal***Filter
Add global filter, for example logging filter will be useful.
public class LoggingFilter<T> : MessageHandlerFilter<T>
{
readonly ILogger<LoggingFilter<T>> logger;
public LoggingFilter(ILogger<LoggingFilter<T>> logger)
{
this.logger = logger;
}
public override void Handle(T message, Action<T> next)
{
try
{
logger.LogDebug("before invoke.");
next(message);
logger.LogDebug("invoke completed.");
}
catch (Exception ex)
{
logger.LogError(ex, "error");
}
}
}
To enable all types, use open generics.
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe(options =>
{
// use typeof(Filter<>, order);
options.AddGlobalMessageHandlerFilter(typeof(LoggingFilter<>), -10000);
});
});
Global provider
If you want to get publisher/subscriber/handler from globally scope, get IServiceProvider
before run and set to static helper called GlobalMessagePipe
.
var host = Host.CreateDefaultBuilder()
.ConfigureServices((ctx, x) =>
{
x.AddMessagePipe();
})
.Build(); // build host before run.
GlobalMessagePipe.SetProvider(host.Services); // set service provider
await host.RunAsync(); // run framework.
GlobalMessagePipe
has these static method(GetPublisher<T>
, GetSubscriber<T>
, CreateEvent<T>
, etc...) so you can get globally.
Integration with other DI library
All(popular) DI libraries has Microsoft.Extensions.DependencyInjection
bridge so configure by MS.E.DI and use bridge if you want.
Compare with Channels
System.Threading.Channels(for Unity, UniTask.Channels
) uses Queue internal, the producer is not affected by the performance of the consumer, and the consumer can control the flow rate(back pressure). This is a different use than MessagePipe's Pub/Sub.
Unity
You need to install Core library and choose VContainer or Zenject or BuiltinContainerBuilder
for runtime. You can install via UPM git URL package or asset package(MessagePipe.*.unitypackage) available in MessagePipe/releases page.
- Core
https://github.com/Cysharp/MessagePipe.git?path=src/MessagePipe.Unity/Assets/Plugins/MessagePipe
- VContainer
https://github.com/Cysharp/MessagePipe.git?path=src/MessagePipe.Unity/Assets/Plugins/MessagePipe.VContainer
- Zenject
https://github.com/Cysharp/MessagePipe.git?path=src/MessagePipe.Unity/Assets/Plugins/MessagePipe.Zenject
Andalso, requires UniTask to install, all ValueTask
declaration in .NET is replaced to UniTask
.
Unity version does not have open generics support(for IL2CPP) and does not support auto registration. Therefore, all required types need to be manually registered.
VContainer's installation sample.
public class GameLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// RegisterMessagePipe returns options.
var options = builder.RegisterMessagePipe(/* configure option */);
// Setup GlobalMessagePipe to enable diagnostics window and global function
builder.RegisterBuildCallback(c => GlobalMessagePipe.SetProvider(c.AsServiceProvider()));
// RegisterMessageBroker: Register for IPublisher<T>/ISubscriber<T>, includes async and buffered.
builder.RegisterMessageBroker<int>(options);
// also exists RegisterMessageBroker<TKey, TMessage>, RegisterRequestHandler, RegisterAsyncRequestHandler
// RegisterMessageHandlerFilter: Register for filter, also exists RegisterAsyncMessageHandlerFilter, Register(Async)RequestHandlerFilter
builder.RegisterMessageHandlerFilter<MyFilter<int>>();
builder.RegisterEntryPoint<MessagePipeDemo>(Lifetime.Singleton);
}
}
public class MessagePipeDemo : VContainer.Unity.IStartable
{
readonly IPublisher<int> publisher;
readonly ISubscriber<int> subscriber;
public MessagePipeDemo(IPublisher<int> publisher, ISubscriber<int> subscriber)
{
this.publisher = publisher;
this.subscriber = subscriber;
}
public void Start()
{
var d = DisposableBag.CreateBuilder();
subscriber.Subscribe(x => Debug.Log("S1:" + x)).AddTo(d);
subscriber.Subscribe(x => Debug.Log("S2:" + x)).AddTo(d);
publisher.Publish(10);
publisher.Publish(20);
publisher.Publish(30);
var disposable = d.Build();
disposable.Dispose();
}
}
Zenject's installation sample.
void Configure(DiContainer builder)
{
// BindMessagePipe returns options.
var options = builder.BindMessagePipe(/* configure option */);
// BindMessageBroker: Register for IPublisher<T>/ISubscriber<T>, includes async and buffered.
builder.BindMessageBroker<int>(options);
// also exists BindMessageBroker<TKey, TMessage>, BindRequestHandler, BindAsyncRequestHandler
// BindMessageHandlerFilter: Bind for filter, also exists BindAsyncMessageHandlerFilter, Bind(Async)RequestHandlerFilter
builder.BindMessageHandlerFilter<MyFilter<int>>();
// set global to enable diagnostics window and global function
GlobalMessagePipe.SetProvider(builder.AsServiceProvider());
}
Zenject version is not supported
InstanceScope.Singleton
for Zenject's limitation. The default isScoped
, which cannot be changed.
BuiltinContainerBuilder
is builtin minimum DI library for MessagePipe, it no needs other DI library to use MessagePipe. Here is installation sample.
var builder = new BuiltinContainerBuilder();
builder.AddMessagePipe(/* configure option */);
// AddMessageBroker: Register for IPublisher<T>/ISubscriber<T>, includes async and buffered.
builder.AddMessageBroker<int>(options);
// also exists AddMessageBroker<TKey, TMessage>, AddRequestHandler, AddAsyncRequestHandler
// AddMessageHandlerFilter: Register for filter, also exists RegisterAsyncMessageHandlerFilter, Register(Async)RequestHandlerFilter
builder.AddMessageHandlerFilter<MyFilter<int>>();
// create provider and set to Global(to enable diagnostics window and global fucntion)
var provider = builder.BuildServiceProvider();
GlobalMessagePipe.SetProvider(provider);
// --- to use MessagePipe, you can use from GlobalMessagePipe.
var p = GlobalMessagePipe.GetPublisher<int>();
var s = GlobalMessagePipe.GetSubscriber<int>();
var d = s.Subscribe(x => Debug.Log(x));
p.Publish(10);
p.Publish(20);
p.Publish(30);
d.Dispose();
BuiltinContainerBuilder does not supports scope(always
InstanceScope.Singleton
),IRequestAllHandler/IAsyncRequestAllHandler
, and many DI functionally, so we recommend to use byGlobalMessagePipe
when use BuiltinContainerBuilder.
Adding global filter, you can not use open generics filter so recommended to create these helper method.
// Register IPublisher<T>/ISubscriber<T> and global filter.
static void RegisterMessageBroker<T>(IContainerBuilder builder, MessagePipeOptions options)
{
builder.RegisterMessageBroker<T>(options);
// setup for global filters.
options.AddGlobalMessageHandlerFilter<MyMessageHandlerFilter<T>>();
}
// Register IRequestHandler<TReq, TRes>/IRequestAllHandler<TReq, TRes> and global filter.
static void RegisterRequest<TRequest, TResponse, THandler>(IContainerBuilder builder, MessagePipeOptions options)
where THandler : IRequestHandler
{
builder.RegisterRequestHandler<TRequest, TResponse, THandler>(options);
// setup for global filters.
options.AddGlobalRequestHandlerFilter<MyRequestHandlerFilter<TRequest, TResponse>>();
}
Also you can use GlobalMessagePipe
and MessagePipe Diagnostics
window. see: Global provider and Managing Subscription and Diagnostics section.
License
This library is licensed under the MIT License.