RabbitMQ cli consumer
IMPORTANT: Looking for maintainer: #81.
If you are a fellow PHP developer just like me you're probably aware of the following fact: PHP is meant to die.
When using RabbitMQ with pure PHP consumers, you have to deal with stability issues. You are probably killing your consumers regularly and trying to solve the problem with supervisord, which also means that you have to restart your consumers on every deploy. A little bit dramatic if you ask me.
This is a fork of the work done by Richard van den Brand. It provides a command that aims to solve the problem described above for RabbitMQ workers by delegating the long-running part to a tool written in Go, which is much better suited for this task. The PHP application is then only executed when there is an AMQP message to process. This is comparable to how HTTP requests are usually handled, where the web server waits for new incoming requests and calls your script once for each request.
This fork came to be when Richard van den Brand no longer had the time to maintain his version. The main goals of the fork are:
- Following the principles of The Twelve-Factor App, environment-dependent settings should be configurable by environment variables.
- All logs, including the output of the called executable, will be available in STDOUT/STDERR.
- The AMQP message will be passed via STDIN (not as an argument, which is limited in size).
- Have tests with a decent level of code coverage.
NOTE: If you previously used the consumer of Richard van den Brand, this version should work as a drop-in replacement. Do not migrate blindly; do some testing first. Effort was made to remain backwards compatible, but no guarantees are made.
Installation
You can either compile the consumer yourself or install it via a package or binary.
Binary
Binaries can be found at: https://github.com/corvus-ch/rabbitmq-cli-consumer/releases
Compiling
This section assumes you're familiar with the Go language.
Use go get to fetch the source locally:
$ go get github.com/corvus-ch/rabbitmq-cli-consumer
Change to the directory, e.g.:
$ cd $GOPATH/src/github.com/corvus-ch/rabbitmq-cli-consumer
Get the dependencies:
$ go get ./...
Then build and/or install:
$ go build
$ go install
Usage
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable '/path/to/your/app argument --flag'
Run without arguments or with the -help switch to show the help text.
Configuration
The file example.conf contains all available configuration options together with their explanations.
In Go, the zero value for a string is "". So any value not configured in the config file will result in an empty string. Now imagine you want to define an empty name for one of the configuration settings. We then cannot determine whether this value was empty on purpose or just left out. If you want to configure an empty string, you have to be explicit by using the value <empty>.
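The ambiguity between "not configured" and "empty on purpose" can be illustrated with a small Go sketch. The helper resolve and the idea of falling back to a default are hypothetical and only serve as an illustration; they are not taken from the consumer's source. Only the <empty> marker comes from this README.

```go
package main

import "fmt"

// emptyMarker is the explicit placeholder for an intentionally empty value.
const emptyMarker = "<empty>"

// resolve is a hypothetical helper. It treats the zero value "" as
// "not configured" and falls back to def, while "<empty>" yields "".
func resolve(configured, def string) string {
	switch configured {
	case "":
		return def
	case emptyMarker:
		return ""
	default:
		return configured
	}
}

func main() {
	fmt.Printf("%q\n", resolve("", "default-name"))        // not configured
	fmt.Printf("%q\n", resolve("<empty>", "default-name")) // empty on purpose
	fmt.Printf("%q\n", resolve("custom", "default-name"))  // explicit value
}
```

Without the marker, the first two calls would be indistinguishable, which is why the explicit value is needed.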
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --configuration example.conf
Graceful shutdown
The consumer handles the signal SIGTERM. When SIGTERM is received, the AMQP channel will be canceled, preventing any new messages from being consumed. This makes it possible to stop the consumer while still letting a currently running executable finish and its message be acknowledged.
The executable
Your executable receives the message as the last argument. So consider the following:
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php
The command.php file should look like this:
#!/usr/bin/env php
<?php
// The message is passed as the first argument.
$message = $argv[1];
// Decode to get the original value.
$original = base64_decode($message);
// Start processing.
if (do_heavy_lifting($original)) {
    // All went well, so return 0.
    exit(0);
}
// Let rabbitmq-cli-consumer know something went wrong; the message will be requeued.
exit(1);
Or a Symfony2 example:
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable 'app/console event:processing --env=prod'
The command looks like this:
<?php
namespace Vendor\EventBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            ->addArgument('event', InputArgument::REQUIRED)
            ->setName('event:processing')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // The message arrives base64 encoded as the 'event' argument.
        $message = base64_decode($input->getArgument('event'));
        $this->getContainer()->get('mailer')->send($message);
        exit(0);
    }
}
Compression
Depending on what you're passing around on the queue, it may be wise to enable compression support. If you don't, you may encounter the infamous "Argument list too long" error.
When compression is enabled, the message gets compressed with zlib at maximum compression before it is base64 encoded. We have to pay a performance penalty for this. If you are serializing large PHP objects, I suggest turning it on. Better safe than sorry.
In your config:
[rabbitmq]
compression = On
And in your PHP app:
#!/usr/bin/env php
<?php
// The message is passed as the first argument.
$message = $argv[1];
// Decode to get the compressed value.
$compressed = base64_decode($message);
// Uncompress. Compare strictly against false so an empty message is not treated as an error.
$original = gzuncompress($compressed);
if (false === $original) {
    // You probably want to throw an exception here.
    exit(1);
}
// Start processing.
if (do_heavy_lifting($original)) {
    // All went well, so return 0.
    exit(0);
}
// Let rabbitmq-cli-consumer know something went wrong; the message will be requeued.
exit(1);
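To make the encoding order concrete, the following Go sketch performs the round trip described above: zlib at maximum compression followed by base64, and the reverse on the receiving side. It is an illustration and not the consumer's source; the function names encode and decode are hypothetical, and the use of the standard base64 alphabet is an assumption.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/base64"
	"fmt"
	"io"
)

// encode compresses body with zlib at maximum compression and then
// base64-encodes the result (standard alphabet is an assumption).
func encode(body []byte) (string, error) {
	var buf bytes.Buffer
	w, err := zlib.NewWriterLevel(&buf, zlib.BestCompression)
	if err != nil {
		return "", err
	}
	if _, err := w.Write(body); err != nil {
		return "", err
	}
	if err := w.Close(); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// decode reverses encode: base64 first, then zlib.
func decode(arg string) ([]byte, error) {
	compressed, err := base64.StdEncoding.DecodeString(arg)
	if err != nil {
		return nil, err
	}
	r, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	arg, err := encode(bytes.Repeat([]byte("hello "), 100))
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	body, err := decode(arg)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("argument: %d bytes, decoded body: %d bytes\n", len(arg), len(body))
}
```

For repetitive payloads the argument ends up much shorter than the body, which is what helps avoid the argument length limit.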
Including properties and message headers
If you need to access message headers and/or properties, call the command with the --include option set.
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --include
The script will then receive a JSON-encoded data structure which looks like the following.
{
  "properties": {
    "application_headers": {
      "name": "value"
    },
    "content_type": "",
    "content_encoding": "",
    "delivery_mode": 1,
    "priority": 0,
    "correlation_id": "",
    "reply_to": "",
    "expiration": "",
    "message_id": "",
    "timestamp": "0001-01-01T00:00:00Z",
    "type": "",
    "user_id": "",
    "app_id": ""
  },
  "delivery_info": {
    "message_count": 0,
    "consumer_tag": "ctag-./rabbitmq-cli-consumer-1",
    "delivery_tag": 2,
    "redelivered": true,
    "exchange": "example",
    "routing_key": ""
  },
  "body": ""
}
Change your script according to the following example.
#!/usr/bin/env php
<?php
// The payload is passed as the first argument.
$input = $argv[1];
// Decode to get the original value. Also decompress according to your configuration.
$data = json_decode(base64_decode($input));
// Start processing.
if (do_heavy_lifting($data->body, $data->properties)) {
    // All went well, so return 0.
    exit(0);
}
// Let rabbitmq-cli-consumer know something went wrong; the message will be requeued.
exit(1);
If you are using Symfony's RabbitMQ bundle (php-amqplib/rabbitmq-bundle), you can wrap the consumer with the following Symfony command.
<?php
namespace Vendor\EventBundle\Command;

use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            ->addArgument('event', InputArgument::REQUIRED)
            ->setName('event:processing')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Rebuild the AMQP message from the JSON payload.
        $data = json_decode(base64_decode($input->getArgument('event')), true);
        $message = new AMQPMessage($data['body'], $data['properties']);
        /** @var \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface $consumer */
        $consumer = $this->getContainer()->get('consumer');
        if (false == $consumer->execute($message)) {
            exit(1);
        }
    }
}
Use pipe instead of arguments
When starting the consumer with the --pipe option, the AMQP message will be passed on to the executable using STDIN for the message body and fd3 for the metadata containing the properties and the delivery info encoded as JSON.
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --pipe
#!/usr/bin/env php
<?php
// Read the metadata from fd3.
$metadata = file_get_contents("php://fd/3");
if (false === $metadata) {
    fwrite(STDERR, "failed to read metadata from fd3\n");
    exit(1);
}
// Decode the metadata.
$metadata = json_decode($metadata, true);
if (JSON_ERROR_NONE != json_last_error()) {
    fwrite(STDERR, "failed to decode metadata\n");
    fwrite(STDERR, json_last_error_msg() . PHP_EOL);
    exit(1);
}
// Read the body from STDIN.
$body = file_get_contents("php://stdin");
if (false === $body) {
    fwrite(STDERR, "failed to read body from STDIN\n");
    exit(1);
}
Strict exit code processing
By default, any non-zero exit code makes the consumer send a negative acknowledgement and re-queue the message. In some cases this may cause your consumer to fall into an infinite loop, because the re-queued message is delivered to the consumer again and will probably fail again.
It's possible to get better control over message acknowledgement by setting up strict exit code processing. In this mode, the consumer acknowledges messages only if the executable process returns an allowed exit code.
Allowed exit codes
Exit Code | Action |
---|---|
0 | Acknowledgement |
3 | Reject |
4 | Reject and re-queue |
5 | Negative acknowledgement |
6 | Negative acknowledgement and re-queue |
All other exit codes will cause the consumer to fail.
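The table can be read as a simple lookup from exit code to action. The Go sketch below illustrates that mapping; the constant names and the function action are hypothetical and this is not the consumer's implementation.

```go
package main

import "fmt"

// Allowed exit codes in strict mode, as listed in the table.
const (
	ExitAck           = 0
	ExitReject        = 3
	ExitRejectRequeue = 4
	ExitNack          = 5
	ExitNackRequeue   = 6
)

// action returns the action for an exit code in strict mode.
// ok is false for any code that is not in the table.
func action(code int) (name string, ok bool) {
	switch code {
	case ExitAck:
		return "Acknowledgement", true
	case ExitReject:
		return "Reject", true
	case ExitRejectRequeue:
		return "Reject and re-queue", true
	case ExitNack:
		return "Negative acknowledgement", true
	case ExitNackRequeue:
		return "Negative acknowledgement and re-queue", true
	default:
		return "", false
	}
}

func main() {
	for _, code := range []int{0, 3, 4, 5, 6, 1} {
		if name, ok := action(code); ok {
			fmt.Printf("%d: %s\n", code, name)
		} else {
			fmt.Printf("%d: not allowed, the consumer fails\n", code)
		}
	}
}
```

Note that exit code 1, which triggers a re-queue in the default mode, is not an allowed code in strict mode.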
Run the consumer with the --strict-exit-code option to enable strict exit code processing:
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --strict-exit-code
Make sure your executable returns the correct exit code:
#!/usr/bin/env php
<?php
// ...
try {
    if (do_heavy_lifting($data)) {
        // All went well, so return 0.
        exit(0);
    }
} catch (InvalidMessageBody $e) {
    exit(3); // Message is invalid, just reject and don't try to process again
} catch (TimeoutException $e) {
    exit(4); // Reject and try again
} catch (Exception $e) {
    exit(1); // Unexpected exception will cause consumer to stop consuming
}
Metrics
Metrics follow the Prometheus conventions.
They are available via an HTTP endpoint (http://127.0.0.1:9566/metrics), with default port 9566 and default path /metrics.
The following metrics are tracked:
Metric | Type | Description |
---|---|---|
rabbitmq_cli_consumer_process_total | Counter | The total number of processes executed. Processes are aggregated by their exit code. |
rabbitmq_cli_consumer_process_duration_seconds | Histogram | The time spent by the consumer to process the message. |
rabbitmq_cli_consumer_message_duration_seconds | Histogram | The time spent from publishing to finished processing the message. This requires the message to have the timestamp header set. |
Contributing and license
This library is licensed under MIT. For information about how to contribute to this project, see CONTRIBUTING.md.