
AMQP wrapper for Laravel and Lumen to publish and consume messages

Simple PhpAmqpLib wrapper for interaction with RabbitMQ

Add the following to your require part within the composer.json:

"comlaude/laravel-amqp": "^1.0.0"
$ php composer update


$ php composer require comlaude/laravel-amqp



Create a config folder in the root directory of your Lumen application and copy the content from vendor/comlaude/laravel-amqp/config/amqp.php to config/amqp.php.

Adjust the properties to your needs.

return [

    'use' => 'production',

    'properties' => [

        'production' => [
            'host'                  => 'localhost',
            'port'                  =>  5672,
            'username'              => '',
            'password'              => '',
            'vhost'                 => '/',
            'connect_options'       => [],
            'ssl_options'           => [],

            'exchange'              => 'amq.topic',
            'exchange_type'         => 'topic',
            'exchange_passive'      => false,
            'exchange_durable'      => true,
            'exchange_auto_delete'  => false,
            'exchange_internal'     => false,
            'exchange_nowait'       => false,
            'exchange_properties'   => [],

            'queue_force_declare'   => false,
            'queue_passive'         => false,
            'queue_durable'         => true,          // only change when not using quorum queues
            'queue_exclusive'       => false,
            'queue_auto_delete'     => false,         // only change when not using quorum queues
            'queue_nowait'          => false,
            'queue_properties'      => [
                'x-ha-policy' => ['S', 'all'],
                'x-queue-type' => ['S', 'quorum'],
                // 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // if provided an exchange and queue (queue_name-dlx) will be automatically declared
                // 'x-delivery-limit' => ['I', 5],                     // the delivery limit will be set on the relevant queue but not the DLX queue itself
            'queue_acknowledge_is_final' => true,     // if important work is done inside a consumer after the acknowledge call, this should be false
            'queue_reject_is_final'      => true,     // if important work is done inside a consumer after the reject call, this should be false

            'consumer_tag'              => '',
            'consumer_no_local'         => false,
            'consumer_no_ack'           => false,
            'consumer_exclusive'        => false,
            'consumer_nowait'           => false,
            'timeout'                   => 0,        // seconds
            'persistent'                => false,
            'persistent_restart_period' => 0,        // seconds
            'request_accepted_timeout'  => 0.5,      // seconds in decimal accepted
            'request_handled_timeout'   => 5,        // seconds in decimal accepted

            'qos'                   => true,
            'qos_prefetch_size'     => 0,
            'qos_prefetch_count'    => 1,
            'qos_a_global'          => false,

Register the Lumen Service Provider in bootstrap/app.php:

| Register Service Providers




Add Facade Support for Lumen 5.2+

$app->withFacades(true, [
    'ComLaude\Amqp\Facades\Amqp' => 'Amqp',


Open config/app.php and add the service provider and alias:

'Amqp' => 'ComLaude\Amqp\Facades\Amqp',

Publishing a message

Push message with routing key

Amqp::publish('routing-key', 'message');

Push message with routing key and overwrite properties

Amqp::publish('routing-key', 'message' , ['exchange' => '']);

Consuming messages

Consume messages forever

Amqp::consume(function ($message) {




Consume messages, with custom settings

Amqp::consume(function ($message) {



}, [
    'timeout' => 2,
    'vhost'   => 'vhost3',
    'queue'   => 'queue-name',
    'persistent' => true // required if you want to listen forever

Fanout example

Publishing a message

Amqp::publish('', 'message' , [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',

Disable publishing

This is useful for development and sync requirements, if you are using observers or events to trigger messages over AMQP you may want to temporarily disable the publishing of messages. When turning the publishing off the publish method will silently drop the message and return.

Check state

if(Amqp::isEnabled()) {
    // It is going to publish





Remote procedure call server and client

RPC is potentially an anti-pattern in a microservices world so do not use it carelessly, nevertheless sometimes you just need that request-response behaviour and you're willing to accept its limitations. Simply return a response from within a consumer handler, if the message is a request from a client, the response will automatically be routed to the correct requestor. There are 2 configurable timeouts to prevent infinite-blocking waits.

request_accepted_timeout - time to wait for confirmation from server that a job is being worked on, this is a check if anybody is listening at all and should be quite small

request_handled_timeout - time to wait for the full request to be completed (all messages), be careful to ensure this is large enough if your job is long-lasting or if the number of messages to be handled is large


Amqp::consume(function ($message) {


   return "I handled this message " . $message->getBody();



Amqp::request('example.routing.key', [


], function ($message) {

   echo("The remote server said " . $message->getBody());


// Or for single message requests you can just do
$response = Amqp::requestWithResponse('example.routing.key', 'quickly');
// $response is already the message content as a string "I handled this message quickly"

Consume messages, with dead letter exchange configured

When using the x-dead-letter-exchange parameter in queue properties the package will additionally:

  • declare the <queue_name>-dlx queue
  • declare the exchange itself

When the consumer fails or requeues the message for 5 times the message will instead be routed to this new queue via the dead letter exchange.

Amqp::consume(function ($message) {



}, [
    'timeout' => 2,
    'vhost'   => 'vhost3',
    'queue'   => 'my-example-queue',
    'persistent' => true // required if you want to listen forever
    'queue_properties'      => [
        'x-ha-policy' => ['S', 'all'],
        'x-queue-type' => ['S', 'quorum'],
        'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // will auto-declare queue named my-example-queue-dlx
        'x-delivery-limit' => ['I', 5], // after 5 deliveries the message is routed to my-example-queue-dlx



This package is open-sourced software licensed under the MIT license