RabbitMQ laravel queue driver
This package allows to use RabbitMQ queues for queued Laravel (prioritized) jobs. Fully configurable.
Installed php extension
ext-amqp is required. Installation steps can be found in Dockerfile.
For jobs delaying you also should install
rabbitmq-delayed-message-exchange plugin for RabbitMQ. Delaying is optional feature.
Require this package with composer using the following command:
$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"
composeris required (how to install composer). Also you need to fix the major version of package.
You need to fix the major version of package.
After that you should modify your configuration files:
RabbitMQ queues and exchanges configuration:
<?php use Interop\Amqp\AmqpQueue; use Interop\Amqp\AmqpTopic; return [ // ... 'queues' => [ 'jobs' => [ 'name' => env('JOBS_QUEUE_NAME', 'jobs'), 'flags' => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts 'arguments' => [ 'x-max-priority' => 255, // @link <https://www.rabbitmq.com/priority.html> ], 'consumer_tag' => null, ], 'failed' => [ 'name' => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'), 'flags' => AmqpQueue::FLAG_DURABLE, 'arguments' => [ 'x-message-ttl' => 604800000, // 7 days, @link <https://www.rabbitmq.com/ttl.html> 'x-queue-mode' => 'lazy', // @link <https://www.rabbitmq.com/lazy-queues.html> ], 'consumer_tag' => null, ], ], // ... 'exchanges' => [ // RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>) 'delayed-jobs' => [ 'name' => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'), 'type' => 'x-delayed-message', 'flags' => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts 'arguments' => [ 'x-delayed-type' => AmqpTopic::TYPE_DIRECT, ], ], ], // ... 'setup' => [ 'rabbit-default' => [ 'queues' => [ 'jobs', 'failed', ], 'exchanges' => [ 'delayed-jobs' ], ], ], ];
Laravel queue settings:
<?php use AvtoDev\AmqpRabbitLaravelQueue\Connector; return [ // ... 'default' => env('QUEUE_DRIVER', 'rabbitmq'), // ... 'connections' => [ // ... 'rabbitmq' => [ 'driver' => Connector::NAME, 'connection' => 'rabbit-default', 'queue_id' => 'jobs', 'delayed_exchange_id' => 'delayed-jobs', 'timeout' => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds 'resume' => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over ], ], // ... 'failed' => [ 'connection' => 'rabbit-default', 'queue_id' => 'failed', ], ];
resumecan be used with non-zero
timeoutvalue for periodic connection reloading (for example, if you set
'timeout' => 30000and
'resume' => true, queue worker will unsubscribe and subscribe back to the queue every 30 seconds without process exiting).
You can remove
delayed_exchange_id for disabling delayed jobs feature.
At the end, don't forget to execute command
php ./artisan rabbit:setup.
You can dispatch your jobs as usual (
dispatch(new Job) or
dispatch(new Job)->delay(10)), commands like
queue:retry and others works fine.
- Jobs delaying (plugin
rabbitmq_delayed_message_exchangefor RabbitMQ server is required);
- Jobs priority (job should implements
- Automatically delayed messages exchanges bindings (only if you use command
rabbit:setupfor queues and exchanges creation);
- The ability to store the state of
Using this package you can store any variables (except resources and callable entities) between job restarts (just use trait
WithJobStateTrait in your job class). But you should remember - state is available only inside job
Every consumer has an identifier that is used by client libraries to determine what handler to invoke for a given delivery. Their names vary from protocol to protocol. Consumer tags and subscription IDs are two most commonly used terms.
If you want to add custom prefix to the consumer tag, you can specify it with an additional argument in the
Be careful with commands
queue:retry. If during command execution something happens (lost connection, etc) you may loose all failed jobs!
You should avoid to use next method (broker does not guarantee operations order, so calling results may be wrong):
For package testing we use
phpunit framework and
docker-compose as develop environment. So, just write into your terminal after repository cloning:
$ make build $ make latest # or 'make lowest' $ make test
Changes log can be found here.
If you will find any package errors, please, make an issue in current repository.
This is open-sourced software licensed under the MIT License.