• Stars
    star
    315
  • Rank 128,583 (Top 3 %)
  • Language
    Erlang
  • License
    Other
  • Created almost 11 years ago
  • Updated over 5 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

External buffer processes to protect against mailbox overflow in Erlang

Build Status

PO Box

High throughput Erlang applications often get bitten by the fact that Erlang mailboxes are unbounded and will keep accepting messages until the node runs out of memory.

In most cases, this problem can be solved by imposing a rate limit on the producers, and it is recommended to explore this idea before looking at this library.

When it is impossible to rate-limit the messages coming to a process and that your optimization efforts remain fruitless, you need to start shedding load by dropping messages.

PO Box can help by shedding the load for you, and making sure you won't run out of memory.

The Principles

PO Box is a library that implements a buffer process. Erlang processes will receive their messages locally (at home), and may become overloaded because they have to both deal with their mailbox and day-to-day tasks:

         messages
            |
            V
+-----[Pid or Name]-----+
|      |         |      |
|      | mailbox |      |
|      +---------+      |
|       |               |
|    receive            |
+-----------------------+

A PO Box process will be where you will ask for your messages to go through. The PO Box process will implement a buffer (see Types of Buffer for details) that will do nothing but churn through messages and drop them when the buffer is full for you.

Depending on how you use the API, the PO Box can tell you it received new data, so you can then ask for the data, or you can tell it to send the data to you directly, without notification:

                                                  messages
                                                     |
                                                     V
+---------[Pid]---------+                +--------[POBox]--------+
|                       |<-- got mail ---|      |         |      |
|                       |                |      | mailbox |      |
|   <important stuff>   |--- send it! -->|      +---------+      |
|                       |                |       |               |
|                       |<---<messages>--|<---buffer             |
+-----------------------+                +-----------------------+

To be more detailed, a PO Box is a state machine with an owner process (which it receives messages for), and it has 3 states:

  • Active
  • Notify
  • Passive

The passive state basically does nothing but accumulate messages in the buffer and drop them when necessary.

The notify state is enabled by the user by calling the PO Box. Its sole task is to verify if there is any message in the buffer. If there is, it will respond to the PO Box's owner with a {mail, BoxPid, new_data} message sent directly to the pid. If there is no message in the buffer, the process will wait in the notify state until it gets one. As soon as the notification is sent, it reverts back to the passive state.

The active state is the only one that can send actual messages to the owner process. The user can call the PO Box to set it active, and if there are any messages in the buffer, all the messages it contains get sent as a list to the owner. If there are no messages, the PO Box waits until there is one to send it. After forwarding the messages, the PO Box reverts to the passive state.

The FSM can be illustrated as crappy ASCII as:

         ,---->[passive]------(user makes active)----->[active]
         |         | ^                                  |  ^  |
         |         | '---(sends message to user)--<-----'  |  |
         |  (user makes notify)                            |  |
         |         |                                       |  |
(user is notified) |                                       |  |
         |         V                                       |  |
         '-----[notify]---------(user makes active)--------'  |
                     ^----------(user makes notify)<----------'

Types of buffer

Currently, there are three types of built-in buffers supported: queues and stacks, and keep_old queues. You can also provide your own buffer implementation using the pobox_buf behaviour. See samples/pobox_queue_buf.erl for an example implementation.

Queues will keep messages in order, and drop oldest messages to make place for new ones. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [c,d,e].

keep_old queues will keep messages in order, but block newer messages from entering, favoring keeping old messages instead. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [a,b,c].

Stacks will not guarantee any message ordering, and will drop the top of the stack to make place for the new messages first. for the same messages, the stack buffer should contain the messages [e,b,a].

To choose between a queue and a stack buffer, you should consider the following criterias:

  • Do you need messages in order? Choose one of the queues.
  • Do you need the latest messages coming in to be kept, or the oldest ones? If so, pick queue and keep_old, respectively.
  • Do you need low latency? Then choose a stack. Stacks will give you many messages with low latency with a few with high latency. Queues will give you a higher overall latency, but less variance over time.

More buffer types could be supported in the future, if people require them.

How to build it

./rebar compile

How to run tests

./rebar compile ct

How to use it

Start a buffer with any of the following:

start_link(OwnerPid, MaxSize, BufferType)
start_link(OwnerPid, MaxSize, BufferType, InitialState)
start_link(Name, OwnerPid, MaxSize, BufferType)
start_link(Name, OwnerPid, MaxSize, BufferType, InitialState)
start_link(#{
    name => Name,
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => HeirPid,
    heir_data => HeirData
})
start_link(Name, #{
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => Heir,
    heir_data => HeirData
})

Where:

  • Name is any name a regular gen_fsm process can accept (including {via,...} tuples)
  • OwnerPid is the pid of the PO Box owner. It's the only one that can communicate with it in terms of setting state and reading messages. The OwnerPid can be either a pid or an atom. The PO Box will set up a link directly between itself and OwnerPid, and won't trap exits. If you're using named processes (atoms) and want to have the PO Box survive them individually, you should unlink the processes manually. This also means that processes that terminate normally won't kill the POBox.
  • MaxSize is the maximum number of messages in a buffer. Note that this is a mandatory property.
  • BufferType can be either queue, stack or keep_old and specifies which type is going to be used. You can also provide your buffer module using {mod, Module}.
  • InitialState can be either passive or notify. The default value is set to notify. Having the buffer passive is desirable when you start it during an asynchronous init and do not want to receive notifications right away.
  • Heir The name or pid of a process that will take over the PO Box if the owner dies. You can use a local registered name such as an atom or you can also use {global, Name} and {via, Module, Name} if you wish. If the Heir is a name the name is resolved as soon as the Owner dies. A message will be sent to the heir to notify it of the transfer and the PO Box will be put into passive state. The format of this message should look like {pobox_transfer, BoxPid, PreviousOwnerPid, HeirData, Reason}.
  • HeirData is data that should be sent as part of the pobox_transfer message to the heir when the owner dies.

The buffer can be made active by calling:

pobox:active(BoxPid, FilterFun, FilterState)

The FilterFun is a function that will take messages one by one along with custom state and can return:

  • {{ok, Message}, NewState}: the message will be sent.
  • {drop, NewState}: the message will be dropped.
  • skip: the message is left in the buffer and whatever was filtered so far gets sent.

A function that would blindly forward all messages could be written as:

fun(Msg, _) -> {{ok,Msg},nostate} end

A function that would limit binary messages by size could be written as:

fun(Msg, Allowed) ->
    case Allowed - byte_size(Msg) of
        N when N < 0 -> skip;
        N -> {{ok, Msg}, N}
    end
end

Or you could drop messages that are empty binaries by doing:

fun(<<>>, State) -> {drop, State};
   (Msg, State) -> {{ok,Msg}, State}
end.

The resulting message sent will be:

{mail, BoxPid, Messages, MessageCount, MessageDropCount}

Finally, the PO Box can be forced to notify by calling:

pobox:notify(BoxPid)

Which is objectively much simpler.

Messages can be sent to a PO Box by calling pobox:post(BoxPid, Msg) or sending a message directly to the process as BoxPid ! {post, Msg}.

The ownership of the PO Box can be transfered to another process by calling:

pobox:give_away(BoxPid, DestPid, DestData, Timeout)

or

pobox:give_away(BoxPid, DestPid, Timeout)

which is equivalent to:

pobox_give_away(BoxPid, DestPid, undefined, Timeout)

The call should return true on success and false on failure. Note that you can only call this from within the owner process otherwise the call always fails. If DestData is not provided it will be sent as undefined in the pobox_transfer message.

The destination process should receive a message of the following form:

{pobox_transfer, BoxPid, PreviousOwnerPid, DestData | undefined, give_away}

Example Session

First start a PO Box for the current process:

1> {ok, Box} = pobox:start_link(self(), 10, queue).
{ok,<0.39.0>}

We'll also define a spammer function that will just keep mailing a bunch of messages:

2> Spam = fun(F,N) -> pobox:post(Box,N), F(F,N+1) end.
#Fun<erl_eval.12.17052888>

Because we're in the shell, the function takes itself as an argument so it can both remain anonymous and loop. Each message is an increasing integer.

I can start the process and wait for a while:

3> Spammer = spawn(fun() -> Spam(Spam,0) end).
<0.42.0>

Let's see if we have anything in our PO box:

4> flush().
Shell got {mail, <0.39.0>, new_data}
ok

Yes! Let's get that content:

5> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
6> flush().
Shell got {mail,<0.39.0>,
                [778918,778919,778920,778921,778922,778923,778924,778925,
                 778926,778927],
                10,778918}
ok

So we have 10 messages with seqential IDs (we used a queue buffer), and the process kindly dropped over 700,000 messages for us, keeping our node's memory safe.

The spammer is still going and our PO Box is in passive mode. Let's cut to the chase and go directly to the active state:

7> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
8> flush().
Shell got {mail,<0.39.0>,
                [1026883,1026884,1026885,1026886,1026887,1026888,1026889,
                 1026890,1026891,1026892],
                10,247955}
ok

Nice. We can go back to notification mode too:

9> pobox:notify(Box).
ok
10> flush().
Shell got {mail, <0.39.0>, new_data}
ok

And keep going on and on and on.

Notes

  • Be careful to have a lightweight filter function if you expect constant overload from messages that keep coming very very fast. While the buffer filters out whatever messages you have, the new ones keep accumulating in the PO Box's own mailbox!
  • It is possible for a process to have multiple PO Boxes, although coordinating the multiple state machines together may get tricky.
  • The library is a generalization of ideas designed and implemented in logplex by Geoff Cant's (@archaelus). Props to him.
  • Using a keep_old buffer with a filter function that selects one message at a time would be equivalent to a naive bounded mailbox similar to what plenty of users asked for before. Tricking the filter function to forward the message (self() ! Msg) while dropping it will allow to do selective receives on bounded mailboxes.
  • When using post_sync/3 keep in mind that full doesn't mean your message will be dropped unless you are using the keep_old buffer type or a custom buffer that behaves the same way as keep_old.

Contributing

Accepted contributions need to be non-aesthetic, and provide some new functionality, fix abstractions, improve performance or semantics, and so on.

All changes received must be tested and not break existing tests.

Changes to currently untested functionality should ideally first provide a separate commit that shows the current behaviour working with the new tests (or some of the new tests, if you expand on the functionality), and then your own feature (and additional tests if required) in its own commit so we can verify nothing breaks in unpredictable ways.

Tests are written using Common Test. PropEr tests will be accepted, because they objectively rule. Ideally, you will wrap your PropEr tests in a Common Test suite so we can run everything with one command.

If you need help, feel free to ask for it in issues or pull requests. These rules are strict, but we're nice people!

Roadmap

This is more a wishlist than a roadmap, in no particular order:

  • Provide default filter functions in a new module

Changelog

  • 1.2.0: added heir and give_away functionality / fixed keep_old buffer size tracking
  • 1.1.0: added pobox_buf behaviour to add custom buffer implementations
  • 1.0.4: move to gen_statem implementation to avoid OTP 21 compile errors and OTP 20 warnings
  • 1.0.3: fix typespecs to generate fewer errors
  • 1.0.2: explicitly specify registered to be [] for relx compatibility, switch to rebar3
  • 1.0.1: fixing bug where manually dropped messages (with the active filter) would result in wrong size values and crashes for queues.
  • 1.0.0: A PO Box links itself to the process that it receives data for.
  • 0.2.0: Added PO Box's pid in the newdata message so a process can own more than a PO Box. Changed internal queue and stack size monitoring to be O(1) in all cases.
  • 0.1.1: adding keep_old queue, which blocks new messages from entering a filled queue.
  • 0.1.0: initial commit

Authors / Thanks

  • Fred Hebert / @ferd: library generalization and current implementation
  • Geoff Cant / @archaelus: design, original implementation
  • Jean-Samuel BΓ©dard / @jsbed: adaptation to gen_statem behaviour
  • Eric des Courtis / @edescourtis: added pobox_buf behaviour & heir/give_away functionality

More Repositories

1

recon

Collection of functions and scripts to debug Erlang in production.
Erlang
1,317
star
2

erlang-history

Hacks to add shell history to Erlang's shell
Erlang
496
star
3

vmstats

tiny Erlang app to generate information on the Erlang VM
Erlang
253
star
4

dispcount

Erlang task dispatcher based on ETS counters.
Erlang
212
star
5

erlpass

A library to handle password hashing and changing in a safe manner, independent from any kind of storage whatsoever.
Erlang
165
star
6

backoff

Simple exponential backoffs in Erlang
Erlang
132
star
7

merklet

Merkle Trees for data replication in Erlang
Erlang
76
star
8

sups

PropEr model helper library to validate implementations of supervisor trees
Erlang
63
star
9

recon_demo

Playground for recon, for practice and demos.
Erlang
59
star
10

zippers

A library for functional zipper data structures in Erlang. Read more on zippers @ http://ferd.ca/yet-another-article-on-zippers.html
Erlang
53
star
11

flatlog

A custom formatter for the Erlang logger application that turns maps into single line text logs
Erlang
52
star
12

cth_readable

Common Test hooks for more readable logs
Erlang
49
star
13

dandelion

A weed is a plant considered undesirable in a particular situation, "a plant in the wrong place". Taxonomically, the term "weed" has no botanical significance, because a plant that is a weed in one context is not a weed when growing in a situation where it is wanted.
Erlang
46
star
14

lrw

Lowest Random Weight hashing for neatly rebalancing hashes
Erlang
45
star
15

simhash

Simhashing for Erlang -- hashing algorithm to find near-duplicates in binary data.
Erlang
43
star
16

bertconf

Make ETS tables out of statc BERT files that are auto-reloaded
Erlang
42
star
17

ReVault

ReVault is a peer-to-peer self-hosted file synchronization project.
Erlang
42
star
18

slider

A WxErlang application to generate slidesets.
Erlang
38
star
19

dlhttpc

dispcount-based lhttpc fork for massive amounts of requests to limited endpoints
Erlang
37
star
20

rebar3_proper

Run PropEr test suites with rebar3
Erlang
37
star
21

batchio

io:format middle-man that buffers and batches output sent to the io server for better throughput
Erlang
36
star
22

erl_crashdump_analyzer

shell script to analyze Erlang crash dumps and find some (generally) useful information.
Shell
30
star
23

hairnet

An Erlang library wrapping AES-GCM (AEAD) crypto in a Fernet-like interface
Erlang
29
star
24

howistart-erlang1-code

Code for my tutorial on howistart.org
Erlang
26
star
25

useragent

Identify browsers and OSes from user agent strings, in Erlang
Erlang
25
star
26

hubble

create, read, and update deep Erlang data structures, accessible through explicit paths.
Erlang
18
star
27

cth_retry

Common Test hooks to retry the last failing cases // Now built in Rebar3
Erlang
14
star
28

cascading-failures

crappy bit of Erlang code whose sole purpose is to crash repeatedly.
Erlang
12
star
29

my-finger

Just waiting for pull requests
11
star
30

rebar3_shellrpc

A plugin to send commands to a running rebar3 shell
Erlang
10
star
31

alias_transform

A parse transform to introduce module aliasing into Erlang code
Erlang
10
star
32

blogerl

My own blog engine. It's been held together with duct tape since 2010
HTML
10
star
33

tend

The Erl Next Door -- a useful platform with which we can quickly load dependencies for tutorials or demonstrations online, and letting people try code as they see fit.
Erlang
9
star
34

erl_subgraph_compile

A rebar plugin to only do partial re-builds of some files without any safety checks.
Erlang
9
star
35

calcalc

Calendrical Calculations; Erlang port of Dershowitz & Reingold's algorithms.
Erlang
6
star
36

bitarray

NIF to replace HiPE bitarray functions
JavaScript
6
star
37

start_wrap

Dumb Wrapper to make full releases possible in Erlang with a 'main' loop
Erlang
4
star
38

interclock

Experimental project to write an Erlang database app using Interval Tree Clocks. NO GUARANTEES EVER.
Erlang
4
star
39

rebar3_todo

A rebar3 plugin that scans source code for TODO notes
Erlang
4
star
40

rebar3-alias

Rebar3 Alias Plugin
Erlang
4
star
41

advent-of-code-2021

Trying the advent of code 2021 in Awk
Awk
3
star
42

erl-loadbalance-benchmarks

Erlang
3
star
43

trx

A rebar plugin to export Erlang test data into Visual Studio test format (.trx files)
Erlang
3
star
44

peeranha

Experimental master-to-master DB using ITCs
Erlang
2
star
45

cowboyku

Cowboy fork to be used with Heroku's proxy library
Erlang
1
star