marfatech / rabbit-queue-bundle
Provides possibility for working with rabbitMQ queue by consumers system
This package's canonical repository appears to be gone and the package has been frozen as a result.
Installs: 14 813
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 5
Forks: 0
Open Issues: 1
Type:symfony-bundle
Requires
- php: ^8.0
- ext-json: *
- ext-pcntl: *
- php-amqplib/php-amqplib: ^2.12
- symfony/config: ~4.4||~5.4||~6.0
- symfony/console: ~4.4||~5.4||~6.0
- symfony/dependency-injection: ~4.4||~5.4||~6.0
- symfony/http-kernel: ~4.4||~5.4||~6.0
Requires (Dev)
- phpunit/phpunit: ^7.0||^8.0||^9.0
- psr/log: ~1.0
- symfony/framework-bundle: ^5.3||~6.0
- symfony/monolog-bundle: ^3.7
Suggests
- symfony/monolog-bundle: Provides advanced logging mechanism
- symfony/symfony: Allows more advanced functionality with Symfony
This package is auto-updated.
Last update: 2024-02-13 11:13:43 UTC
README
Введение
Бандл предоставляет инструменты по работе с очередями RabbitMQ
посредством механизма producer
- consumer
.
Содержание
- Требования
- Установка
- Конфигурация
- Описание компонентов
- Доступные команды
- Использование
- Использование
RouterPublisher
- Примеры
- Лицензия
Требования
Для корректной работы бандла требуется подключить следующие плагины RabbitMQ:
Установка
Шаг 1: Загрузка бандла
В директории проекта, выполните следующую команду для загрузки наиболее подходящей стабильной версии этого бандла:
composer require marfatech/rabbit-queue-bundle
Эта команда подразумевает что Composer установлен и доступен глобально.
Шаг 2: Подключение бандла
Необходимо включить бандл добавив его в список зарегистрированных бандлов в app/AppKernel.php
файл вашего проекта:
<?php // app/AppKernel.php class AppKernel extends Kernel { // ... public function registerBundles() { $bundles = [ // ... new MarfaTech\Bundle\RabbitQueueBundle\MarfaTechRabbitQueueBundle(), ]; return $bundles; } // ... }
Конфигурация
Чтобы начать использовать бандл, необходимо описать конфигурацию подключения к RabbitMQ
.
# app/packages/marfatech_rabbit_queue.yaml marfatech_rabbit_queue: connections: default: host: 'rabbitmq' # хост для подключения к rabbitMQ port: 5672 # порт для подключения к rabbitMQ username: 'rabbitmq_user' # логин для подключения к rabbitMQ password: 'rabbitmq_password' # пароль для подключения к rabbitMQ vhost: 'example_vhost' # виртуальный хост для подключения (необязательный параметр) connection_timeout: 3 # таймаут соединения @deprecated используйте options.connection_timeout read_write_timeout: 3 # таймаут на чтение/запись @deprecated используйте options.read_write_timeout heartbeat: 0 # частота heartbeat @deprecated используйте options.heartbeat options: # опции для попыток подключений ко всем хостам из списка по очереди (необязательный параметр) connection_timeout: 3 # таймаут соединения read_write_timeout: 3 # таймаут на чтение/запись heartbeat: 0 # частота heartbeat lazy_connection: false # Lazy соединение инициализируется в момент использования reconnect_retries: 3 # Количество попыток реконнекта к RabbitMq при потере соединения int или null (по умолчанию 0) consumer: wait_timeout: 3 # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3) idle_timeout: 0 # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута) batch_timeout: 0 # таймаут сборки пачки сообщений в секундах (по умолчанию 0 - нет таймаута) default_max_processed_tasks_count: 1000 # максимальное количество задач в обработке (по умолчанию 1000)
При указании options
значения ключей конфигурации connection_timeout
, read_write_timeout
, heartbeat
будут взяты из него.
В случае если options
не указан значения этих ключей конфигурации будет взято из первого значения ключа конфигурации connections
.
Попытка подключения к хостам, указанных в ключе конфигурации connection
, будет происходить по очереди и вернет первое удачное подключение. Multiple hosts connections
При указании lazy_connection
= true
соединение будет инициализировано не при инициализации всех классов, а в момент использования.
Параметр reconnect_retries
используется для автоматического реконнекта при обрыве соединения с RabbitMq.
Может принимать целочисленное значение (максимальное количество попыток реконнекта), либо null
для бесконечного переподключения.
Описание компонентов
Producer
Producer
- используется для отправки сообщений в очередь.
Для этих целей в бандле реализован RabbitMqProducer, с помощью которого можно отправлять сообщения в очередь с заданными параметрами.
<?php $data = ['message' => 'example']; # Сообщение $options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди $routingKey = 'test.routing.key'; # Ключ маршрутизации сообщения, для очередей с типом `ROUTER` $properties = ['type' => 'test']; # Дополнительные свойства сообщения AMQPMessage /** @var \MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ $producer->put('queue_name', $data, $options, $routingKey, $properties);
Queue Pool
Все сообщения по умолчанию передаются в очередь через пул сообщений, который позволяет производить отложенную отправку накопленных сообщений.
Это поведение можно изменить индивидуально для каждого сообщения передав опцию use-queue-pool
во время вызова функции MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface::put
.
<?php use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface; $data = ['message' => 'example']; $options = [QueueOptionEnum::USE_QUEUE_POOL => false]; /** @var RabbitMqProducerInterface $producer */ $producer->put('queue_name', $data, $options);
Publisher
Публикация сообщений в очередь происходит с помощью специальных классов паблишеров.
Producer
определяет какой паблишер использовать для публикации по типу очереди, с которым связан паблишер.
Соответственно на каждый новый тип очереди требуется свой класс Publisher
с кастомной логикой обработки/валидации и публикации сообщений в канал.
Бандл поддерживает следующие типы очередей и обменников:
- FIFO
- Delay
- Deduplicate
- Deduplicate + Delay
- Router
Router используется для создания разветвленной топологии как описано тут и тут
При желании добавить собственный тип очереди, необходимо создать класс Publisher
наследующий AbstractPublisher или реализующий PublisherInterface.
Пример DelayPublisher:
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Publisher; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use MarfaTech\Bundle\RabbitQueueBundle\Exception\RabbitQueueException; use function is_int; use function sprintf; class DelayPublisher extends AbstractPublisher { public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY; /** * Custom prepare options logic */ protected function prepareOptions(DefinitionInterface $definition, array $options): array { $delay = $options[QueueOptionEnum::DELAY] ?? null; if (!is_int($delay)) { $message = sprintf( 'Element for queue "%s" must be with option %s. See %s', $definition::getQueueName(), QueueOptionEnum::DELAY, QueueOptionEnum::class ); throw new RabbitQueueException($message); } $amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000; return $amqpTableOption; } /** * Queue type supported by publisher */ public static function getQueueType(): string { return (string) self::QUEUE_TYPE; } }
Consumer
Consumer
- Используется для получения и обработки сообщений из очереди.
Для реализации логики обработки сообщений необходимо создать класс consumer
,
реализующий ConsumerInterface,
либо наследующий AbstractConsumer, который содержит предустановленные значения для некоторых методов.
<?php declare(strict_types=1); namespace Acme\AppBundle\Consumer; use MarfaTech\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer; class ExampleConsumer extends AbstractConsumer { public const DEFAULT_BATCH_SIZE = 100; # Размер пачки /** * {@inheritDoc} */ public function process(array $messageList): void { foreach ($messageList as $item) { $data = $this->decodeMessageBody($item); # Decode message by hydrator // handle some task by specific logic } } /** * {@inheritDoc} */ public function getBindQueueName(): string { return 'example'; } /** * {@inheritDoc} */ public static function getName(): string { return 'example'; } }
В методе process()
необходимо реализовать обработку полученных сообщений.
Сообщения поступают пачками, размер которых задается константой DEFAULT_BATCH_SIZE
(по умолчанию = 1).
Сумма DEFAULT_BATCH_SIZE
со всех потребителей одной очереди не должна превышать значения 65535.
Hydrator
Для удобства работы с сообщениями разных форматов бандл предоставляет инструменты гидрации (кодирование/декодирование сообщений в необходимый формат).
По умолчанию доступны следующие гидраторы:
- JsonHydrator - для работы с сообщениями в формате json (используется по умолчанию).
- PlainTextHydrator - для работы с простыми текстовыми сообщениями.
Также существует возможность создания собственного гидратора.
Для этого необходимо реализовать HydratorInterface и изменить параметр конфигурации hydrator_name
на тип нового гидратора.
Definition
RabbitMQ позволяет создавать сложные схемы очередей, состоящие из несколько взаимосвязанных exchange
и queue
.
Для удобства работы со схемами бандл предоставляет возможность сохранения схем очередей в специальные классы Definition
,
которые реализуют DefinitionInterface.
Пример FIFO:
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Definition; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleFifoDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO; public const ENTRY_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->queue_declare( self::ENTRY_POINT, false, true, false, false ); } /** * * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
Пример delay + deduplicate:
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Definition; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Wire\AMQPTable; class ExampleDeduplicateDelayDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY; public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication'; private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay'; private const THIRD_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->exchange_declare( self::ENTRY_POINT, 'x-message-deduplication', false, true, false, false, false, new AMQPTable(['x-cache-size' => 1_000_000_000]) ); $channel->exchange_declare( self::SECOND_POINT, 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT]) ); $channel->queue_declare( self::THIRD_POINT, false, true, false, false ); $channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT); $channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT); } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
В методе init()
объявляется структура очереди состоящая из необходимых exchanges
, queue
и bindings
с помощью стандартных методов php-amqplib.
Метод getEntryPointName()
- отвечает за точку входа сообщений. Точкой входа может быть название exchange
или queue
в зависимости от структуры схемы.
Метод getQueueName()
- название очереди, куда в конечном итоге попадут сообщения.
Жизненный цикл сообщения:
Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer
Таким образом producer
отправляет сообщения на точку входа, а consumer
забирает сообщения из очереди.
В простейшем случае при использовании обычной очереди FIFO, точкой входа будет являться название очереди.
Доступные команды
rabbit:consumer:run
- запускает выбранный консьюмер.
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
rabbit:definition:update
- загружает все схемы очередейRabbitMQ
в соответствии с существующими классамиDefinition
.
Примечание: Данная команда не обновляет существующие схемы.
php bin/console rabbit:definition:update
rabbit:consumer:list
- выводит список консьюмеров, зарегистрированных в проекте.
php bin/console rabbit:consumer:list
Пример вывода команды:
Total consumers count: 2
+--------------------+------------+
| Queue Name | Batch Size |
+--------------------+------------+
| example_first | 1 |
| example_second | 100 |
+--------------------+------------+
Использование
Шаг 1: Создание схемы очереди (Definition)
Для инициализации схемы, требуется создать класс Definition,
который реализует DefinitionInterface.
В методе init
нужно объявить структуру очереди состоящию из необходимых exchanges
, queue
и bindings
с помощью стандартных методов работы с каналом php-amqplib.
Шаг 2: Создание consumer'а
Далее необходимо создать класс-consumer
, наследующий AbstractConsumer.
А в методе process
реализовать обработку полученных сообщений.
Если в проекте не работает механизм autowire
, то вам понадобится зарегистрировать consumer
с тегом marfatech_rabbit_queue.consumer
:
services: app.acme.consumer: class: Acme\AppBundle\Consumer\ExampleConsumer tags: - { name: marfatech_rabbit_queue.consumer }
Шаг 3: Загрузка схем очередей RabbitMQ
Чтобы загрузить схемы definition
в RabbitMQ необходимо выполнить команду rabbit:definition:update
.
Данная команда обновит схему в соответствии с существующими классами Definition
, реализующими DefinitionInterface.
php bin/console rabbit:definition:update
Шаг 4: Запуск consumer'а
Чтобы запустить consumer
необходимо выполнить команду rabbit:consumer:run
rabbit.
Для запуска нужно передать имя конкретного consumer
.
Запуск ранее описанного consumer
'а будет выглядеть так:
php bin/console rabbit:consumer:run example
Для просмотра списка всех зарегистрированных consumer
'ов достаточно выполнить команду rabbit:consumer:list
.
Использование RouterPublisher
RouterPublisher
следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать
сразу в некоторое их подмножество, определяемое по routingKey
сообщения. Для таких целей нужно создать Definition
,
в котором будет определена только exchange
типа direct
, topic
или fanout
. Эта Definition
будет использоваться
в качестве точки входя для сообщений. После этого нужно создать по одной Definition
на каждую очередь, и все их
биндить на первую Definition
. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить
Definition
типа первой.
Пример Definition
с exchange
:
<?php declare(strict_types=1); namespace Wakeapp\Bundle\RabbitQueueBundle\Definition; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleTopicExchangeDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; public const ENTRY_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->exchange_declare( self::QUEUE_NAME, 'topic', false, true, ); } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::ROUTER; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
Пример Definition
для очереди
<?php declare(strict_types=1); namespace Wakeapp\Bundle\RabbitQueueBundle\Definition; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleRoutedQueryDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO; public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше public const ROUTING = [ '*.orange.*', 'big.#', '*.black.car' ]; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->queue_declare( self::QUEUE_NAME, false, true, false, false ); foreach (self::ROUTING as $route) { $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition } } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в очереди только при подходящем routingKey (четвертый параметр в методе put()).
<?php $data = ['message' => 'example']; # Сообщение $options = []; /** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ $producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*' $producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#' $producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера
Важно!!! Длина routeKey не должна превышать 255 символов
Примеры
Использование RewindPartialException
Чтобы перемотать сообщения в конец очереди, нужно выбросить исключение RewindPartialException. Первый аргумент принимает массив идентификаторов (тегов) сообщений. Второй аргумент - массив, где ключ - тег сообщения, значение - контекст сообщения. С помощью контекста можно управлять логикой обработки сообщения. Получить контекст:
$headers = $message->get('application_headers'); $context = $headers->getNativeData()[QueueHeaderOptionEnum::X_CONTEXT];