anik / amqp
php-amqplib wrapper that eases the consumption of RabbitMQ. A painless way of using RabbitMQ
Installs: 359 875
Dependents: 1
Suggesters: 0
Security: 0
Stars: 135
Watchers: 5
Forks: 22
Open Issues: 2
pkg:composer/anik/amqp
Requires
- php: ^7.2|^8.0
- php-amqplib/php-amqplib: ^3.2
Requires (Dev)
- phpunit/phpunit: ^8.5|^9.5
README
anik/amqp

anik/amqp is a php-amqplib wrapper that eases the consumption of RabbitMQ. A painless way of using RabbitMQ.
Note
Previously, the package could be used with Laravel, Laravel Zero, Lumen out of the box. From v2, the Laravel support
has been removed. If you are looking for implementation with Laravel, you can
use anik/laravel-amqp. If you were using this package with Laravel, and you want to upgrade to Laravel 9, please consider using anik/amqp-to-laravel-amqp if you want to migrate to anik/laravel-amqp later.
Examples
Checkout the repository for example.
Requirements
- PHP
^7.2 | ^8.0 - PHP-AMQPLib
^3.0
Installation
To install the package, run
composer require anik/amqp
Documentation
For V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90
Connection
To create an AMQP Connection, you can use
Anik\Amqp\AmqpConnectionFactory::makeAnik\Amqp\AmqpConnectionFactory::makeFromArray
<?php use Anik\Amqp\AmqpConnectionFactory; use PhpAmqpLib\Connection\AMQPLazySSLConnection; $host = '127.0.0.1'; $port = 5672; $user = 'user'; $password = 'password'; $vhost = '/'; $options = []; // options to be proxied to the amqp connection class $ofClass = AMQPLazySSLConnection::class; $connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass); $hosts = [ [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ], [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ] ]; // With AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple host $connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);
Exchange
Also, there are four specific exchange classes.
Anik\Amqp\Exchanges\Directfor direct exchange.Anik\Amqp\Exchanges\Fanoutfor fanout exchange.Anik\Amqp\Exchanges\Headersfor headers exchange.Anik\Amqp\Exchanges\Topicfor topic exchange.
You can still use Anik\Amqp\Exchanges\Exchange base class to create your own exchange.
To instantiate an exchange, you can do like
<?php use Anik\Amqp\Exchanges\Exchange; use Anik\Amqp\Exchanges\Fanout; use Anik\Amqp\Exchanges\Topic; $exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT); $exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]); $exchange = new Topic('anik.amqp.topic.exchange'); $exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);
When creating an exchange instance with
Exchange::make-nameandtypekeys must be present in the given array.Topic::makeFanout::makeHeaders::makeDirect::make-namekey must be present in the given array.
Anik\Amqp\Exchanges\Exchange contains a few predefined exchange types, you can use them as reference.
TYPE_DIRECTfor direct type.TYPE_TOPICfor topic type.TYPE_FANOUTfor fanout type.TYPE_HEADERSfor headers type.
The Exchange::make method also accepts the following keys when making an exchange instance.
declareType:bool. Default:false. If you want to declare the exchange.passiveType:bool. Default:false. If the exchange is passive.durableType:bool. Default:true. If the exchange is durable.auto_deleteType:bool. Default:false. If the exchange should auto delete.internalType:bool. Default:false. If the exchange is internal.no_waitType:bool. Default:false. If the client should not wait for the server's reply.argumentsType:array. Default:[].ticketType:null | integer. Default:null.
You can also reconfigure the exchange instance using $exchange->reconfigure($options). The $options array accepts
the above keys as well.
Also, you can use the following methods to configure your exchange instance.
setName- Accepts:string. The only way to change exchange name after instantiation.setDeclare- Accepts:bool.setType- Accepts:bool.setPassive- Accepts:bool.setDurable- Accepts:bool.setAutoDelete- Accepts:bool.setInternal- Accepts:bool.setNowait- Accepts:bool.setArguments- Accepts:array.setTicket- Accepts:null | integer.
Queue
To instantiate a queue, you can do like
<?php use Anik\Amqp\Queues\Queue; $queue = new Queue('anik.amqp.direct.exchange.queue'); $queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);
When creating a queue instance with
Queue::make-namekeys must be present in the given array.
The Queue::make method also accepts the following keys when making a queue instance.
declareType:bool. Default:false. If you want to declare the queue.passiveType:bool. Default:false. If the queue is passive.durableType:bool. Default:true. If the queue is durable.exclusiveType:bool. Default:false. If the queue is exclusive.auto_deleteType:bool. Default:false. If the queue should auto delete.no_waitType:bool. Default:false. If the client should not wait for the server's reply.argumentsType:array. Default:[].ticketType:null | integer. Default:null.
You can also reconfigure the queue instance using $queue->reconfigure($options). The $options array accepts the
above keys as well.
Also, you can use the following methods to configure your queue instance.
setName- Accepts:string. The only way to change queue name after instantiation.setDeclare- Accepts:bool.setType- Accepts:bool.setPassive- Accepts:bool.setDurable- Accepts:bool.setExclusive- Accepts:bool.setAutoDelete- Accepts:bool.setNowait- Accepts:bool.setArguments- Accepts:array.setTicket- Accepts:null | integer.
Qos
To instantiate a Qos, you can do like
<?php use Anik\Amqp\Qos\Qos; $prefetchSize = 0; $prefetchCount = 0; $global = false; $qos = new Qos($prefetchSize, $prefetchCount, $global); $qos = Queue::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);
The Qos::make method also accepts the following key when making a qos instance.
prefetch_sizeType:int. Default:0.prefetch_countType:int. Default:0.globalType:bool. Default:true.
You can also reconfigure the qos instance using $qos->reconfigure($options). The $options array accepts the above
keys as well.
Also, you can use the following methods to configure your qos instance.
setPrefetchCount- Accepts:int.setPrefetchSize- Accepts:int.setGlobal- Accepts:bool.
Publish/Produce message
To produce/publish messages, you'll need the Anik\Amqp\Producer instance. To instantiate the class
<?php use Anik\Amqp\Producer; $producer = new Producer($connection, $channel);
The constructor accepts
$connectionType:PhpAmqpLib\Connection\AbstractConnection. Required.$channelType:null | PhpAmqpLib\Channel\AMQPChannel. Optional.
If $channel is not provided or null, class uses the channel from the $connection.
Once the producer class is instantiated, you can set a channel with setChannel. Method
accepts PhpAmqpLib\Channel\AMQPChannel instance.
There are three ways to publish messages
Bulk Publish
Producer::publishBatch - to publish multiple messages in bulk.
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);
$messagesType:Anik\Amqp\Producible[]. If any of the message is not the type ofProducibleinterface, it'll throw error.$routingKeyType:string. Routing key. Default''(empty string).$exchangeType:null | Anik\Amqp\Exchanges\Exchange.$optionsType:array. Runtime configuration.- Key
exchange- Accepts:array.- If you pass
nullas$exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass$exchangewith Exchange instance and$options['exchange'], exchange instance will be reconfigured accordingly with the values available in$options['exchange']. Keys are same asExchange::make's$options.
- If you pass
- Key
publish- Accepts:array.- Key
mandatoryDefaultfalse. - Key
immediateDefaultfalse. - Key
ticketDefaultnull. - Key
batch_count. Default:500. To make a batch of X messages before publishing a batch.
- Key
- Key
Publish
Producer::publish - to publish a single message. Uses Producer::publishBatch under the hood.
<?php use Anik\Amqp\Producer; (new Producer($connection))->publish($message, $routingKey, $exchange, $options);
$messageType:Anik\Amqp\Producible.$routingKeyType:string. Routing key. Default''(empty string).$exchangeType:null | Anik\Amqp\Exchanges\Exchange.$optionsType:array. Runtime configuration.- Key
exchange- Accepts:array.- If you pass
nullas$exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass$exchangewith Exchange instance and$options['exchange'], exchange instance will be reconfigured accordingly with the values available in$options['exchange']. Keys are same asExchange::make's$options.
- If you pass
- Key
publish- Accepts:array.- Key
mandatoryDefaultfalse. - Key
immediateDefaultfalse. - Key
ticketDefaultnull.
- Key
- Key
Publish Basic
Producer::publishBasic - to publish a single message using AMQPChannel::basic_publish method.
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);
$messageType:Anik\Amqp\Producible.$routingKeyType:string. Routing key. Default''(empty string).$exchangeType:null | Anik\Amqp\Exchanges\Exchange.$optionsType:array. Runtime configuration.- Key
exchange- Accepts:array.- If you pass
nullas$exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass$exchangewith Exchange instance and$options['exchange'], exchange instance will be reconfigured accordingly with the values available in$options['exchange']. Keys are same asExchange::make's$options.
- If you pass
- Key
publish- Accepts:array.- Key
mandatoryDefaultfalse. - Key
immediateDefaultfalse. - Key
ticketDefaultnull.
- Key
- Key
ProducibleMessage: Implementation of Producible Interface
The package comes with Anik\Amqp\ProducibleMessage, a generic implementation of Anik\Amqp\Producible interface.
You can instantiate the class like
<?php use Anik\Amqp\ProducibleMessage; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $msg = new ProducibleMessage('take my message to rabbitmq'); $msg = new ProducibleMessage('take my message to rabbitmq', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); $msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['key' => 'value']), ]);
Consumer
To consume messages, you'll need the Anik\Amqp\Consumer instance. To instantiate the class
<?php use Anik\Amqp\Consumer; $consumer = new Consumer($connection, $channel, $options);
The constructor accepts
$connectionType:PhpAmqpLib\Connection\AbstractConnection. Required.$channelType:null | PhpAmqpLib\Channel\AMQPChannel. Optional.$optionsType:array. Optional. Configurations for consumer.tagType:string. Defaultsprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()). To set consumer tag.no_localType:bool. Defaultfalse.no_ackType:bool. Defaultfalse.exclusiveType:bool. Defaultfalse.no_waitType:bool. Defaultfalse.argumentsType:bool. Default[].ticketType:null | int. Defaultnull.
If $channel is not provided or null, class uses the channel from the $connection.
Once the consumer class is instantiated, you can access the following methods.
setChannel- Accepts:PhpAmqpLib\Channel\AMQPChannelinstance.reconfigure- Accepts:array. To reconfigure the instance. Valid keys are same as constructor's options keys.setConsumerTag- Accepts:string. Defaultsprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()).setNoLocal- Accepts:bool. Defaultfalse.setNoAck- Accepts:bool. Defaultfalse.setExclusive- Accepts:bool. Defaultfalse.setNowait- Accepts:bool. Defaultfalse.setArguments- Accepts:array. Default[].setTicket- Accepts:null | int. Defaultnull.
To consume messages,
<?php use Anik\Amqp\Consumer; (new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);
$handlerType:Anik\Amqp\Consumable.$bindingKeyType:string. Binding key. Default''(empty string).$exchangeType:null | Anik\Amqp\Exchanges\Exchange.$queueType:null | Anik\Amqp\Queues\Queue.$qosType:null | Anik\Amqp\Qos\Qos.$optionsType:array. Runtime configuration.consumer- Accepts:array. Keys are same asConsumer::__construct's options.exchange- Accepts:array. Keys are same asExchange::make's options.- If you pass
nullas$exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass$exchangewith Exchange instance and$options['exchange'], exchange instance will be reconfigured accordingly with the values available in$options['exchange'].
- If you pass
queue- Accepts:array. Keys are same asQueue::make's options.- If you pass
nullas$queue, then you must provide a valid configuration through this key to create a queue under the hood. If you pass$queuewith Queue instance and$options['queue'], queue instance will be reconfigured accordingly with the values available in$options['queue'].
- If you pass
qos- Accepts:array. Keys are same asQos::make's options.- If you pass
$qoswith Qos instance and$options['qos'], qos instance will be reconfigured accordingly. If$qosis null and$options['qos']holds value, QoS will be applied to the channel. If$qosisnulland$options['qos']is not available, NO QoS WILL BE APPLIED TO THE CHANNEL
- If you pass
bind- Accepts:array. For binding queue to the exchange.no_wait. Defaultfalse.arguments. Default[].ticket. Defaultnull.
consume- Accepts:array. Following values are passed to theAMQPChannel::wait().allowed_methodsDefaultnull.non_blockingDefaultfalse.timeoutDefault0.
ConsumableMessage: Implementation of Consumable Interface
The package comes with Anik\Amqp\ConsumableMessage, a generic implementation of Anik\Amqp\Consumable interface.
You can instantiate the class like
<?php use Anik\Amqp\ConsumableMessage; // use PhpAmqpLib\Message\AMQPMessage; $msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) { echo $message->getMessageBody() . PHP_EOL; echo $message->getRoutingKey() . PHP_EOL; $message->ack(); // Alternatively, $original->ack(); /** * Method: `decodeMessage` * Returns: * - `array` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessage()); /** * Method: `decodeMessageAsObject` * Returns: * - `\stdClass` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessageAsObject()); });
NOTE: Calling any method on ConsumableMessage instance without setting AMQPMessage will throw exception.
Issues?
If you find any issue/bug/missing feature, please submit an issue and PRs if possible.