marfatech/rabbit-queue-bundle

Provides possibility for working with rabbitMQ queue by consumers system

This package's canonical repository appears to be gone and the package has been frozen as a result.

Installs: 14 805

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 5

Forks: 0

Open Issues: 1

Type:symfony-bundle

v3.4.0-RC1 2023-08-01 09:46 UTC

README

Latest Stable Version Total Downloads

Введение

Бандл предоставляет инструменты по работе с очередями RabbitMQ посредством механизма producer - consumer.

Содержание

  1. Требования
  2. Установка
  3. Конфигурация
  4. Описание компонентов
  5. Доступные команды
  6. Использование
  7. Использование RouterPublisher
  8. Примеры
  9. Лицензия

Требования

Для корректной работы бандла требуется подключить следующие плагины RabbitMQ:

Установка

Шаг 1: Загрузка бандла

В директории проекта, выполните следующую команду для загрузки наиболее подходящей стабильной версии этого бандла:

    composer require marfatech/rabbit-queue-bundle

Эта команда подразумевает что Composer установлен и доступен глобально.

Шаг 2: Подключение бандла

Необходимо включить бандл добавив его в список зарегистрированных бандлов в app/AppKernel.php файл вашего проекта:

<?php
// app/AppKernel.php

class AppKernel extends Kernel
{
    // ...

    public function registerBundles()
    {
        $bundles = [
            // ...

            new MarfaTech\Bundle\RabbitQueueBundle\MarfaTechRabbitQueueBundle(),
        ];

        return $bundles;
    }

    // ...
}

Конфигурация

Чтобы начать использовать бандл, необходимо описать конфигурацию подключения к RabbitMQ.

# app/packages/marfatech_rabbit_queue.yaml
marfatech_rabbit_queue:
    connections:
        default:
            host: 'rabbitmq'              # хост для подключения к rabbitMQ
            port: 5672                    # порт для подключения к rabbitMQ
            username: 'rabbitmq_user'     # логин для подключения к rabbitMQ
            password: 'rabbitmq_password' # пароль для подключения к rabbitMQ
            vhost: 'example_vhost'        # виртуальный хост для подключения (необязательный параметр)
            connection_timeout: 3         # таймаут соединения @deprecated используйте options.connection_timeout
            read_write_timeout: 3         # таймаут на чтение/запись @deprecated используйте options.read_write_timeout
            heartbeat: 0                  # частота heartbeat @deprecated используйте options.heartbeat
    options:                              # опции для попыток подключений ко всем хостам из списка по очереди (необязательный параметр)
        connection_timeout: 3             # таймаут соединения
        read_write_timeout: 3             # таймаут на чтение/запись
        heartbeat: 0                      # частота heartbeat
        lazy_connection: false            # Lazy соединение инициализируется в момент использования
        reconnect_retries: 3              # Количество попыток реконнекта к RabbitMq при потере соединения int или null (по умолчанию 0)
    consumer:
        wait_timeout: 3                   # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3)
        idle_timeout: 0                   # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута)
        batch_timeout: 0                  # таймаут сборки пачки сообщений в секундах (по умолчанию 0 - нет таймаута)
        default_max_processed_tasks_count: 1000    # максимальное количество задач в обработке (по умолчанию 1000)

При указании options значения ключей конфигурации connection_timeout, read_write_timeout, heartbeat будут взяты из него. В случае если options не указан значения этих ключей конфигурации будет взято из первого значения ключа конфигурации connections.

Попытка подключения к хостам, указанных в ключе конфигурации connection, будет происходить по очереди и вернет первое удачное подключение. Multiple hosts connections

При указании lazy_connection = true соединение будет инициализировано не при инициализации всех классов, а в момент использования.

Параметр reconnect_retries используется для автоматического реконнекта при обрыве соединения с RabbitMq. Может принимать целочисленное значение (максимальное количество попыток реконнекта), либо null для бесконечного переподключения.

Описание компонентов

Producer

Producer - используется для отправки сообщений в очередь.

Для этих целей в бандле реализован RabbitMqProducer, с помощью которого можно отправлять сообщения в очередь с заданными параметрами.

<?php
$data = ['message' => 'example']; # Сообщение
$options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди
$routingKey = 'test.routing.key'; # Ключ маршрутизации сообщения, для очередей с типом `ROUTER`
$properties = ['type' => 'test']; # Дополнительные свойства сообщения AMQPMessage

/** @var \MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, $routingKey, $properties);

Queue Pool

Все сообщения по умолчанию передаются в очередь через пул сообщений, который позволяет производить отложенную отправку накопленных сообщений. Это поведение можно изменить индивидуально для каждого сообщения передав опцию use-queue-pool во время вызова функции MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface::put.

<?php

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface;

$data = ['message' => 'example'];
$options = [QueueOptionEnum::USE_QUEUE_POOL => false];

/** @var RabbitMqProducerInterface $producer */
$producer->put('queue_name', $data, $options);

Publisher

Публикация сообщений в очередь происходит с помощью специальных классов паблишеров. Producer определяет какой паблишер использовать для публикации по типу очереди, с которым связан паблишер.

Соответственно на каждый новый тип очереди требуется свой класс Publisher с кастомной логикой обработки/валидации и публикации сообщений в канал.

Бандл поддерживает следующие типы очередей и обменников:

  • FIFO
  • Delay
  • Deduplicate
  • Deduplicate + Delay
  • Router

Router используется для создания разветвленной топологии как описано тут и тут

При желании добавить собственный тип очереди, необходимо создать класс Publisher наследующий AbstractPublisher или реализующий PublisherInterface.

Пример DelayPublisher:

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Publisher;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Exception\RabbitQueueException;

use function is_int;
use function sprintf;

class DelayPublisher extends AbstractPublisher
{
    public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY;
    
    /**
    * Custom prepare options logic
    */
    protected function prepareOptions(DefinitionInterface $definition, array $options): array
    {
        $delay = $options[QueueOptionEnum::DELAY] ?? null;

        if (!is_int($delay)) {
            $message = sprintf(
                'Element for queue "%s" must be with option %s. See %s',
                $definition::getQueueName(),
                QueueOptionEnum::DELAY,
                QueueOptionEnum::class
            );

            throw new RabbitQueueException($message);
        }

        $amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000;

        return $amqpTableOption;
    }

    /**
    * Queue type supported by publisher
    */
    public static function getQueueType(): string
    {
        return (string) self::QUEUE_TYPE;
    }
}

Consumer

Consumer - Используется для получения и обработки сообщений из очереди.

Для реализации логики обработки сообщений необходимо создать класс consumer, реализующий ConsumerInterface, либо наследующий AbstractConsumer, который содержит предустановленные значения для некоторых методов.

<?php

declare(strict_types=1);

namespace Acme\AppBundle\Consumer;

use MarfaTech\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer;

class ExampleConsumer extends AbstractConsumer
{
    public const DEFAULT_BATCH_SIZE = 100; # Размер пачки

    /**
     * {@inheritDoc}
     */
    public function process(array $messageList): void
    {
        foreach ($messageList as $item) {
            $data = $this->decodeMessageBody($item); # Decode message by hydrator

            // handle some task by specific logic
        }
    }

    /**
     * {@inheritDoc}
     */
    public function getBindQueueName(): string
    {
        return 'example';
    }

    /**
     * {@inheritDoc}
     */
    public static function getName(): string
    {
        return 'example';
    }
}

В методе process() необходимо реализовать обработку полученных сообщений. Сообщения поступают пачками, размер которых задается константой DEFAULT_BATCH_SIZE (по умолчанию = 1).

Сумма DEFAULT_BATCH_SIZE со всех потребителей одной очереди не должна превышать значения 65535.

Hydrator

Для удобства работы с сообщениями разных форматов бандл предоставляет инструменты гидрации (кодирование/декодирование сообщений в необходимый формат).

По умолчанию доступны следующие гидраторы:

  • JsonHydrator - для работы с сообщениями в формате json (используется по умолчанию).
  • PlainTextHydrator - для работы с простыми текстовыми сообщениями.

Также существует возможность создания собственного гидратора. Для этого необходимо реализовать HydratorInterface и изменить параметр конфигурации hydrator_name на тип нового гидратора.

Definition

RabbitMQ позволяет создавать сложные схемы очередей, состоящие из несколько взаимосвязанных exchange и queue.

Для удобства работы со схемами бандл предоставляет возможность сохранения схем очередей в специальные классы Definition, которые реализуют DefinitionInterface.

Пример FIFO:

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleFifoDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO;
    public const ENTRY_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->queue_declare(
            self::ENTRY_POINT,
            false,
            true,
            false,
            false
        );
    }

    /**
     *
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

Пример delay + deduplicate:

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class ExampleDeduplicateDelayDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY;
    public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication';

    private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay';
    private const THIRD_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->exchange_declare(
            self::ENTRY_POINT,
            'x-message-deduplication',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-cache-size' => 1_000_000_000])
        );

        $channel->exchange_declare(
            self::SECOND_POINT,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
        );

        $channel->queue_declare(
            self::THIRD_POINT,
            false,
            true,
            false,
            false
        );

        $channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT);
        $channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT);
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

В методе init() объявляется структура очереди состоящая из необходимых exchanges, queue и bindings с помощью стандартных методов php-amqplib.

Метод getEntryPointName() - отвечает за точку входа сообщений. Точкой входа может быть название exchange или queue в зависимости от структуры схемы.

Метод getQueueName() - название очереди, куда в конечном итоге попадут сообщения.

Жизненный цикл сообщения:

Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer

Таким образом producer отправляет сообщения на точку входа, а consumer забирает сообщения из очереди.

В простейшем случае при использовании обычной очереди FIFO, точкой входа будет являться название очереди.

Доступные команды

  1. rabbit:consumer:run - запускает выбранный консьюмер.
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
  1. rabbit:definition:update - загружает все схемы очередей RabbitMQ в соответствии с существующими классами Definition.

Примечание: Данная команда не обновляет существующие схемы.

php bin/console rabbit:definition:update
  1. rabbit:consumer:list - выводит список консьюмеров, зарегистрированных в проекте.
php bin/console rabbit:consumer:list

Пример вывода команды:

 Total consumers count: 2
+--------------------+------------+
| Queue Name         | Batch Size |
+--------------------+------------+
| example_first      | 1          |
| example_second     | 100        |
+--------------------+------------+

Использование

Шаг 1: Создание схемы очереди (Definition)

Для инициализации схемы, требуется создать класс Definition, который реализует DefinitionInterface. В методе init нужно объявить структуру очереди состоящию из необходимых exchanges, queue и bindings с помощью стандартных методов работы с каналом php-amqplib.

Пример создания Definition

Шаг 2: Создание consumer'а

Далее необходимо создать класс-consumer, наследующий AbstractConsumer. А в методе process реализовать обработку полученных сообщений.

Пример создания Consumer

Если в проекте не работает механизм autowire, то вам понадобится зарегистрировать consumer с тегом marfatech_rabbit_queue.consumer:

services:
    app.acme.consumer:
        class:      Acme\AppBundle\Consumer\ExampleConsumer
        tags:
            - { name: marfatech_rabbit_queue.consumer }

Шаг 3: Загрузка схем очередей RabbitMQ

Чтобы загрузить схемы definition в RabbitMQ необходимо выполнить команду rabbit:definition:update. Данная команда обновит схему в соответствии с существующими классами Definition, реализующими DefinitionInterface.

php bin/console rabbit:definition:update

Шаг 4: Запуск consumer'а

Чтобы запустить consumer необходимо выполнить команду rabbit:consumer:run rabbit. Для запуска нужно передать имя конкретного consumer.

Запуск ранее описанного consumer'а будет выглядеть так:

php bin/console rabbit:consumer:run example

Для просмотра списка всех зарегистрированных consumer'ов достаточно выполнить команду rabbit:consumer:list.

Использование RouterPublisher

RouterPublisher следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать сразу в некоторое их подмножество, определяемое по routingKey сообщения. Для таких целей нужно создать Definition, в котором будет определена только exchange типа direct, topic или fanout. Эта Definition будет использоваться в качестве точки входя для сообщений. После этого нужно создать по одной Definition на каждую очередь, и все их биндить на первую Definition. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить Definition типа первой.

Пример Definition с exchange:

<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleTopicExchangeDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE;
    public const ENTRY_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->exchange_declare(
            self::QUEUE_NAME,
            'topic',
            false,
            true,
        );
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::ROUTER;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

Пример Definition для очереди

<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleRoutedQueryDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO;
    public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше
    public const ROUTING = [
        '*.orange.*',
        'big.#',
        '*.black.car'
    ];

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->queue_declare(
            self::QUEUE_NAME,
            false,
            true,
            false,
            false
        );
        
        foreach (self::ROUTING as $route) {
            $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition
        }
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в очереди только при подходящем routingKey (четвертый параметр в методе put()).

<?php
$data = ['message' => 'example']; # Сообщение
$options = [];

/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*'
$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#'
$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера

Важно!!! Длина routeKey не должна превышать 255 символов

Примеры

Использование RewindPartialException

Чтобы перемотать сообщения в конец очереди, нужно выбросить исключение RewindPartialException. Первый аргумент принимает массив идентификаторов (тегов) сообщений. Второй аргумент - массив, где ключ - тег сообщения, значение - контекст сообщения. С помощью контекста можно управлять логикой обработки сообщения. Получить контекст:

$headers = $message->get('application_headers');
$context = $headers->getNativeData()[QueueHeaderOptionEnum::X_CONTEXT];

Лицензия

license