vrok/messenger-reply

Symfony messenger middleware & stamps to reply to messages

v1.0.0 2020-06-18 15:32 UTC

Last update: 2024-04-29 10:29:16 UTC


README

This is a library that lets symfony/messenger reply to messages with a result.
It is meant for a setup with the AMQP transport and two Symfony instances talking to each other over the same broker, e.g. a web frontend and a microservice, where the frontend sends tasks to the service and requests a reply such as a generated PDF file.

Setup

Install the package on both sides (the requesting side needs the ReplyToStamp, the receiving side needs the middleware and the stamp):

composer require vrok/messenger-reply

You need request and reply message classes that are identical on both sides, e.g. provided by a shared Composer package:

namespace MyNamespace\Message;

class GeneratePdfMessage
{
    /**
     * @var string
     */
    private string $latex;

    public function __construct(string $latex)
    {
        $this->latex = $latex;
    }

    /**
     * @return string
     */
    public function getLatex(): string
    {
        return $this->latex;
    }
}

...

namespace MyNamespace\Message;

class PdfResultMessage
{
    private string $pdfContent;

    public function __construct(string $pdfContent)
    {
        $this->pdfContent = $pdfContent;
    }

    /**
     * @return string
     */
    public function getPdfContent(): string
    {
        return $this->pdfContent;
    }
}

Requesting side

Add a transport for the shared AMQP broker that routes to the receiver's input, using the same exchange and queue name as the receiver's input queue.

framework:
    messenger:
        transports:
            pdf-requests:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: input
                    queues:
                        input:
                            binding_keys: [input]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

Add a second transport for the shared AMQP broker to receive the replies, matching the exchange and queue name of the receiver's output queue. Separate transports are needed because messenger:consume [transportname] consumes all messages in all queues of that transport.

framework:
    messenger:
        transports:
            pdf-results:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: output
                    queues:
                        output:
                            binding_keys: [output]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

Route your requests to the shared transport/queue:

framework:
    messenger:
        routing:
            # e.g.
            'MyNamespace\Message\GeneratePdfMessage': pdf-requests

Replying side

Configure the middleware service:

services:
    Vrok\MessengerReply\ReplyMiddleware:
        tags:
            - { name: monolog.logger, channel: messenger }
        calls:
            - [setLogger, ['@logger']]

Enable the middleware on your message bus:
(We have to disable the default middleware and define the order explicitly, as there is no priority option: just adding our service to the middleware option would place it before send_message. See symfony/symfony#28568)

framework:
    messenger:
        buses:
            messenger.bus.default:
                default_middleware: false
                middleware:
                    - {id: 'add_bus_name_stamp_middleware', arguments: ['messenger.bus.default']}
                    - reject_redelivered_message_middleware
                    - dispatch_after_current_bus
                    - failed_message_processing_middleware
                    - send_message
                    - handle_message
                    - Vrok\MessengerReply\ReplyMiddleware

Configure an input transport that receives the messages from the external applications and is consumed by your worker(s), and an output transport to which the replies are sent, ideally with one queue for each application sending requests, so that each application only consumes the replies meant for it. Separate transports are needed because messenger:consume [transportname] consumes all messages in all queues of that transport.

framework:
    messenger:
        transports:
            input:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: input
                    queues:
                        input:
                            binding_keys: [input]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

            output:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: output
                    queues:
                        output:
                            binding_keys: [output]
                        output_1:
                            binding_keys: [output_1]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

All replies should be routed to the output transport:

framework:
    messenger:
        routing:
            # Route your messages to the transports
            '*': output

Usage

Dispatch the request message with a ReplyToStamp attached so the receiver knows where to send the reply:

    use MyNamespace\Message\GeneratePdfMessage;
    use Symfony\Component\Messenger\Envelope;
    use Vrok\MessengerReply\ReplyToStamp;

    $e = new Envelope(new GeneratePdfMessage('LaTeX content'));
    $this->bus->dispatch($e
        ->with(new ReplyToStamp('output'))
    );
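
The snippet above relies on a `$this->bus` property. As a rough sketch of where it could come from, the dispatch can be wrapped in a small service that gets the message bus injected. The class name `PdfRequester`, its namespace, and the `$replyTo` parameter are hypothetical and not part of this library. The sketch also assumes that the value passed to ReplyToStamp has to match a binding key on the receiver's output transport (`output` or `output_1` in the configuration above).

```php
<?php

declare(strict_types=1);

namespace App\Service; // hypothetical namespace

use MyNamespace\Message\GeneratePdfMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Vrok\MessengerReply\ReplyToStamp;

// Hypothetical helper service, not part of vrok/messenger-reply.
final class PdfRequester
{
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;
    }

    /**
     * Dispatches a PDF request and asks for the reply to be sent
     * to $replyTo (assumed to match a binding key of the receiver's
     * output transport).
     */
    public function request(string $latex, string $replyTo = 'output'): Envelope
    {
        // Stamps can be passed to the Envelope constructor directly
        // instead of calling with() afterwards.
        $envelope = new Envelope(
            new GeneratePdfMessage($latex),
            [new ReplyToStamp($replyTo)]
        );

        return $this->bus->dispatch($envelope);
    }
}
```

A second requesting application would then call `request($latex, 'output_1')` and consume only its own reply queue.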

Implement a message handler that handles the requests and returns the reply message object:

use MyNamespace\Message\GeneratePdfMessage;
use MyNamespace\Message\PdfResultMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class GeneratePdfMessageHandler implements MessageHandlerInterface
{
    public function __invoke(GeneratePdfMessage $message): PdfResultMessage
    {
        // Placeholder for the real LaTeX-to-PDF conversion.
        $latex = $message->getLatex();
        $pdfContent = "<fakepdf>$latex</fakepdf>";

        // The returned object is sent back as the reply.
        return new PdfResultMessage($pdfContent);
    }
}

Consume requests (only on the input queue!) on the receiver side:

./bin/console messenger:consume input

Consume replies (only on the output queue!) on the requesting side:

./bin/console messenger:consume pdf-results
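
The consumer on the requesting side also needs a handler for the reply message. A minimal sketch, assuming the reply is the PdfResultMessage from above and that the PDF should simply be written to disk, might look like this. The class name, its namespace, and the `$targetDir` constructor argument (which would have to be configured as a service argument) are hypothetical.

```php
<?php

declare(strict_types=1);

namespace App\MessageHandler; // hypothetical namespace

use MyNamespace\Message\PdfResultMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

// Hypothetical reply handler for the requesting application.
final class PdfResultMessageHandler implements MessageHandlerInterface
{
    private string $targetDir;

    public function __construct(string $targetDir)
    {
        $this->targetDir = $targetDir;
    }

    public function __invoke(PdfResultMessage $message): void
    {
        $content = $message->getPdfContent();

        // Name the file after a hash of its content so that a
        // redelivered reply overwrites the same file.
        $path = sprintf('%s/%s.pdf', rtrim($this->targetDir, '/'), sha1($content));

        if (false === file_put_contents($path, $content)) {
            // Throwing marks the message as failed, so the transport's
            // retry_strategy applies.
            throw new \RuntimeException(sprintf('Could not write "%s".', $path));
        }
    }
}
```

This handler returns nothing, so no further reply is produced on the requesting side.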

If the requesting side needs to know which task a received reply belongs to, implement Vrok\MessengerReply\TaskIdentifierMessageInterface (and use Vrok\MessengerReply\TaskIdentifierMessageTrait) on your request and reply message classes. The task and/or identifier properties set on the request are then transferred to the reply automatically. Stamps cannot be used for this because message handlers cannot access them.
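
As a sketch of the wiring only, the request class from the setup section could be extended as follows. It assumes that the trait provides every method the interface requires, and it deliberately does not show the accessors for the task and identifier properties, since their exact names should be taken from the interface itself. The reply class (PdfResultMessage) would get the same `implements` clause and `use` statement.

```php
<?php

declare(strict_types=1);

namespace MyNamespace\Message;

use Vrok\MessengerReply\TaskIdentifierMessageInterface;
use Vrok\MessengerReply\TaskIdentifierMessageTrait;

class GeneratePdfMessage implements TaskIdentifierMessageInterface
{
    // Assumption: the trait implements the interface completely,
    // including storage for the task and identifier values.
    use TaskIdentifierMessageTrait;

    private string $latex;

    public function __construct(string $latex)
    {
        $this->latex = $latex;
    }

    public function getLatex(): string
    {
        return $this->latex;
    }
}
```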