werkspot / message-queue
A generic message queue.
Requires
- php: ^7.1
- php-amqplib/php-amqplib: ^2.6
- psr/simple-cache: ^1.0
- ramsey/uuid: ^3.5
- roave/security-advisories: dev-master
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.0
- mockery/mockery: v1.0.0-alpha1
- phpunit/phpunit: ^6.0
Last update: 2020-10-21 07:22:30 UTC
README
What this project is
A library capable of delivering a message to a destination asynchronously, as soon as possible or at a specified date and time.
The message to be delivered can be anything, but its serialization must be handled by a MessageRepositoryInterface, whose implementation must be provided by the code using this library.
The destination can be specified by any string, but the interpretation of that string, and the actual delivery of the message to the destination, must be handled by the MessageDeliveryServiceInterface, whose implementation must be provided by the code using this library.
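As an illustration of what "interpreting the destination string" could look like, here is a minimal sketch that decodes a JSON destination such as '{"deliver_to": "SomeServiceId"}' and hands the payload to a registered handler. The class name, the method names and the handler registry are hypothetical. The sketch deliberately does not implement the library's MessageDeliveryServiceInterface, whose actual method signature should be checked in the library source.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch: NOT the library's MessageDeliveryServiceInterface.
// Class name, method names and the handler registry are assumptions.
final class JsonDestinationDeliveryService
{
    /** @var array<string, callable> handlers keyed by service id */
    private $handlers = [];

    public function register(string $serviceId, callable $handler): void
    {
        $this->handlers[$serviceId] = $handler;
    }

    /**
     * Decodes the destination string and passes the payload to the matching handler.
     *
     * @param mixed $payload
     */
    public function deliver($payload, string $destination): void
    {
        $decoded = json_decode($destination, true);
        if (!is_array($decoded) || !isset($decoded['deliver_to']) || !is_string($decoded['deliver_to'])) {
            throw new InvalidArgumentException('Unreadable destination: ' . $destination);
        }

        $serviceId = $decoded['deliver_to'];
        if (!isset($this->handlers[$serviceId])) {
            throw new RuntimeException('No handler registered for ' . $serviceId);
        }

        ($this->handlers[$serviceId])($payload);
    }
}

// Usage: register a handler, then deliver a payload to it.
$received = [];
$service = new JsonDestinationDeliveryService();
$service->register('SomeServiceId', function ($payload) use (&$received): void {
    $received[] = $payload;
});
$service->deliver('hello', '{"deliver_to": "SomeServiceId"}');
```

Because the destination is an opaque string as far as the queue is concerned, any other encoding (a plain service id, a URL, and so on) would work equally well, as long as the delivery service can decode it.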
This MessageQueue uses two internal queues: one for messages that are scheduled for delivery (the ScheduledQueue, backed by a persistence mechanism such as MySQL) and another for messages that are in line for delivery (the DeliveryQueue, backed by RabbitMQ).
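The two-stage flow can be pictured with a small in-memory model. This is only a sketch of the idea, not the library's implementation: the class and method names are hypothetical, plain arrays stand in for both MySQL and RabbitMQ, and priority handling is left out.

```php
<?php
declare(strict_types=1);

// Illustrative in-memory model only. The real library persists the
// ScheduledQueue and uses RabbitMQ for the DeliveryQueue.
final class InMemoryTwoStageQueue
{
    /** @var array[] messages waiting for their delivery time */
    private $scheduled = [];

    /** @var array[] messages that are due and in line for delivery */
    private $delivery = [];

    public function enqueue(string $payload, DateTimeImmutable $deliverAt): void
    {
        $this->scheduled[] = ['payload' => $payload, 'deliverAt' => $deliverAt];
    }

    /** Moves up to $batchSize due messages to the delivery queue and returns how many were moved. */
    public function moveDueMessages(DateTimeImmutable $now, int $batchSize): int
    {
        $moved = 0;
        foreach ($this->scheduled as $key => $message) {
            if ($moved >= $batchSize) {
                break;
            }
            if ($message['deliverAt'] <= $now) {
                $this->delivery[] = $message;
                unset($this->scheduled[$key]);
                $moved++;
            }
        }

        return $moved;
    }

    /** Returns the next message in line for delivery, or null when there is none. */
    public function nextForDelivery(): ?array
    {
        return array_shift($this->delivery);
    }
}

// Usage: only the message whose delivery time has passed is moved.
$queue = new InMemoryTwoStageQueue();
$now = new DateTimeImmutable('2020-01-01 12:00:00');
$queue->enqueue('send welcome email', $now->modify('-1 minute'));
$queue->enqueue('send reminder', $now->modify('+1 day'));

$movedCount = $queue->moveDueMessages($now, 50);
$next = $queue->nextForDelivery();
```

Splitting the flow this way keeps future-dated messages in durable storage until they are due, so the delivery queue only ever holds messages that are ready to be handled.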
Why this project exists
A message queue is useful for running tasks asynchronously, either as soon as possible or at a specified date and time. This spreads the load on the servers over time and allows faster responses to users, who no longer have to wait for tasks that can run asynchronously, such as sending emails.
On top of this library we can build a Message Bus, which can decide if a message should be delivered in sync or async. In turn, on top of that Message Bus we can build a Command Bus, which delivers one message to only one destination, or an Event Bus, which can deliver one message to several destinations.
Usage
The MessageQueueService is the entry point to the message queue.
    $messageQueueService = new MessageQueueService(
        new ScheduledQueueService(
            new MessageRepository(/*...*/) // implemented by the code using this library
        )
    );

    $messageQueueService->enqueueMessage(
        $someObjectOrStringOrWhatever,     // some payload to deliver, persisted by the MessageRepository
        '{"deliver_to": "SomeServiceId"}', // destination to be decoded by the delivery service (MessageDeliveryServiceInterface)
        new DateTimeImmutable(),           // delivery date and time
        5,                                 // priority
        []                                 // some whatever metadata
    );
In order to move messages from the ScheduledQueue to the DeliveryQueue, we need one ScheduledQueueToDeliveryQueueWorker running in the background. To move messages from the DeliveryQueue to the actual destination, we need at least one DeliveryQueueToHandlerWorker running in the background.
Our $scheduledQueueWorker will be run, for example, by a CLI command which will be kept alive by a process management tool like Supervisor.
    $scheduledQueueWorker = new ScheduledQueueToDeliveryQueueWorker(
        new ScheduledQueueService(new MessageRepository(/*...*/)),
        new AmqpProducer(new AMQPLazyConnection(/*...*/), new UuidMessageIdGenerator()),
        'some_queue_name',
        new SomeLogger(/*...*/)
    );

    $scheduledQueueWorker->moveMessageBatch(50);
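A CLI command that keeps such a worker busy could be built around a simple loop. The sketch below is hypothetical: runWorkerLoop is not part of this library, and it assumes that each call to moveMessageBatch handles one batch and then returns, which should be verified against the library source. A closure stands in for the real worker so the example runs on its own.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): calls $moveBatch repeatedly
 * until $shouldStop returns true, pausing between iterations.
 * Returns the number of iterations executed.
 */
function runWorkerLoop(callable $moveBatch, callable $shouldStop, int $pauseMicroseconds): int
{
    $iterations = 0;
    while (!$shouldStop($iterations)) {
        $moveBatch();
        $iterations++;
        if ($pauseMicroseconds > 0) {
            usleep($pauseMicroseconds); // avoid a tight loop when the queue is idle
        }
    }

    return $iterations;
}

// Stand-in for `$scheduledQueueWorker->moveMessageBatch(50)`.
$batchesMoved = 0;
$iterations = runWorkerLoop(
    function () use (&$batchesMoved): void {
        $batchesMoved++;
    },
    function (int $done): bool {
        return $done >= 3; // stop after three batches in this demo
    },
    0
);
```

Letting the loop end after a bounded number of iterations, and relying on the process manager to restart the command, is one way to limit the impact of memory growth in long-running PHP processes.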
Like the $scheduledQueueWorker, the $deliveryQueueWorker is also started by a CLI command and kept alive by a process management tool like Supervisor.
    $logger = new SomeLogger(/*...*/);

    $deliveryQueueWorker = new DeliveryQueueToHandlerWorker(
        new AmqpConsumer(
            new AMQPLazyConnection(/*...*/),
            new AmqpMessageHandler(
                new MessageHandler(/*...*/),
                new SomeCache(/*...*/),
                new PersistenceClient(/*...*/),
                $logger
            ),
            $logger
        ),
        'some_queue_name'
    );

    $deliveryQueueWorker->startConsuming(300);
Installation
To install the library, run the command below and you will get the latest version:
composer require werkspot/message-queue
Tests
To execute the tests run:
make test
Coverage
To generate the test coverage run:
make test_with_coverage
Code standards
To fix the code standards run:
make cs-fix