RabbitMQ-based Laravel queue driver

This package allows to use RabbitMQ queues for queued Laravel (prioritized) jobs. Fully configurable.

Installed php extension ext-amqp is required. Installation steps can be found in Dockerfile.

For jobs delaying you also should install rabbitmq-delayed-message-exchange plugin for RabbitMQ. Delaying is optional feature.


Important: Before using this package you should install avto-dev/amqp-rabbit-manager into your application. Installation steps can be found here.

Require this package with composer using the following command:

$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"

Installed composer is required (how to install composer). Also you need to fix the major version of package.

You need to fix the major version of package.

After that you should modify your configuration files:


RabbitMQ queues and exchanges configuration:


use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;

return [

    // ...

    'queues' => [

        'jobs' => [
            'name'         => env('JOBS_QUEUE_NAME', 'jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts
            'arguments'    => [
                'x-max-priority' => 255, // @link <https://www.rabbitmq.com/priority.html>
            'consumer_tag' => null,

        'failed' => [
            'name'         => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE,
            'arguments'    => [
                'x-message-ttl' => 604800000, // 7 days, @link <https://www.rabbitmq.com/ttl.html>
                'x-queue-mode'  => 'lazy', // @link <https://www.rabbitmq.com/lazy-queues.html>
            'consumer_tag' => null,


    // ...

    'exchanges' => [

        // RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>)
        'delayed-jobs' => [
            'name'      => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'),
            'type'      => 'x-delayed-message',
            'flags'     => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts
            'arguments' => [
                'x-delayed-type' => AmqpTopic::TYPE_DIRECT,


    // ...

    'setup' => [
        'rabbit-default' => [
            'queues' => [
            'exchanges' => [


Laravel queue settings:


use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [

    // ...

    'default' => env('QUEUE_DRIVER', 'rabbitmq'),

    // ...

    'connections' => [

        // ...

        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds
            'resume'              => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over

    // ...

    'failed' => [
        'connection' => 'rabbit-default',
        'queue_id'   => 'failed',

resume can be used with non-zero timeout value for periodic connection reloading (for example, if you set 'timeout' => 30000 and 'resume' => true, queue worker will unsubscribe and subscribe back to the queue every 30 seconds without process exiting).

You can remove delayed_exchange_id for disabling delayed jobs feature.

At the end, don't forget to execute command php ./artisan rabbit:setup.

How jobs delaying works?

Very simple:


You can dispatch your jobs as usual (dispatch(new Job) or dispatch(new Job)->delay(10)), commands like queue:work, queue:failed, queue:retry and others works fine.

Additional features:

  • Jobs delaying (plugin rabbitmq_delayed_message_exchange for RabbitMQ server is required);
  • Jobs priority (job should implements PrioritizedJobInterface interface);
  • Automatically delayed messages exchanges bindings (only if you use command rabbit:setup for queues and exchanges creation);
  • The ability to store the state of job

State storing

Using this package you can store any variables (except resources and callable entities) between job restarts (just use trait WithJobStateTrait in your job class). But you should remember - state is available only inside job handle method.

Consumer custom tag prefix

Every consumer has an identifier that is used by client libraries to determine what handler to invoke for a given delivery. Their names vary from protocol to protocol. Consumer tags and subscription IDs are two most commonly used terms.

If you want to add custom prefix to the consumer tag, you can specify it with an additional argument in the AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct method.

⚠️ Warning

Be careful with commands queue:failed and queue:retry. If during command execution something happens (lost connection, etc) you may loose all failed jobs!

You should avoid to use next method (broker does not guarantee operations order, so calling results may be wrong):

  • \AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
  • \AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()


For package testing we use phpunit framework and docker-ce + docker-compose as develop environment. So, just write into your terminal after repository cloning:

$ make build
$ make latest # or 'make lowest'
$ make test

