By default, consumer processes in Erlang/Elixir don't have much control over received messages. This is especially true for GenServer-based processes, where messages are processed one by one. The Workex library provides control over message receiving by splitting the consumer into two processes: one that accepts messages and another that handles them. This approach can be useful in various scenarios:
- Sometimes, bulk processing can speed up consuming, and processing N items at once is much faster than processing a single item N times.
- A maximum queue limit must be set, after which the consumer should refuse new requests.
- There is a need to eliminate duplicates. An item that arrives in queue makes the previous item of the same kind obsolete.
- A consumer should rearrange incoming messages by some priority (e.g. newest first).
Let's see a simple demonstration. Suppose we have a function that does long processing of some data:
defmodule Processor do
  def long_op(data) do
    :timer.sleep(100)
    IO.inspect(data)
    :ok
  end
end
Obviously, this function can run at most 10 times per second. However, the running time is mostly unrelated to the size of the input data, so we can benefit from batching input messages.
In particular, when a message arrives, the consumer can do the following:
- An idle consumer can immediately consume the message.
- A busy consumer can queue the message. When the current processing is done, the consumer will process all queued messages at once.
This allows the consumer to automatically adapt to the incoming load by consuming larger chunks of messages when the load is higher.
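The adaptive behaviour described above can be illustrated with a small simulation in plain Elixir. This is only a sketch under idealized assumptions (no scheduling or messaging overhead); the BatchSim module and its run/2 function are hypothetical names invented for this example and are not part of Workex.

```elixir
defmodule BatchSim do
  # Hypothetical helper, not part of Workex. Simulates a consumer that,
  # whenever it becomes idle, takes everything that has arrived so far.
  # `arrivals` is a list of {arrival_time_ms, item} sorted by time;
  # `work_ms` is the fixed cost of handling one batch, whatever its size.
  def run(arrivals, work_ms), do: loop(arrivals, 0, work_ms, [])

  defp loop([], _free_at, _work_ms, acc), do: Enum.reverse(acc)

  defp loop([{first_at, _} | _] = pending, free_at, work_ms, acc) do
    # Processing starts once the consumer is free and an item is available.
    start = max(free_at, first_at)
    {ready, rest} = Enum.split_while(pending, fn {at, _} -> at <= start end)
    batch = Enum.map(ready, fn {_, item} -> item end)
    finish = start + work_ms
    loop(rest, finish, work_ms, [{finish, batch} | acc])
  end
end

# 20 items arriving 10 ms apart, each batch taking 100 ms to process.
arrivals = for i <- 1..20, do: {(i - 1) * 10, i}
IO.inspect(BatchSim.run(arrivals, 100))
# Three batches: [1] done at 100 ms, 2..11 at 200 ms, 12..20 at 300 ms
```

In a real run the batch boundaries shift slightly, because sleeping and message passing add some overhead on top of the nominal intervals.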
Here's how we can do this with Workex. First, make sure you have Workex set as a dependency in mix.exs:
def deps do
  [{:workex, "~> 0.10.0"}, ...]
end

def application do
  [applications: [:workex, ...], ...]
end
Now, you can define the consumer as a callback module for the Workex behaviour:
defmodule Consumer do
  use Workex

  # Interface functions are invoked inside client processes
  def start_link do
    Workex.start_link(__MODULE__, nil)
  end

  def push(pid, item) do
    Workex.push(pid, item)
  end

  # Callback functions run in the worker process
  def init(_), do: {:ok, nil}

  def handle(data, state) do
    Processor.long_op(data)
    {:ok, state}
  end
end
The producer can now start the Workex process, and push some data:
{:ok, pid} = Consumer.start_link()

for i <- 1..100 do
  Consumer.push(pid, i)
  :timer.sleep(10)
end

[1] # after 100 ms
[2, 3, 4, 5, 6, 7, 8, 9, 10] # after 200 ms
[11, 12, 13, 14, 15, 16, 17, 18, 19] # after 300 ms
...
As you can see from the output, the first message is consumed immediately. In the meantime, all subsequent messages are aggregated and handled as soon as the consumer becomes idle. This allowed us to process 19 items in about 300 ms, even though a single call to the processor function (Processor.long_op/1) takes about 100 ms.
Workex is a behaviour that runs two generic processes: the Workex-powered process and the internal worker process. The Workex process is a facade that accepts incoming items. It is tightly coupled with the worker process. As soon as the worker process is done processing, it notifies the Workex process, which may in turn provide new data if there is any. Otherwise, the consumer is considered idle until the next message arrives.
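To make the facade/worker split more concrete, here is a simplified model built on a plain GenServer. It is a sketch of the idea only, not how Workex is actually implemented; the Facade module, its message shapes ({:process, items}, :worker_idle) and the handler function argument are all assumptions made for this example.

```elixir
defmodule Facade do
  use GenServer

  # Hypothetical module: a simplified model of the two-process design.
  # `handler` is a one-argument function that receives a list of items.
  def start_link(handler), do: GenServer.start_link(__MODULE__, handler)
  def push(pid, item), do: GenServer.cast(pid, {:push, item})

  @impl true
  def init(handler) do
    facade = self()
    # The worker is linked, so a crash in the handler takes both processes down.
    worker = spawn_link(fn -> worker_loop(facade, handler) end)
    {:ok, %{worker: worker, queue: [], busy: false}}
  end

  @impl true
  def handle_cast({:push, item}, %{busy: false} = state) do
    # Idle worker: hand the item over immediately.
    send(state.worker, {:process, [item]})
    {:noreply, %{state | busy: true}}
  end

  def handle_cast({:push, item}, state) do
    # Busy worker: keep the item until the worker reports back.
    {:noreply, %{state | queue: [item | state.queue]}}
  end

  @impl true
  def handle_info(:worker_idle, %{queue: []} = state) do
    {:noreply, %{state | busy: false}}
  end

  def handle_info(:worker_idle, state) do
    # Pass everything queued so far as one batch, oldest first.
    send(state.worker, {:process, Enum.reverse(state.queue)})
    {:noreply, %{state | queue: []}}
  end

  defp worker_loop(facade, handler) do
    receive do
      {:process, items} ->
        handler.(items)
        send(facade, :worker_idle)
        worker_loop(facade, handler)
    end
  end
end
```

Because the facade never runs the handler itself, it stays responsive and can keep accepting items while the worker is busy.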
The worker process is a long-running process. Callback functions are invoked in this process and can manage some state. This works roughly like GenServer. The init/1 callback returns the initial state, while handle/2 returns the new state. In case of a crash, both the worker and the Workex process terminate, and the incoming queue is lost. This is analogous to the behavior of plain BEAM processes.
As can be seen from the previous example, messages are by default aggregated in a queue. There are two other data aggregation strategies provided:
- A Workex.Dict assumes that messages are in the form of {key, value}. A new message overwrites the queued one with the same key. The worker receives an unordered list of {key, value} tuples.
- A Workex.Stack provides the data in reversed order. This is useful when newer messages should be consumed first.
To use an alternative aggregation, you can start the server with:
Workex.start_link(MyModule, arg, aggregate: %Workex.Dict{})
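As a rough illustration of what each strategy hands to the worker, the following sketch mimics the described semantics with plain Elixir data structures. The AggregateSketch module and its function names are hypothetical and do not use or reproduce the Workex aggregation code.

```elixir
defmodule AggregateSketch do
  # Hypothetical helpers that mimic the described semantics on a list of
  # messages given in arrival order.

  # Default queue: messages are delivered in arrival order.
  def as_queue(messages), do: messages

  # Stack-like: newest message first.
  def as_stack(messages), do: Enum.reverse(messages)

  # Dict-like: a later {key, value} overwrites an earlier one with the
  # same key; the order of the resulting list should not be relied on.
  def as_dict(messages), do: messages |> Map.new() |> Map.to_list()
end

messages = [{:a, 1}, {:b, 2}, {:a, 3}]

IO.inspect(AggregateSketch.as_queue(messages))
IO.inspect(AggregateSketch.as_stack(messages))
IO.inspect(Enum.sort(AggregateSketch.as_dict(messages)))
```

Here the dict-like variant keeps only {:a, 3} for the key :a, which is the "newer item makes the older one obsolete" behaviour described in the list above.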
You can also implement your own aggregation strategies. This amounts to developing a structure that implements the Workex.Aggregate protocol.
By default, Workex doesn't impose a limit on the message queue size. However, in some cases, you may want to refuse new items once the queue size reaches some limit. This can be done by providing the max_size option:
Workex.start_link(MyModule, arg, max_size: 10)
If we have 10 items queued, all subsequent items will not enter the queue. Of course, once all queued items are passed to the worker process, the queue is emptied and the Workex process will accept new items.
You can also request that the oldest item is replaced with the new one:
Workex.start_link(MyModule, arg, max_size: 10, replace_oldest: true)
Note that this option doesn't make sense with all aggregations. In particular, it will not work with Workex.Dict, since this strategy doesn't preserve ordering.
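The two overflow policies (refuse the new item, or drop the oldest one to make room) can be modelled with a small bounded queue. This is a sketch under assumptions: the BoundedQueue struct, its field names and the {:error, :full} return value are illustrative choices for this example, not Workex internals.

```elixir
defmodule BoundedQueue do
  # Hypothetical model of the two overflow policies described above.
  defstruct items: :queue.new(), size: 0, max_size: nil, replace_oldest: false

  def new(opts \\ []), do: struct(__MODULE__, opts)

  # There is still room (or no limit at all): accept the item.
  def push(%__MODULE__{max_size: max, size: size} = q, item)
      when is_nil(max) or size < max do
    {:ok, %{q | items: :queue.in(item, q.items), size: size + 1}}
  end

  # Full, but allowed to evict: drop the oldest item, then add the new one.
  def push(%__MODULE__{replace_oldest: true} = q, item) do
    {_dropped, rest} = :queue.out(q.items)
    {:ok, %{q | items: :queue.in(item, rest)}}
  end

  # Full and not allowed to evict: refuse the item.
  def push(%__MODULE__{}, _item), do: {:error, :full}

  def to_list(%__MODULE__{items: items}), do: :queue.to_list(items)
end

q = BoundedQueue.new(max_size: 2, replace_oldest: true)
{:ok, q} = BoundedQueue.push(q, 1)
{:ok, q} = BoundedQueue.push(q, 2)
{:ok, q} = BoundedQueue.push(q, 3)
IO.inspect(BoundedQueue.to_list(q))
# prints [2, 3]
```

Evicting the oldest item only has a clear meaning when arrival order is tracked, which is why an unordered, key-based aggregation cannot support it.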
Workex.push/2 is a fire-and-forget operation, which means the caller gets no feedback about its outcome. If you need stronger guarantees, you can use Workex.push_ack/2, which returns :ok if the item is successfully queued, or an error if the item was not queued (for example, if the buffer limit has been reached).
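The difference between a fire-and-forget push and an acknowledged one resembles the difference between GenServer.cast and GenServer.call. The sketch below models that distinction with a plain GenServer; the AckQueue module and the {:error, :full} reply are assumptions for illustration and do not describe how Workex implements these functions.

```elixir
defmodule AckQueue do
  use GenServer

  # Hypothetical module contrasting unacknowledged and acknowledged pushes.
  def start_link(max_size), do: GenServer.start_link(__MODULE__, max_size)

  # Fire-and-forget: always returns :ok, even if the item is dropped.
  def push(pid, item), do: GenServer.cast(pid, {:push, item})

  # Acknowledged: the caller learns whether the item was queued.
  def push_ack(pid, item), do: GenServer.call(pid, {:push, item})

  @impl true
  def init(max_size), do: {:ok, %{items: [], max_size: max_size}}

  @impl true
  def handle_cast({:push, item}, state) do
    {_result, state} = enqueue(state, item)
    {:noreply, state}
  end

  @impl true
  def handle_call({:push, item}, _from, state) do
    {result, state} = enqueue(state, item)
    {:reply, result, state}
  end

  defp enqueue(%{items: items, max_size: max} = state, _item)
       when length(items) >= max,
       do: {{:error, :full}, state}

  defp enqueue(state, item), do: {:ok, %{state | items: [item | state.items]}}
end

{:ok, pid} = AckQueue.start_link(1)
IO.inspect(AckQueue.push_ack(pid, :first))   # :ok
IO.inspect(AckQueue.push(pid, :second))      # :ok, although the item is dropped
IO.inspect(AckQueue.push_ack(pid, :third))   # {:error, :full}
```

The acknowledged variant costs a round trip per item, so it is a trade-off between throughput and knowing the outcome.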
There is also the function Workex.push_block/2, which blocks the client (but not the Workex process) until the item has been processed by the worker. This is mostly useful if there are many concurrent producers pushing to the consumer and you want to apply stronger back pressure.