mkoprek/rabbitmq-dlq-bundle

1.0.0 2021-08-18 14:03 UTC

This package is auto-updated.

Last update: 2024-04-19 21:55:58 UTC


README

Build Status codecov

Configuration

This bundle is extension to RabbitMqBundle, it will automatically add DLQ queues to existing multiple_consumer in old_sound_rabbit_mq.yaml

    multiple_consumers:
        default:
            connection: default
            exchange_options:
                name: 'exchange'
                type: 'topic'
            graceful_max_execution:
                timeout: 60
            queues:
                legacy.investments.investment_added.event:
                    name: 'legacy.investments.investment_added.event'
                    routing_key: 'legacy.investments.investment_added.event'
                    callback: Namespace\InvestmentAddedLegacyConsumer
                legacy.investments.investment_edited.event:
                    name: 'legacy.investments.investment_edited.event'
                    routing_key: 'legacy.investments.investment_edited.event'
                    callback: Namespace\InvestmentEditedLegacyConsumer

After that configuration you will have 2 additional DLQ queues with routing keys:

  • legacy.investments.investment_added.retry
  • legacy.investments.investment_edited.retry

Each *.retry queue will re-route all messages back to original queue after 30s delay.

To put message to *.retry queue you just need to throw any Exception when parsing message.

Consuming

You are creating consumer like in example above - by adding callback. This callback MUST extends AbstractMessageConsumer.

That is it! If everything is OK, just leave it.
If there was any problem, then throw Exception.

Producing

Just inject MessageProducerInterface to your service where you need to produce message. Then create class with extends AbstractMessage or implements MessageInterface.

Message

<?php
declare(strict_types=1);

use MKoprek\RabbitmqDlqBundle\Message\AbstractMessage;

class Message extends AbstractMessage
{
    public const ROUTING_KEY = 'legacy.investments.investment_added.event';

    public function __construct(array $array)
    {
        $this->payload = [
            'id' => '7186971d-1b63-46ba-9804-012e8477d370',
            'name' => 'Lorem Ipsum',
            'array' => $array,
        ];
    }
}

Producer

<?php
declare(strict_types=1);

use MKoprek\RabbitmqDlqBundle\Producer\MessageProducerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProduceMessage
{
    public function __construct(private MessageProducerInterface $producer)
    {
    }

    protected function produce(InputInterface $input, OutputInterface $output): void
    {
        $this->producer->produce(
            new Message(['some_key' => 'some_val'])
        );
    }
}