sbooker / command-bus
Command bus
Installs: 6 148
Dependents: 1
Suggesters: 0
Security: 0
Stars: 1
Watchers: 1
Forks: 1
Open Issues: 0
Requires
- php: ^7.4 || ^8.0
- ramsey/uuid: ^4.0
- sbooker/transaction-manager: ^2.2
- sbooker/workflow: ^1.1
- symfony/property-access: ^5.0 || ^6.0
- symfony/serializer: ^5.0 || ^6.0
Requires (Dev)
- doctrine/cache: ^1.12
- doctrine/dbal: ^3.8 || ^4.0
- doctrine/orm: ^2.9 || ^3.0
- phpunit/phpunit: ^9.0
- psr/container: 1.0.0
- ramsey/uuid-doctrine: ^1.6
- sbooker/console: ^1.1
- sbooker/doctrine-transaction-handler: ^2.2
- sbooker/enumerable-doctrine: ^1.1
Suggests
- doctrine/dbal: ^3.8
- doctrine/orm: If you want use DB persistence with Doctrine
- psr/container: If you want use container registry implementation
- ramsey/uuid-doctrine
- sbooker/console: If you want use debug commands
- sbooker/doctrine-enumerable-type
- sbooker/doctrine-transaction-handler
- sbooker/event-loop-worker: If you want handle command in background process
README
Asynchronous Command Bus (sbooker/command-bus
)
Надежная, транзакционная, персистентная шина для асинхронного выполнения команд с поддержкой повторных попыток и отслеживанием состояния.
Назначение библиотеки
Эта библиотека решает две ключевые задачи в современных приложениях:
1. Надежное асинхронное выполнение задач. Библиотека позволяет атомарно сохранить команду на выполнение долгой или ненадежной операции (отправка email, запрос к API) в базу данных. Отдельный фоновый процесс (воркер) гарантированно выполнит эту команду позже.
2. Создание фундамента для отказоустойчивых Process Managers (Sagas). В event driven системах обработка доменных событий может быть хрупкой. Например, если подписчик на событие OrderPaid пытается выполнить операцию Warehouse::reserve() и получает ошибку, вся очередь обработки событий может быть заблокирована. Выход — сделать обработку событий простой и безошибочной, преобразуя событие или последовательность событий в команду(ы). Обычно для этого используют паттерн Process Manager (Saga). Ответственность за преобразование события в команду лежит на вашем коде (в подписчике на события). Сама библиотека не реализует паттерн Process Manager, но существенно облегчает его реализацию, предоставляя надежный, транзакционный механизм для асинхронного выполнения этой команды, изолируя сбои и не блокируя обработку других событий.
Ключевые особенности
- Основа для Process Manager / Saga: Предоставляет надежный механизм выполнения команд, который является ключевой частью реализации Process Manager.
- Транзакционная надежность: Прием команды происходит атомарно благодаря интеграции с sbooker/transaction-manager.
- Декларативные стратегии ретраев: Логика повторных попыток не находится в обработчике. Она декларативно настраивается для каждой команды через
TimeoutCalculator
. Сама персистентная сущностьCommand
отслеживает количество попыток и время следующего запуска. Воркер лишь пытается выполнить готовые к запуску команды. - Гарантированное выполнение (At-Least-Once): Команда будет выполняться до тех пор, пока не завершится успешно или не исчерпает все попытки.
- Отслеживание состояния: В любой момент можно узнать статус команды (
created
,pending
,success
,fail
). - Хуки на результат: Определяйте колбэки (
Invoker
) на успешное или неуспешное завершение.
Установка
composer require sbooker/command-bus
Вам также понадобятся зависимости из экосистемы и адаптеры:
composer require sbooker/transaction-manager sbooker/workflow sbooker/event-loop-worker
composer require sbooker/doctrine-transaction-handler # Если вы используете Doctrine
Быстрый старт
Сценарий 1: Асинхронное выполнение задачи
Задача: Отправить приветственный email, не заставляя пользователя ждать.
1. Команда и ее Endpoint:
// src/Mailing/Command/SendEmail.php final class SendEmail { /* ... DTO с to, subject, body ... */ } // src/Mailing/Endpoint/SendEmailEndpoint.php use Sbooker\CommandBus\Endpoint; final class SendEmailEndpoint implements Endpoint { private Mailer $mailer; public function process(UuidInterface $id, object $payload): void { /** @var SendEmail $payload */ $this->mailer->send(/* ... */); } }
2. Отправка команды:
// src/UseCase/RegisterUser/Handler.php /** @var Sbooker\CommandBus\CommandBus $commandBus */ $command = new SendEmail('welcome@user.com', 'Welcome!', '...'); $commandBus->accept(Uuid::uuid4(), $command); // Быстро и надежно
Сценарий 2 (Продвинутый): Построение Process Manager
Задача: После оплаты заказа (OrderPaid
event) надежно зарезервировать товары на складе.
1. Команда и ее Endpoint:
// src/Warehouse/Command/ReserveItems.php final class ReserveItems { /* ... DTO с orderId, items ... */ } // src/Warehouse/Endpoint/ReserveItemsEndpoint.php use Sbooker\CommandBus\Endpoint; final class ReserveItemsEndpoint implements Endpoint { private WarehouseService $warehouseService; public function process(UuidInterface $id, object $payload): void { /** @var ReserveItems $payload */ $this->warehouseService->reserve(/* ... */); // Может выбросить исключение } }
2. Подписчик на событие (Process Manager):
// src/Order/Subscriber/OrderPaidSubscriber.php use Sbooker\DomainEvents\DomainEventSubscriber; use Sbooker\CommandBus\CommandBus; final class OrderPaidSubscriber implements DomainEventSubscriber { private CommandBus $commandBus; public function handleEvent(DomainEvent $event): void { if (!$event instanceof OrderPaid) return; $command = new ReserveItems($event->getOrderId(), $event->getItems()); $this->commandBus->accept(Uuid::uuid4(), $command); } // ... }
Общие шаги: Конфигурация и запуск воркера
Независимо от сценария, вам нужно настроить Registry
и запустить воркер.
1. Конфигурация Registry:
// bootstrap.php или ваш DI-контейнер use Sbooker\CommandBus\Registry\Containerized\ContainerizedRegistry; use Sbooker\CommandBus\Registry\Containerized\ContainerAdapter; /** @var Psr\Container\ContainerInterface $container */ $endpointsMap = new ContainerAdapter($container, null, [ 'send-email' => SendEmailEndpoint::class, 'reserve-items' => ReserveItemsEndpoint::class, ]); $timeoutsMap = new ContainerAdapter($container, 'default_timeout'); $registry = new ContainerizedRegistry($endpointsMap, $timeoutsMap);
2. Запуск воркера:
// worker.php /** @var Sbooker\CommandBus\Handler $handler */ while (true) { if (!$handler->handleNext()) sleep(5); } // Рекомендуется использовать sbooker/event-loop-worker для более эффективной реализации
3. Настройка Doctrine:
Вам нужно предоставить маппинг для сущности Sbooker\CommandBus\Command
и ее embeddables
, а также зарегистрировать кастомный тип для Status
.
Или использовать поставляемые с библиотекой
4. Так же в комплекте:
- Консольная команда для ручного запуска обработки команды по ее идентификатору ExecuteCommand
- Консольная команда для очистки отработанных команд Clean
License
See LICENSE file.