  • Stars: 134
  • Rank: 270,967 (Top 6 %)
  • Language: PHP
  • License: MIT License
  • Created over 5 years ago
  • Updated over 1 year ago

Repository Details

anik/amqp

anik/amqp is a php-amqplib wrapper that eases the consumption of RabbitMQ. A painless way of using RabbitMQ.

Note

Previously, the package could be used with Laravel, Laravel Zero, and Lumen out of the box. As of v2, Laravel support has been removed. If you need Laravel integration, use anik/laravel-amqp. If you were using this package with Laravel and want to upgrade to Laravel 9, consider using anik/amqp-to-laravel-amqp if you plan to migrate to anik/laravel-amqp later.

Examples

Check out the repository for examples.

Requirements

  • PHP ^7.2 | ^8.0
  • PHP-AMQPLib ^3.0

Installation

To install the package, run

composer require anik/amqp

Documentation

For V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90

Connection

To create an AMQP Connection, you can use

  • Anik\Amqp\AmqpConnectionFactory::make
  • Anik\Amqp\AmqpConnectionFactory::makeFromArray
<?php

use Anik\Amqp\AmqpConnectionFactory;
use PhpAmqpLib\Connection\AMQPLazySSLConnection;

$host = '127.0.0.1';
$port = 5672;
$user = 'user';
$password = 'password';
$vhost = '/';
$options = []; // options to be proxied to the amqp connection class
$ofClass = AMQPLazySSLConnection::class;

$connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass);
$hosts = [
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ],
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ]
];

// With the AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple hosts
$connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);

Exchange

There are four specific exchange classes.

  • Anik\Amqp\Exchanges\Direct for direct exchange.
  • Anik\Amqp\Exchanges\Fanout for fanout exchange.
  • Anik\Amqp\Exchanges\Headers for headers exchange.
  • Anik\Amqp\Exchanges\Topic for topic exchange.

You can still use the Anik\Amqp\Exchanges\Exchange base class to create your own exchange.

To instantiate an exchange:

<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Exchanges\Fanout;
use Anik\Amqp\Exchanges\Topic;

$exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT);

$exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]);

$exchange = new Topic('anik.amqp.topic.exchange');

$exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);

When creating an exchange instance with

  • Exchange::make - the name and type keys must be present in the given array.
  • Topic::make, Fanout::make, Headers::make, Direct::make - the name key must be present in the given array.

Anik\Amqp\Exchanges\Exchange contains a few predefined exchange types that you can use as a reference.

  • TYPE_DIRECT for direct type.
  • TYPE_TOPIC for topic type.
  • TYPE_FANOUT for fanout type.
  • TYPE_HEADERS for headers type.

The Exchange::make method also accepts the following keys when making an exchange instance.

  • declare Type: bool. Default: false. Whether to declare the exchange.
  • passive Type: bool. Default: false. Whether the exchange is passive.
  • durable Type: bool. Default: true. Whether the exchange is durable.
  • auto_delete Type: bool. Default: false. Whether the exchange should be auto-deleted.
  • internal Type: bool. Default: false. Whether the exchange is internal.
  • no_wait Type: bool. Default: false. Whether the client should skip waiting for the server's reply.
  • arguments Type: array. Default: [].
  • ticket Type: null | integer. Default: null.

You can also reconfigure the exchange instance using $exchange->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your exchange instance.

  • setName - Accepts: string. The only way to change exchange name after instantiation.
  • setDeclare - Accepts: bool.
  • setType - Accepts: string.
  • setPassive - Accepts: bool.
  • setDurable - Accepts: bool.
  • setAutoDelete - Accepts: bool.
  • setInternal - Accepts: bool.
  • setNowait - Accepts: bool.
  • setArguments - Accepts: array.
  • setTicket - Accepts: null | integer.
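
The keys, reconfigure call, and setters listed above can be combined as in the following sketch. The exchange names and the two helper functions are hypothetical and exist only for illustration. The second helper needs anik/amqp installed and Composer's autoloader loaded.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;

/**
 * Hypothetical helper: optional keys to pass to Exchange::make.
 * Every key comes from the list of accepted keys above.
 */
function exampleExchangeOptions(): array
{
    return [
        'declare' => true,      // ask for the exchange to be declared
        'durable' => true,      // already the default, spelled out for clarity
        'auto_delete' => false,
    ];
}

/**
 * Hypothetical helper: builds a topic exchange and then adjusts it.
 */
function makeExampleExchange(): Exchange
{
    // "name" and "type" are mandatory for Exchange::make.
    $required = ['name' => 'example.topic.exchange', 'type' => Exchange::TYPE_TOPIC];
    $exchange = Exchange::make($required + exampleExchangeOptions());

    // Change several options in one call...
    $exchange->reconfigure(['internal' => false, 'no_wait' => false]);

    // ...or one at a time through the setters.
    $exchange->setName('example.renamed.topic.exchange');
    $exchange->setAutoDelete(true);

    return $exchange;
}
```

Keeping the option array in one place makes it easy to reuse the same settings for the exchange key of the runtime $options described in the publishing and consuming sections.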

Queue

To instantiate a queue:

<?php

use Anik\Amqp\Queues\Queue;

$queue = new Queue('anik.amqp.direct.exchange.queue');

$queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);

When creating a queue instance with

  • Queue::make - the name key must be present in the given array.

The Queue::make method also accepts the following keys when making a queue instance.

  • declare Type: bool. Default: false. Whether to declare the queue.
  • passive Type: bool. Default: false. Whether the queue is passive.
  • durable Type: bool. Default: true. Whether the queue is durable.
  • exclusive Type: bool. Default: false. Whether the queue is exclusive.
  • auto_delete Type: bool. Default: false. Whether the queue should be auto-deleted.
  • no_wait Type: bool. Default: false. Whether the client should skip waiting for the server's reply.
  • arguments Type: array. Default: [].
  • ticket Type: null | integer. Default: null.

You can also reconfigure the queue instance using $queue->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your queue instance.

  • setName - Accepts: string. The only way to change queue name after instantiation.
  • setDeclare - Accepts: bool.
  • setType - Accepts: bool.
  • setPassive - Accepts: bool.
  • setDurable - Accepts: bool.
  • setExclusive - Accepts: bool.
  • setAutoDelete - Accepts: bool.
  • setNowait - Accepts: bool.
  • setArguments - Accepts: array.
  • setTicket - Accepts: null | integer.
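
As a rough illustration of the queue keys above, the sketch below builds a short-lived queue and then relaxes one setting. The queue name and both helper functions are hypothetical. Only keys and methods listed in this section are used, and the second helper needs anik/amqp installed and autoloaded.

```php
<?php

use Anik\Amqp\Queues\Queue;

/**
 * Hypothetical helper: options for a temporary, non-durable queue.
 */
function exampleTemporaryQueueOptions(): array
{
    return [
        'declare' => true,      // ask for the queue to be declared
        'durable' => false,     // overrides the default of true
        'exclusive' => true,
        'auto_delete' => true,
    ];
}

/**
 * Hypothetical helper: builds the queue and adjusts it afterwards.
 */
function makeExampleQueue(): Queue
{
    // "name" is the only mandatory key for Queue::make.
    $queue = Queue::make(['name' => 'example.temporary.queue'] + exampleTemporaryQueueOptions());

    // Adjust through reconfigure or through a single setter.
    $queue->reconfigure(['no_wait' => false]);
    $queue->setExclusive(false);

    return $queue;
}
```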

Qos

To instantiate a Qos:

<?php

use Anik\Amqp\Qos\Qos;

$prefetchSize = 0;
$prefetchCount = 0;
$global = false;

$qos = new Qos($prefetchSize, $prefetchCount, $global);

$qos = Qos::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);

The Qos::make method accepts the following keys when making a qos instance.

  • prefetch_size Type: int. Default: 0.
  • prefetch_count Type: int. Default: 0.
  • global Type: bool. Default: true.

You can also reconfigure the qos instance using $qos->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your qos instance.

  • setPrefetchCount - Accepts: int.
  • setPrefetchSize - Accepts: int.
  • setGlobal - Accepts: bool.
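
A minimal sketch of how the Qos keys and setters above might be used to limit the number of unacknowledged messages a consumer holds. The helper functions and the chosen prefetch values are hypothetical. The second helper needs anik/amqp installed and autoloaded.

```php
<?php

use Anik\Amqp\Qos\Qos;

/**
 * Hypothetical helper: Qos options with a caller-chosen prefetch count.
 */
function exampleQosOptions(int $prefetchCount): array
{
    return [
        'prefetch_size' => 0,               // 0 means no size limit in AMQP
        'prefetch_count' => $prefetchCount, // max unacknowledged messages
        'global' => false,
    ];
}

/**
 * Hypothetical helper: builds a Qos instance and adjusts it afterwards.
 */
function makeExampleQos(): Qos
{
    $qos = Qos::make(exampleQosOptions(10));

    // Raise the prefetch count later, either way works.
    $qos->reconfigure(['prefetch_count' => 25]);
    $qos->setPrefetchCount(25);

    return $qos;
}
```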

Publish/Produce message

To produce/publish messages, you'll need an Anik\Amqp\Producer instance. To instantiate the class:

<?php

use Anik\Amqp\Producer;

$producer = new Producer($connection, $channel);

The constructor accepts

  • $connection Type: PhpAmqpLib\Connection\AbstractConnection. Required.
  • $channel Type: null | PhpAmqpLib\Channel\AMQPChannel. Optional.

If $channel is not provided or is null, the class uses the channel from the $connection.

Once the producer class is instantiated, you can set a channel with setChannel. The method accepts a PhpAmqpLib\Channel\AMQPChannel instance.

There are three ways to publish messages.

Bulk Publish

Producer::publishBatch - to publish multiple messages in bulk.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);

  • $messages Type: Anik\Amqp\Producible[]. If any of the messages does not implement the Producible interface, an error is thrown.
  • $routingKey Type: string. Routing key. Default '' (empty string).
  • $exchange Type: null | Anik\Amqp\Exchanges\Exchange.
  • $options Type: array. Runtime configuration.
    • Key exchange - Accepts: array.
      • If you pass null as $exchange, you must provide a valid configuration through this key so that an exchange can be created under the hood. If you pass both an Exchange instance as $exchange and $options['exchange'], the exchange instance is reconfigured with the values available in $options['exchange']. The keys are the same as Exchange::make's $options.
    • Key publish - Accepts: array.
      • Key mandatory Default false.
      • Key immediate Default false.
      • Key ticket Default null.
      • Key batch_count. Default: 500. The number of messages to collect into a batch before the batch is published.
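
The following sketch shows one way a batch publish could look when no Exchange instance is passed and the exchange is described through $options['exchange'] instead. The exchange name, routing key, message bodies, and helper functions are hypothetical. It uses the ProducibleMessage class described later in this document and needs anik/amqp installed, autoloaded, and a reachable RabbitMQ server.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Producer;
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Connection\AbstractConnection;

/**
 * Hypothetical helper: generates the message bodies for the batch.
 */
function exampleBatchBodies(int $count): array
{
    $bodies = [];
    for ($i = 1; $i <= $count; $i++) {
        $bodies[] = sprintf('example message #%d', $i);
    }

    return $bodies;
}

/**
 * Hypothetical helper: publishes 250 messages in one publishBatch call.
 */
function publishExampleBatch(AbstractConnection $connection): void
{
    // Every element must implement Anik\Amqp\Producible.
    $messages = array_map(function (string $body) {
        return new ProducibleMessage($body);
    }, exampleBatchBodies(250));

    (new Producer($connection))->publishBatch($messages, 'example.routing.key', null, [
        // $exchange is null, so the exchange is described here.
        'exchange' => [
            'name' => 'example.direct.exchange',
            'type' => Exchange::TYPE_DIRECT,
            'declare' => true,
        ],
        // Batch size; the documented default is 500.
        'publish' => ['batch_count' => 100],
    ]);
}
```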

Publish

Producer::publish - to publish a single message. Uses Producer::publishBatch under the hood.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publish($message, $routingKey, $exchange, $options);

  • $message Type: Anik\Amqp\Producible.
  • $routingKey Type: string. Routing key. Default '' (empty string).
  • $exchange Type: null | Anik\Amqp\Exchanges\Exchange.
  • $options Type: array. Runtime configuration.
    • Key exchange - Accepts: array.
      • If you pass null as $exchange, you must provide a valid configuration through this key so that an exchange can be created under the hood. If you pass both an Exchange instance as $exchange and $options['exchange'], the exchange instance is reconfigured with the values available in $options['exchange']. The keys are the same as Exchange::make's $options.
    • Key publish - Accepts: array.
      • Key mandatory Default false.
      • Key immediate Default false.
      • Key ticket Default null.

Publish Basic

Producer::publishBasic - to publish a single message using AMQPChannel::basic_publish method.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);

  • $message Type: Anik\Amqp\Producible.
  • $routingKey Type: string. Routing key. Default '' (empty string).
  • $exchange Type: null | Anik\Amqp\Exchanges\Exchange.
  • $options Type: array. Runtime configuration.
    • Key exchange - Accepts: array.
      • If you pass null as $exchange, you must provide a valid configuration through this key so that an exchange can be created under the hood. If you pass both an Exchange instance as $exchange and $options['exchange'], the exchange instance is reconfigured with the values available in $options['exchange']. The keys are the same as Exchange::make's $options.
    • Key publish - Accepts: array.
      • Key mandatory Default false.
      • Key immediate Default false.
      • Key ticket Default null.

ProducibleMessage: Implementation of Producible Interface

The package comes with Anik\Amqp\ProducibleMessage, a generic implementation of Anik\Amqp\Producible interface.

You can instantiate the class like this:

<?php

use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new ProducibleMessage('take my message to rabbitmq');

$msg = new ProducibleMessage('take my message to rabbitmq', [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers' => new AMQPTable(['key' => 'value']),
]);
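
Putting the pieces together, a single JSON message could be published roughly as follows. This is a sketch under assumptions: the exchange name, routing key, payload shape, and helper functions are hypothetical. It needs anik/amqp installed, autoloaded, and a reachable RabbitMQ server.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Producer;
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical helper: encodes an example payload as JSON.
 */
function exampleOrderPayload(int $orderId): string
{
    return (string) json_encode(['order_id' => $orderId, 'status' => 'created']);
}

/**
 * Hypothetical helper: publishes one persistent JSON message.
 */
function publishExampleOrder(AbstractConnection $connection, int $orderId): void
{
    $exchange = new Exchange('example.direct.exchange', Exchange::TYPE_DIRECT);
    $exchange->setDeclare(true);

    $message = new ProducibleMessage(exampleOrderPayload($orderId), [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);

    (new Producer($connection))->publish($message, 'example.orders.created', $exchange, [
        'publish' => ['mandatory' => false],
    ]);
}
```

Sending JSON pairs well with the decodeMessage and decodeMessageAsObject methods of ConsumableMessage on the consuming side.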

Consumer

To consume messages, you'll need an Anik\Amqp\Consumer instance. To instantiate the class:

<?php

use Anik\Amqp\Consumer;

$consumer = new Consumer($connection, $channel, $options);

The constructor accepts

  • $connection Type: PhpAmqpLib\Connection\AbstractConnection. Required.
  • $channel Type: null | PhpAmqpLib\Channel\AMQPChannel. Optional.
  • $options Type: array. Optional. Configurations for consumer.
    • tag Type: string. Default sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()). To set consumer tag.
    • no_local Type: bool. Default false.
    • no_ack Type: bool. Default false.
    • exclusive Type: bool. Default false.
    • no_wait Type: bool. Default false.
    • arguments Type: array. Default [].
    • ticket Type: null | int. Default null.

If $channel is not provided or is null, the class uses the channel from the $connection.

Once the consumer class is instantiated, you can access the following methods.

  • setChannel - Accepts: PhpAmqpLib\Channel\AMQPChannel instance.
  • reconfigure - Accepts: array. To reconfigure the instance. Valid keys are same as constructor's options keys.
  • setConsumerTag - Accepts: string. Default sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()).
  • setNoLocal - Accepts: bool. Default false.
  • setNoAck - Accepts: bool. Default false.
  • setExclusive - Accepts: bool. Default false.
  • setNowait - Accepts: bool. Default false.
  • setArguments - Accepts: array. Default [].
  • setTicket - Accepts: null | int. Default null.

To consume messages:

<?php

use Anik\Amqp\Consumer;

(new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);

  • $handler Type: Anik\Amqp\Consumable.
  • $bindingKey Type: string. Binding key. Default '' (empty string).
  • $exchange Type: null | Anik\Amqp\Exchanges\Exchange.
  • $queue Type: null | Anik\Amqp\Queues\Queue.
  • $qos Type: null | Anik\Amqp\Qos\Qos.
  • $options Type: array. Runtime configuration.
    • consumer - Accepts: array. The keys are the same as Consumer::__construct's options.
    • exchange - Accepts: array. The keys are the same as Exchange::make's options.
      • If you pass null as $exchange, you must provide a valid configuration through this key so that an exchange can be created under the hood. If you pass both an Exchange instance as $exchange and $options['exchange'], the exchange instance is reconfigured with the values available in $options['exchange'].
    • queue - Accepts: array. The keys are the same as Queue::make's options.
      • If you pass null as $queue, you must provide a valid configuration through this key so that a queue can be created under the hood. If you pass both a Queue instance as $queue and $options['queue'], the queue instance is reconfigured with the values available in $options['queue'].
    • qos - Accepts: array. The keys are the same as Qos::make's options.
      • If you pass both a Qos instance as $qos and $options['qos'], the qos instance is reconfigured accordingly. If $qos is null and $options['qos'] holds a value, QoS is applied to the channel. If $qos is null and $options['qos'] is not available, NO QoS WILL BE APPLIED TO THE CHANNEL.
    • bind - Accepts: array. For binding the queue to the exchange.
      • no_wait. Default false.
      • arguments. Default [].
      • ticket. Default null.
    • consume - Accepts: array. The following values are passed to AMQPChannel::wait().
      • allowed_methods Default null.
      • non_blocking Default false.
      • timeout Default 0.
ConsumableMessage: Implementation of Consumable Interface

The package comes with Anik\Amqp\ConsumableMessage, a generic implementation of Anik\Amqp\Consumable interface.

You can instantiate the class like this:

<?php

use Anik\Amqp\ConsumableMessage;
// use PhpAmqpLib\Message\AMQPMessage;

$msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) {
    echo $message->getMessageBody() . PHP_EOL;
    echo $message->getRoutingKey() . PHP_EOL;
    $message->ack();
    // Alternatively, $original->ack();

    /** 
     * Method: `decodeMessage` 
     * Returns:
     *      - `array` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessage());

    /** 
     * Method: `decodeMessageAsObject` 
     * Returns:
     *      - `\stdClass` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessageAsObject());
});

NOTE: Calling any method on a ConsumableMessage instance without setting an AMQPMessage will throw an exception.
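
As an end-to-end illustration, the handler above can be wired to an exchange, a queue, and a Qos roughly like this. It is a sketch under assumptions: the exchange and queue names, the binding key, and the helper functions are hypothetical. It needs anik/amqp installed, autoloaded, and a reachable RabbitMQ server.

```php
<?php

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Consumer;
use Anik\Amqp\Exchanges\Topic;
use Anik\Amqp\Qos\Qos;
use Anik\Amqp\Queues\Queue;
use PhpAmqpLib\Connection\AbstractConnection;

/**
 * Hypothetical helper: runtime options for Consumer::consume.
 * The keys reconfigure the exchange and queue instances passed in.
 */
function exampleConsumeOptions(): array
{
    return [
        'exchange' => ['declare' => true],
        'queue' => ['declare' => true],
        'bind' => ['no_wait' => false],
    ];
}

/**
 * Hypothetical helper: consumes messages bound with a topic pattern.
 */
function consumeExample(AbstractConnection $connection): void
{
    $handler = new ConsumableMessage(function (ConsumableMessage $message) {
        // decodeMessage returns null when the body is not valid JSON.
        $payload = $message->decodeMessage();
        if ($payload === null) {
            echo 'Skipping non-JSON body: ' . $message->getMessageBody() . PHP_EOL;
        } else {
            var_dump($payload);
        }
        $message->ack();
    });

    $exchange = new Topic('example.topic.exchange');
    $queue = new Queue('example.topic.exchange.queue');
    $qos = new Qos(0, 10, false); // prefetch size, prefetch count, global

    (new Consumer($connection))->consume(
        $handler,
        'example.orders.*',
        $exchange,
        $queue,
        $qos,
        exampleConsumeOptions()
    );
}
```

Passing an explicit Qos avoids the case described above where no QoS is applied to the channel.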

Issues?

If you find an issue, a bug, or a missing feature, please submit an issue, and a PR if possible.
