werkspot/message-queue

This package is abandoned and no longer maintained. No replacement package was suggested.

A generic message queue.

dev-master / 1.0.x-dev 2017-09-19 15:00 UTC

This package is auto-updated.

Last update: 2020-10-21 07:22:30 UTC

What this project is

A library capable of delivering a message to a destination asynchronously, as soon as possible or at a specified date and time.

The message to be delivered can be anything, but its serialization must be handled by a MessageRepositoryInterface, whose implementation must be provided by the code using this library.

The destination can be specified by any string, but the interpretation of that string and the actual delivery of the message to the destination must be taken care of by the MessageDeliveryServiceInterface, whose implementation must be provided by the code using this library.
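As an illustration of what "interpreting the destination string" could look like, here is a minimal sketch that decodes a JSON destination such as `{"deliver_to": "SomeServiceId"}` and hands the payload to a matching handler. The interface `ExampleDeliveryServiceInterface`, the class `JsonDestinationDeliveryService` and the `deliver()` signature are hypothetical stand-ins; the real MessageDeliveryServiceInterface may declare different methods.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the library's MessageDeliveryServiceInterface;
// the real interface's method names and signatures may differ.
interface ExampleDeliveryServiceInterface
{
    public function deliver($payload, string $destination): void;
}

final class JsonDestinationDeliveryService implements ExampleDeliveryServiceInterface
{
    /** @var array<string, callable> handlers indexed by service id */
    private $handlers;

    public function __construct(array $handlers)
    {
        $this->handlers = $handlers;
    }

    public function deliver($payload, string $destination): void
    {
        // The destination is an opaque string to the queue; here we assume JSON.
        $decoded = json_decode($destination, true);
        if (!is_array($decoded) || !isset($decoded['deliver_to']) || !is_string($decoded['deliver_to'])) {
            throw new InvalidArgumentException('Unreadable destination: ' . $destination);
        }

        $serviceId = $decoded['deliver_to'];
        if (!isset($this->handlers[$serviceId])) {
            throw new RuntimeException('No handler registered for: ' . $serviceId);
        }

        // Hand the payload over to the handler registered for this service id.
        ($this->handlers[$serviceId])($payload);
    }
}

// Usage: register one handler and deliver a payload to it.
$received = [];
$service = new JsonDestinationDeliveryService([
    'SomeServiceId' => function ($payload) use (&$received): void {
        $received[] = $payload;
    },
]);
$service->deliver('hello', '{"deliver_to": "SomeServiceId"}');
```

Any other encoding of the destination would work equally well, since the queue itself never inspects the string.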

This MessageQueue uses two internal queues: one for messages that are scheduled for delivery (the ScheduledQueue, backed by a persistence mechanism such as MySQL) and another for messages that are in line for delivery (the DeliveryQueue, backed by RabbitMQ).
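The two-stage flow can be pictured with a small in-memory model. This is only a sketch of the idea, not the library's code: the class `InMemoryTwoStageQueue` and its methods are hypothetical, plain arrays stand in for the persistence layer and for RabbitMQ, and priorities are left out.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of the ScheduledQueue -> DeliveryQueue flow.
final class InMemoryTwoStageQueue
{
    /** @var array<int, array{payload: mixed, deliverAt: DateTimeImmutable}> */
    private $scheduled = [];

    /** @var array<int, mixed> payloads waiting to be handled */
    private $delivery = [];

    public function schedule($payload, DateTimeImmutable $deliverAt): void
    {
        $this->scheduled[] = ['payload' => $payload, 'deliverAt' => $deliverAt];
    }

    // Moves up to $batchSize due messages to the delivery queue; returns how many moved.
    public function moveDueMessages(DateTimeImmutable $now, int $batchSize): int
    {
        $due = array_filter($this->scheduled, function (array $message) use ($now): bool {
            return $message['deliverAt'] <= $now;
        });
        // Oldest delivery time first; keys are preserved so we can unset below.
        uasort($due, function (array $a, array $b): int {
            return $a['deliverAt'] <=> $b['deliverAt'];
        });

        $moved = 0;
        foreach ($due as $key => $message) {
            if ($moved >= $batchSize) {
                break;
            }
            $this->delivery[] = $message['payload'];
            unset($this->scheduled[$key]);
            $moved++;
        }

        return $moved;
    }

    // Passes every queued payload to $handler; returns how many were handled.
    public function consume(callable $handler): int
    {
        $count = 0;
        while ($this->delivery !== []) {
            $handler(array_shift($this->delivery));
            $count++;
        }

        return $count;
    }
}

// Usage: one message is already due, the other is scheduled for tomorrow.
$now = new DateTimeImmutable('2020-01-01 12:00:00');
$queue = new InMemoryTwoStageQueue();
$queue->schedule('send welcome email', $now->modify('-1 minute'));
$queue->schedule('send reminder', $now->modify('+1 day'));

$moved = $queue->moveDueMessages($now, 50);

$handled = [];
$queue->consume(function ($payload) use (&$handled): void {
    $handled[] = $payload;
});
```

Only the message that is already due reaches the handler; the other one stays in the scheduled stage until a later call to `moveDueMessages()`.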

Why this project exists

A message queue is useful for running tasks asynchronously, either as soon as possible or at a specified date and time. This spreads the load on the servers over time and allows faster responses to users, who no longer have to wait for work that can be done asynchronously, such as sending emails.

On top of this library we can build a Message Bus, which can decide if a message should be delivered in sync or async. In turn, on top of that Message Bus we can build a Command Bus, which delivers one message to only one destination, or an Event Bus, which can deliver one message to several destinations.
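A Message Bus of this kind might look roughly like the sketch below. Everything in it is hypothetical: `ExampleMessageBus` is not part of this library, and the two callables stand in for a direct handler call and for a call to the message queue.

```php
<?php
declare(strict_types=1);

// Hypothetical bus that routes a message either to a direct handler (sync)
// or to an enqueue callback (async).
final class ExampleMessageBus
{
    /** @var callable */
    private $syncHandler;

    /** @var callable */
    private $asyncEnqueuer;

    public function __construct(callable $syncHandler, callable $asyncEnqueuer)
    {
        $this->syncHandler = $syncHandler;
        $this->asyncEnqueuer = $asyncEnqueuer;
    }

    public function dispatch($message, bool $async): void
    {
        if ($async) {
            // In a real setup this callback would enqueue the message on the message queue.
            ($this->asyncEnqueuer)($message);
            return;
        }

        ($this->syncHandler)($message);
    }
}

// Usage: arrays record where each message ended up.
$handledNow = [];
$queued = [];
$bus = new ExampleMessageBus(
    function ($message) use (&$handledNow): void {
        $handledNow[] = $message;
    },
    function ($message) use (&$queued): void {
        $queued[] = $message;
    }
);

$bus->dispatch('validate order', false);
$bus->dispatch('send confirmation email', true);
```

A Command Bus or Event Bus built on top would then differ only in how many destinations it dispatches each message to.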

Usage

The MessageQueueService is the entry point to the message queue.

    $messageQueueService = new MessageQueueService(
        new ScheduledQueueService(
            new MessageRepository(/*...*/) // implemented by the code using this library
        )
    );
    
    $messageQueueService->enqueueMessage(
        $someObjectOrStringOrWhatever,      // some payload to deliver, persisted by the MessageRepository
        '{"deliver_to": "SomeServiceId"}',  // destination to be decoded by the delivery service (MessageDeliveryServiceInterface)
        new DateTimeImmutable(),            // delivery date and time
        5,                                  // priority
        []                                  // arbitrary metadata
    );

To move messages from the ScheduledQueue to the DeliveryQueue, we need one ScheduledQueueToDeliveryQueueWorker running in the background. To move messages from the DeliveryQueue to the actual destination, we need at least one DeliveryQueueToHandlerWorker running in the background.

Our $scheduledQueueWorker will be run, for example, by a CLI command which will be kept alive by a process management tool like Supervisor.

    $scheduledQueueWorker = new ScheduledQueueToDeliveryQueueWorker(
        new ScheduledQueueService(new MessageRepository(/*...*/)),
        new AmqpProducer(new AMQPLazyConnection(/*...*/), new UuidMessageIdGenerator()),
        'some_queue_name',
        new SomeLogger(/*...*/)
    );
    
    $scheduledQueueWorker->moveMessageBatch(50);
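A CLI command that keeps moving batches could be structured as in the sketch below. The function `runBatchLoop` is hypothetical, and so is the assumption that the batch callback returns the number of messages it moved; the source does not state what `moveMessageBatch()` returns, so a stub callback is used here instead of the real worker.

```php
<?php
declare(strict_types=1);

/**
 * Calls $moveBatch repeatedly and pauses briefly whenever a batch moved nothing.
 *
 * $moveBatch is hypothetical: it is assumed to return the number of messages moved.
 * Returns the total number of messages moved over all iterations.
 */
function runBatchLoop(callable $moveBatch, int $maxIterations, int $idleSleepMicroseconds): int
{
    $total = 0;
    for ($i = 0; $i < $maxIterations; $i++) {
        $moved = $moveBatch();
        $total += $moved;

        if ($moved === 0) {
            // Nothing was due; back off instead of polling in a tight loop.
            usleep($idleSleepMicroseconds);
        }
    }

    return $total;
}

// Usage with a stub that pretends three batches moved 3, 0 and 2 messages.
$pending = [3, 0, 2];
$total = runBatchLoop(
    function () use (&$pending): int {
        return array_shift($pending) ?? 0;
    },
    3,
    1000
);
```

Letting the loop end after a fixed number of iterations means the process exits regularly and the process manager starts a fresh one, which is one common way to keep long-running PHP workers from accumulating memory.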

Like the $scheduledQueueWorker, the $deliveryQueueWorker is also started by a CLI command and kept alive by a process management tool like Supervisor.

    $logger = new SomeLogger(/*...*/);
    
    $deliveryQueueWorker = new DeliveryQueueToHandlerWorker(
        new AmqpConsumer(
            new AMQPLazyConnection(/*...*/),
            new AmqpMessageHandler(
                new MessageHandler(/*...*/),
                new SomeCache(/*...*/),
                new PersistenceClient(/*...*/),
                $logger
            ),
            $logger
        ),
        'some_queue_name'
    );
    
    $deliveryQueueWorker->startConsuming(300);

Installation

To install the library, run the command below and you will get the latest version:

    composer require werkspot/message-queue

Tests

To execute the tests run:

    make test

Coverage

To generate the test coverage run:

    make test_with_coverage

Code standards

To fix the code standards run:

    make cs-fix