A Flowpack.JobQueue.Common implementation to support RabbitMQ as a backend
Disclaimer: This is a fork/remake of an initial version created by ttreeagency!
To use RabbitMQ as a backend for a JobQueue you need a running RabbitMQ service. You can use our Docker image `t3n/rabbitmq`, which also includes the management console.
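For example (a sketch, assuming the image exposes RabbitMQ's default ports: 5672 for AMQP and 15672 for the management console):

```sh
# Start RabbitMQ including the management console
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 t3n/rabbitmq
```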
Install the package using composer:
```sh
composer require t3n/jobqueue-rabbitmq
```
RabbitMQ allows a wide range of configurations and setups. Have a look at the `Settings.yaml` for an overview of all available options.
The simplest configuration for a job queue could look like this:
```yaml
Flowpack:
  JobQueue:
    Common:
      queues:
        simple-queue:
          className: 't3n\JobQueue\RabbitMQ\Queue\RabbitQueue'
          executeIsolated: true
          options:
            queueOptions:
              # we want the queue to persist messages so they are stored even if no consumer (aka worker) is connected
              durable: true
              # multiple workers should be able to consume the queue at the same time
              exclusive: false
              autoDelete: false
            client:
              host: localhost
              port: 5672
              username: guest
              password: guest
```
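With this in place you can queue and consume messages, for example via the CLI commands that ship with Flowpack.JobQueue.Common (a sketch; the payload is arbitrary):

```sh
# Push a test message into the queue ...
./flow queue:submit simple-queue 'hello world'
# ... and start a worker (aka consumer) that processes it
./flow job:work --queue simple-queue
```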
A more complex configuration could also configure an exchange and some routing keys. In this example we will configure one exchange which all producers connect to. If your RabbitMQ server has the `rabbitmq_delayed_message_exchange` plugin enabled (our Docker image ships it by default), we can also use the delay feature.
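For example, a message could then be queued with a delay (a sketch, assuming the `queue:submit` command accepts JSON-encoded options and using the `producer-import` queue configured below; the payload is made up):

```sh
# Deliver the message only after 10 seconds
./flow queue:submit producer-import '{"file": "import.csv"}' --options '{"delay": 10}'
```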
This configuration will set up several queues for one exchange. Messages are routed to a queue based on a `routingKey`. We will use three parts for it: the vendor, a type and the actual job name (`<vendor>.<type>.<name>`, for example `vendor.jobs.import`). We will use the type `jobs` to indicate it's a job to execute. We could also use something like `log` or whatever; the shape of the routing key is up to you!
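Because the exchange is a topic exchange, `*` matches exactly one word of a routing key. A quick sketch of how some example keys would map to the queues declared in the configuration below (the keys `acme.jobs.cleanup` and `vendor.log.import` are made up for illustration):

```sh
# Topic matching against the bindings configured below:
#   vendor.jobs.import -> vendor-jobs (vendor.jobs.*) and all-jobs (*.jobs.*)
#   acme.jobs.cleanup  -> all-jobs only; vendor.jobs.* does not match
#   vendor.log.import  -> no queue, since neither binding matches the type "log"
```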
So after all we will configure several producer and consumer queues. The producer queues are meant to be used when queueing new jobs (for example via the `JobManager` of Flowpack.JobQueue.Common), while the consumer queues are used with the `./flow job:work` command.
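Putting both sides together (a sketch; the payload is made up):

```sh
# Producer side: publish a job through one of the producer queues
./flow queue:submit producer-tags 'retag-products'
# Consumer side: process everything routed to the vendor-jobs queue
./flow job:work --queue vendor-jobs
```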
In this example we will end up with two declared queues: one (`vendor-jobs`) that receives all messages matching the routing key `vendor.jobs.*`, so whenever the `producer-tags` queue is used the message will end up in this queue, and another one (`all-jobs`) that receives the jobs of all vendors.
To actually start a consumer, run `./flow job:work --queue all-jobs`.
This pattern enables you to implement different kinds of advanced pub/sub setups.
```yaml
Flowpack:
  JobQueue:
    Common:
      presets:
        rabbit:
          className: 't3n\JobQueue\RabbitMQ\Queue\RabbitQueue'
          executeIsolated: true
          maximumNumberOfReleases: 3
          releaseOptions:
            delay: 5
          options:
            routingKey: ''
            # enable support for a dead letter exchange, requires configuration in RabbitMQ
            useDLX: true
            queueOptions:
              # don't declare a queue by default,
              # all messages are forwarded to the exchange "t3n"
              declare: false
              exchangeName: 't3n'
              # the queue should be durable and not delete itself
              durable: true
              autoDelete: false
              # several workers are allowed per queue
              exclusive: false
            exchange:
              name: 't3n'
              # this exchange should support delayed messages
              type: 'x-delayed-message'
              passive: false
              durable: true
              autoDelete: false
              arguments:
                # the origin type is topic so we can use routing keys including `*` or `#`
                x-delayed-type: 'topic'
            client:
              host: localhost
              port: 5672
              username: guest
              password: guest
              # sets the RabbitMQ connection heartbeat to 60 seconds
              heartbeat: 60
      queues:
        producer-import:
          preset: rabbit
          options:
            routingKey: 'vendor.jobs.import'
        producer-tags:
          preset: rabbit
          options:
            routingKey: 'vendor.jobs.tags'
        vendor-jobs:
          preset: rabbit
          options:
            routingKey: 'vendor.jobs.*'
            queueOptions:
              # this queue should actually be declared
              declare: true
        all-jobs:
          preset: rabbit
          options:
            routingKey: '*.jobs.*'
            queueOptions:
              # this queue should actually be declared
              declare: true
```
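Since only `vendor-jobs` and `all-jobs` are declared (`declare: true`), they have to exist on the RabbitMQ server before a worker can consume them. A sketch, assuming the `queue:setup` command of Flowpack.JobQueue.Common:

```sh
# Declare the consumer queues and their bindings on the RabbitMQ server
./flow queue:setup vendor-jobs
./flow queue:setup all-jobs
```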