• Stars
    star
    233
  • Rank 172,204 (Top 4 %)
  • Language
    Erlang
  • License
    Other
  • Created over 9 years ago
  • Updated almost 2 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Simple, Distributed and Scalable PubSub Message Bus written in Erlang

Erlbus

Message/Event Bus written in Erlang.

CI

The PubSub core is a clone of the original, remarkable, and proven Phoenix PubSub Layer, but re-written in Erlang.

A new way to build soft real-time and high scalable messaging-based applications, not centralized but distributed!

See the online documentation.

Introduction

Erlbus is a simple and lightweight library/tool to build messaging-based applications.

Erlbus PubSub implementation was taken from Phoenix Framework, which provides an amazing, scalable and proven PubSub solution. In addition to this, Erlbus provides an usable and simpler interface on top of this implementation.

You can read more about the PubSub implementation HERE.

Installation

Erlang

In your rebar.config:

{deps, [
  {ebus, "0.3.0", {pkg, erlbus}}
]}.

Elixir

In your mix.exs:

def deps do
  [
    {:ebus, "~> 0.3", hex: :erlbus}
  ]
end

Getting Started

Assuming you have a working Erlang installation (18 or later), building erlbus should be as simple as:

$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make

Quick Start Example

Start an Erlang console with erlbus running:

make shell

Once into the erlang console:

% subscribe the current shell process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% publish a message
ebus:pub("foo", {foo, "hi"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"}]

% unsubscribe self
ebus:unsub(self(), "foo").
ok

% publish other message
ebus:pub("foo", {foo, "hello"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self (last message didn't arrive)
ebus_proc:messages(self()).
[{foo,"hi"}]

% check subscribers (only Pid should be in the returned list)
ebus:subscribers("foo").
[<0.57.0>]

% check topics
ebus:topics().
[<<"foo">>]

% subscribe self to other topic
ebus:sub(self(), "bar").
ok

% check topics
ebus:topics().
[<<"bar">>,<<"foo">>]

% publish other message
ebus:pub("bar", {bar, "hi bar"}).
ok

% check received message for Pid (last message didn't arrive)
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"},{bar,"hi bar"}]

Note:

You may have noticed that is not necessary additional steps/calls to create/delete a topic, this is automatically handled by ebus, so you don't worry about it!

Now, let's make it more fun, start two Erlang consoles, first one:

erl -name [email protected] -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

The second one:

erl -name [email protected] -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

Then what we need to do is put these Erlang nodes in cluster, so from any of them send a ping to the other:

% From node1 ping node2
net_adm:ping('[email protected]').
pong

Excellent, we have both nodes in cluster, thanks to the beauty of Distributed Erlang. So, let's repeat the above exercise but now in two nodes.

In the node1 create a handler and subscription to some topic:

% create a callback fun to use ebus_proc utility
CB1 = fun(Msg) ->
  io:format("CB1: ~p~n", [Msg])
end
#Fun<erl_eval.6.54118792>

% other callback but receiving additional arguments,
% which may be used when message arrives
CB2 = fun(Msg, Args) ->
  io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])
end.
#Fun<erl_eval.12.54118792>

% use ebus_proc utility to spawn a handler
H1 = ebus_proc:spawn_handler(CB1).
<0.70.0>
H2 = ebus_proc:spawn_handler(CB2, ["any_ctx"]).
<0.72.0>

% subscribe handlers
ebus:sub(H1, "foo").
ok
ebus:sub(H2, "foo").
ok

Repeat the same thing above in node2.

Once you have handlers subscribed to the same channel in both nodes, publish some messages from any node:

% publish message
ebus:pub("foo", {foo, "again"}).
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
ok

And in the other node you will see those messages have arrived too:

CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"

Let's check subscribers, so from any Erlang console:

% returns local and remote subscribers
ebus:subscribers("foo").
[<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]

You can also check the tests for more examples about using ebus.

So far, so good! Let's continue!

Point-To-Point Example

The great thing here is that you don't need something special to implement a point-to-point behavior. It is as simple as this:

ebus:dispatch("topic1", #{payload => "M1"}).

Dispatch function gets the subscribers and then picks one of them to send the message out. You can provide a dispatch function to pick up a subscriber, otherwise, a default function is provided (picks a subscriber random).

Dispatch function comes in 3 different flavors:

  • ebus:dispatch/2: receives the topic and the message.
  • ebus:dispatch/3: receives the topic, message and a list of options.
  • ebus:dispatch/4: same as previous but receives as 1st argument the name of the server, which is placed by default in the other functions.

Dispatch options are:

  • {scope, local | global}: allows you to choose if you want to pick a local subscriber o any. Default value: local.
  • {dispatch_fun, fun(([term()]) -> term())}: function to pick up a subscriber. If it isn't provided, a default random function is provided.

To see how this function is implemented go HERE.

Let's see an example:

% subscribe local process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% check that we have two subscribers
ebus:subscribers("foo").
[<0.57.0>,<0.38.0>]

% now dispatch a message (default dispatch fun and scope)
ebus:dispatch("foo", #{payload => foo}).
ok

% check that only one subscriber received the message
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[]

% dispatch with options
Fun = fun([H | _]) -> H end.
#Fun<erl_eval.6.54118792>
ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).
ok

% check again
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[<<"M1">>]

Extremely easy isn't?

Distributed Erlbus

Erlbus is distributed by nature, it doesn't require any additional/magical thing.

Once you have an Erlang cluster, messages are broadcasted using PG2, which is the default PubSub adapter. Remember, it's a Phoenix PubSub clone, so the architecture and design it's the same.

Phoenix Channels are supported on PubSub layer, which is the core. Take a look at this blog post.

Important links

Running Tests

make test

Building docs

make docs

Note: Once you run previous command, a new folder doc is created, and you'll have a pretty nice HTML documentation.

Change Log

All notable changes to this project will be documented in the CHANGELOG.md.

Copyright and License

Original work Copyright (c) 2014 Chris McCord

Modified work Copyright (c) 2016 Carlos Andres Bolaños

Erlbus source code is licensed under the MIT License.

NOTE: Pub/Sub implementation was taken from Phoenix Framework.

More Repositories

1

nebulex

In-memory and distributed caching toolkit for Elixir.
Elixir
1,088
star
2

shards

Partitioned ETS tables for Erlang and Elixir
Erlang
292
star
3

kvx

Simple in-memory Key/Value Store written in Elixir using `cabol/ex_shards`
Elixir
99
star
4

west

WEST (Web/Event-driven Systems Tool) is another messaging tool written in Erlang, that enables the building of messaging-based systems
Erlang
43
star
5

ex_shards

Elixir wrapper for cabol/shards .
Elixir
39
star
6

nebulex_ecto

Nebulex and Ecto integration
Elixir
27
star
7

nebulex_examples

Nebulex Examples
Elixir
26
star
8

nebulex_redis_adapter

Nebulex adapter for Redis
Elixir
23
star
9

gen_buffer

A generic message buffer behaviour with pooling and back-pressure for Erlang/Elixir.
Erlang
16
star
10

jchash

Jump Consistent Hash NIF library for Erlang/Elixir
Erlang
15
star
11

cross_db

Simple and flexible database wrapper for Erlang
Erlang
10
star
12

oauth2_mnesia_backend

Mnesia backend for kivra/oauth2 project.
Erlang
7
star
13

dberl

NoSQL DB access tool written in Erlang.
Erlang
6
star
14

tcp_client

An Erlang TCP client connections manager
Erlang
5
star
15

nebulex_cluster

Cluster utilities for Nebulex adapters like Redis or Memcached
Elixir
4
star
16

erlang_adt

Abstract Data Types Examples In Erlang
Erlang
3
star
17

rps

Rock-Paper-Scissors Example using Phoenix Framework
Elixir
3
star
18

nebulex_adapters_cachex

A Nebulex adapter for Cachex
Elixir
3
star
19

oauth2_dberl_backend

dberl backend for kivra/oauth2 project.
Erlang
2
star
20

shards_bench

Performance tests for cabol/shards project.
Erlang
2
star
21

cross_db_couchbase

Couchbase adapter for cabol/cross_db
1
star
22

shards_kv

Simple in-memory KV store using cabol/shards.
1
star
23

nebulex_memcached_adapter

Nebulex adapter for Memcached
Elixir
1
star
24

phoenix_oauth2_example

OAuth2 example using Phoenix and kivra/oauth2
1
star
25

shards_pg

Erlang PG2 implementation using shards!
1
star
26

nebulex_adapters_partitioned

Nebulex adapter for partitioned cache topology
1
star
27

shards_dist

Distributed implementation for Shards
1
star
28

cabol.github.io

Blog
CSS
1
star
29

nebulex_mnesia_adapter

Nebulex adapter for Mnesia
1
star