avto-dev/amqp-rabbit-laravel-queue

RabbitMQ laravel queue driver

v2.5.0 2021-04-16 06:53 UTC


Last update: 2021-11-16 08:10:10 UTC



RabbitMQ-based Laravel queue driver


This package lets you use RabbitMQ queues for queued Laravel jobs, including prioritized jobs. It is fully configurable.

The PHP extension ext-amqp must be installed. Installation steps can be found in the Dockerfile.

To delay jobs, you should also install the rabbitmq-delayed-message-exchange plugin for RabbitMQ. Delaying is an optional feature.

Install

Important: Before using this package you should install avto-dev/amqp-rabbit-manager into your application. Installation steps can be found here.

Require this package with composer using the following command:

$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"

Composer must be installed (how to install composer). You should also pin the major version of the package, as in the command above.

After that you should modify your configuration files:

./config/rabbitmq.php

RabbitMQ queues and exchanges configuration:

<?php

use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;

return [

    // ...

    'queues' => [

        'jobs' => [
            'name'         => env('JOBS_QUEUE_NAME', 'jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE, // Keep the queue when the server restarts
            'arguments'    => [
                'x-max-priority' => 255, // @link <https://www.rabbitmq.com/priority.html>
            ],
            'consumer_tag' => null,
        ],

        'failed' => [
            'name'         => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE,
            'arguments'    => [
                'x-message-ttl' => 604800000, // 7 days, @link <https://www.rabbitmq.com/ttl.html>
                'x-queue-mode'  => 'lazy', // @link <https://www.rabbitmq.com/lazy-queues.html>
            ],
            'consumer_tag' => null,
        ],

    ],

    // ...

    'exchanges' => [

        // RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>)
        'delayed-jobs' => [
            'name'      => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'),
            'type'      => 'x-delayed-message',
            'flags'     => AmqpTopic::FLAG_DURABLE, // Keep the exchange when the server restarts
            'arguments' => [
                'x-delayed-type' => AmqpTopic::TYPE_DIRECT,
            ],
        ],

    ],

    // ...

    'setup' => [
        'rabbit-default' => [
            'queues' => [
                'jobs',
                'failed',
            ],
            'exchanges' => [
                'delayed-jobs'
            ],
        ],
    ],
];
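
The x-message-ttl argument above is expressed in milliseconds, which is easy to mistype by a factor of ten. A minimal sketch of deriving the value rather than hard-coding it; daysToMilliseconds is a hypothetical helper written for this illustration and is not part of the package:

```php
<?php

/**
 * Hypothetical helper (not part of the package): converts whole days
 * to milliseconds, the unit used by the 'x-message-ttl' queue argument.
 */
function daysToMilliseconds(int $days): int
{
    return $days * 24 * 60 * 60 * 1000;
}

// 7 days, the same TTL as in the 'failed' queue configuration above.
$ttl = daysToMilliseconds(7);

echo $ttl, PHP_EOL; // prints 604800000
```

The result could then be used as the 'x-message-ttl' value in ./config/rabbitmq.php.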

./config/queue.php

Laravel queue settings:

<?php

use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [

    // ...

    'default' => env('QUEUE_DRIVER', 'rabbitmq'),

    // ...

    'connections' => [

        // ...

        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds
            'resume'              => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over
        ],
    ],

    // ...

    'failed' => [
        'connection' => 'rabbit-default',
        'queue_id'   => 'failed',
    ],
];

The resume option can be combined with a non-zero timeout value to reload the connection periodically. For example, if you set 'timeout' => 30000 and 'resume' => true, the queue worker will unsubscribe from the queue and subscribe back every 30 seconds without the process exiting.
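
As a sketch of the periodic-reload setup described above, the connection entry in ./config/queue.php might look like the fragment below. The two values are hard-coded here only for illustration; in the configuration shown earlier they are read from the QUEUE_TIMEOUT and QUEUE_RESUME environment variables.

```php
<?php

use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [
    'connections' => [
        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => 30000, // Milliseconds, i.e. 30 seconds
            'resume'              => true,  // Subscribe again after each timeout instead of exiting
        ],
    ],
];
```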

To disable the delayed jobs feature, remove delayed_exchange_id.

Finally, don't forget to run the php ./artisan rabbit:setup command.

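How does job delaying work?

Very simple: a delayed job is published to the delayed-message exchange (delayed-jobs in the configuration above) rather than straight to the queue. The exchange holds the message for the requested delay and then routes it to the bound jobs queue.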

Usage

You can dispatch your jobs as usual (dispatch(new Job) or dispatch(new Job)->delay(10)). Commands such as queue:work, queue:failed, queue:retry and others work fine.
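
A minimal sketch of the two dispatch forms mentioned above, assuming a standard Laravel application with this driver configured as the default queue connection. ProcessReport is a hypothetical job class used only for illustration; it relies solely on Laravel's own traits and contracts.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

/**
 * Hypothetical job, used only for illustration.
 */
class ProcessReport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function handle(): void
    {
        // Job logic goes here.
    }
}

// Somewhere in application code (a controller, a console command, etc.):
dispatch(new ProcessReport);            // Pushed to the queue right away
dispatch(new ProcessReport)->delay(10); // Delayed by 10 seconds; requires 'delayed_exchange_id' and the plugin
```

The delayed form only makes sense when the delayed-message exchange is configured and the RabbitMQ plugin is installed.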

Additional features:

  • Job delaying (requires the rabbitmq_delayed_message_exchange plugin on the RabbitMQ server);
  • Job priorities (the job should implement the PrioritizedJobInterface interface);
  • Automatic bindings for delayed-message exchanges (only if you use the rabbit:setup command to create queues and exchanges);
  • The ability to store job state.
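
As a rough sketch of the priority feature listed above, a prioritized job might look like the class below. This is an assumption-heavy illustration: the interface's namespace and its single priority(): int method are assumed here and should be checked against the package source. UrgentNotification is a hypothetical class name.

```php
<?php

namespace App\Jobs;

// ASSUMPTION: the interface is assumed to live in the package's root namespace.
use AvtoDev\AmqpRabbitLaravelQueue\PrioritizedJobInterface;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

/**
 * Hypothetical job, used only for illustration.
 */
class UrgentNotification implements ShouldQueue, PrioritizedJobInterface
{
    use Dispatchable, Queueable;

    /**
     * ASSUMPTION: the method name and signature are not confirmed by this README.
     * The value should not exceed the queue's 'x-max-priority' (255 in the config above).
     */
    public function priority(): int
    {
        return 200;
    }

    public function handle(): void
    {
        // Job logic goes here.
    }
}
```

Whatever the exact signature is, the returned priority is only meaningful if the target queue was declared with an x-max-priority argument, as in the jobs queue configuration.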

State storing

With this package you can store any variables (except resources and callable entities) between job restarts: just use the WithJobStateTrait trait in your job class. Keep in mind that the state is available only inside the job's handle method.

Consumer custom tag prefix

Every consumer has an identifier that client libraries use to determine which handler to invoke for a given delivery. The name of this identifier varies from protocol to protocol; consumer tag and subscription ID are the two most commonly used terms.

If you want to add a custom prefix to the consumer tag, you can specify it with an additional argument to the AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct method.

⚠️ Warning

Be careful with the queue:failed and queue:retry commands. If something goes wrong while the command is running (a lost connection, for example), you may lose all failed jobs!

You should avoid using the following methods (the broker does not guarantee the order of operations, so their results may be wrong):

  • \AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
  • \AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()

Testing

For package testing we use the phpunit framework, with docker-ce + docker-compose as the development environment. After cloning the repository, just run the following in your terminal:

$ make build
$ make latest # or 'make lowest'
$ make test

Changes log


The changes log can be found here.

Support


If you find any errors in the package, please open an issue in this repository.

License

This is open-sourced software licensed under the MIT License.