magdv / workflow-orchestrator
Workflow orchestrator for symfony/messenger
Requires
- php: >=8.3
- ext-curl: *
- ext-json: *
- symfony/messenger: ^6.3
- symfony/uid: ^7.0
Requires (Dev)
- overtrue/phplint: ^9.5
- phpunit/phpunit: ^12
- rector/rector: ^2.0
- squizlabs/php_codesniffer: ^3.11
- symfony/event-dispatcher: ^7.2
- vimeo/psalm: ^6.9
This package is auto-updated.
Last update: 2025-03-19 08:22:58 UTC
README
Внимание, библиотека была вынесена в отдельную библиотеку, но в таком варианте еще не была использована. В составе другого проекта, не как пакет код работает. Пока что можно считать, что это альфа версия. Как только подключу этот пакет в таком виде, то отпишусь. Версия станет сразу стабильной.
В любом случае можете пробовать реализовывать или брать за основу код. Очень помогает в работе.
Идея библиотеки родилась после ознакомления с Temporal и LaravelWorkflow. Но у данных решений есть недостаток - это зависимость от конкретной технологии или фреймворка. А мне требовалось решение, которое можно переносить из проекта в проект.
Данная библиотека позволяет разделить выполнение сложных асинхронных задач, следующих друг за другом и отслеживать текущий статус их выполнения. Задачи - это просто задачи из меседжера симфони, которые обрабатываются АСИНХРОННО. Одна за одной, вы просто выстраиваете их в очередь, если у вас надо обработать несколько задачи последовательно. Результат работы каждой предыдущей задачи можно использовать в работе последующей.
Короткий пример организации вашего кода.
class SimpleWorkflowForTest extends SimpleWorkflow { public function getGeneratorFlow(): \Generator { // Выстраиваем по очереди Ваши задачи. Каждая должна возвращаться через yield $result = yield new NotifyToMattermostMessage1('Это переменная первой Activity', '111', []); yield new NotifyToMattermostMessage1('Результат первой Activity из всего Workflow: ' . $result . ' + еще текст к переменной', '222', []); } } // пример старта WORKFLOW $workflowService = new WorkflowService( $workflowRepository, $workflowBus ); $workflowService->createWorkflow(new SimpleWorkflowForTest('тест')); $workflowService->start();
При работе Вашей задачи можно использовать все преимущества месенджера от симфони. Например исключения для повторения задачи, или для откладывания задачи на 10 минут. Можно просто сразу завершить, если есть проблемы неразрешимые.
RecoverableExceptionInterface UnrecoverableExceptionInterface
Основная идея была заимствована из Temporal. Но за обработку задач отвечает symfony/workflow.т.е. вы легко можете использовать все его возможности. Для работы пакета создаются 2 асинхронные очереди
- Workflow - вычисляет текущий статус обработки или какую следующую задачу выполнят, или пора завершать работу.
- Activity - берет назначенную задачу из очереди и выполняет ее синхронно.
Все асинхронные задачи объединяются в 1 большой Workflow. Workflow по итогу составляется из большого количества Activity.
Workflow находится в работе, пока выполняются его Activity. Каждая задача выполняется Activity выполняется по отдельности и по порядку, как только задача активити выполнена, то выполнение возвращается в Workflow происходит вычисление следующей задачи и далее ставится задача на выполнение следующей активити.
Данный пакет предоставляет 2 обработчика очереди:
- Workflow
- Activity Интерфейсы для работы с хранилищами данных для Workflow и Activity.
Для работы требуется подготовить хранилища Workflow и Activity в соответствии с интерфейсом. Ниже будет пример использования с примером реализованных репозиториев для хранилищ.
Какие классы необходимо реализовать?
- Миграция для хранилищ - надо уметь сохранять сущности в зависимости от вашей Хранилки. Хоть SQL, хоть NoSql.
- MagDv\Orchestrator\Dto\WorkflowEntity
- MagDv\Orchestrator\Dto\ActivityEntity
- Репозиторий для
Workflow
, который реализует интерфейс (Реализация репозитория зависит от вашей системы хранения)- MagDv\Orchestrator\Interface\WorkflowRepositoryInterface
- Репозиторий для
Activity
, который реализует интерфейс (Реализация репозитория зависит от вашей системы хранения)- MagDv\Orchestrator\Interface\ActivityRepositoryInterface
- Настройка
symfony/messenger
в проекте.
- Надо создать 3 очереди для работы оркестратора.
- асинхронная очередь для
workflow
- асинхронная очередь для
activity
- СИНХРОННАЯ очередь для работы ваших задач. Они обрабатываются только в СИХРОННОМ режиме.
Пример реализации. !!!!Местами псевдокод!!!!
<?php declare(strict_types=1); use MagDv\Orchestrator\Enum\WorkflowStatus; use MagDv\Orchestrator\Events\Events\ActivityErrorLogEvent; use MagDv\Orchestrator\Events\Events\DispatchActivityEvent; use MagDv\Orchestrator\Events\Events\DispatchWorkflowEvent; use MagDv\Orchestrator\Events\Events\WorkflowErrorLogEvent; use MagDv\Orchestrator\Events\Handlers\DispatchActivityHandler; use MagDv\Orchestrator\Events\Handlers\DispatchWorkflowHandler; use MagDv\Orchestrator\Messenger\Handler\ActivityMessageHandler; use MagDv\Orchestrator\Messenger\Handler\WorkflowMessageHandler; use MagDv\Orchestrator\Messenger\Message\ActivityMessage; use MagDv\Orchestrator\Messenger\Message\WorkflowMessage; use MagDv\Orchestrator\Service\WorkflowService; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; // события потребуются джля работы оркестратора $eventDispatcher = new EventDispatcher(); // Можете настроить обработчики события (оповещение, например), когда происходит ошибка в обработке Workflow $eventDispatcher->addListener( WorkflowErrorLogEvent::class, static function (WorkflowErrorLogEvent $event) {} ); // Можете настроить обработчики события (оповещение, например), когда происходит ошибка в обработке Activity $eventDispatcher->addListener( ActivityErrorLogEvent::class, static function (ActivityErrorLogEvent $event) {} ); // ВАШИ реализации репозиториев $workflowRepository = new WorkflowRepository(); $activityRepository = new ActivityRepository(); // Настраиваем работу шины workflow $workflowMessageHandler = new WorkflowMessageHandler( workflowRepository:$workflowRepository, eventDispatcher:$eventDispatcher, activityRepository: $activityRepository, ); $workflowBus = new MessageBus( [ new HandleMessageMiddleware( new HandlersLocator( [ WorkflowMessage::class => [$workflowMessageHandler], ] ) ), ] ); // это то, что мы будем обрабатывать синхронно. Ваши задачи!!! $syncBus = new MessageBus( [ new HandleMessageMiddleware( new HandlersLocator( [ NotifyToMattermostMessage1::class => [new NotifyToMattermostMessageHandler1()], ] ) ), ] ); // Настраиваем работу шины activity $activityMessageHandler = new ActivityMessageHandler( activityRepository: $activityRepository, eventDispatcher: $eventDispatcher, messageBus: $syncBus ); $activityBus = new MessageBus( [ new HandleMessageMiddleware( new HandlersLocator( [ ActivityMessage::class => [$activityMessageHandler], ] ) ), ] ); //требуется так же указать обработчики вот этих событий, иначе не будет ничего работать. $eventDispatcher->addListener(DispatchActivityEvent::class, new DispatchActivityHandler($activityBus)); $eventDispatcher->addListener(DispatchWorkflowEvent::class, new DispatchWorkflowHandler($workflowBus));
Готовим Ваш первый WORKFLOW
// создаем сообщение для синхронной ШИНЫ, которое будет обрабатываться через WORKFLOW // ЭТО просто пример, Ваш код будет отличаться class NotifyToMattermostMessage1 { public function __construct( private readonly string $text, private readonly string $description, private readonly array $context ) { } public function getText(): string { return $this->text; } public function getContext(): array { return $this->context; } public function getDescription(): string { return $this->description; } } // Создаем обработчик Вашего сообщения final class NotifyToMattermostMessageHandler1 { public function __invoke(NotifyToMattermostMessage1 $message): mixed { // Тут ваш код, логика какая - то. Результат, который будет возвращаться, можно будет использовать в следующем задании в в этом Workflow return 'Результат Activity: ' . $message->getText(); } } // Пишем ВОРКФЛОУ // Наследуем класс SimpleWorkflow class SimpleWorkflowForTest extends SimpleWorkflow { public function getGeneratorFlow(): \Generator { // Выстраиваем по очереди Ваши задачи. Каждая должна возвращаться через yield $result = yield new NotifyToMattermostMessage1('Это переменная первой Activity', '111', []); yield new NotifyToMattermostMessage1('Результат первой Activity из всего Workflow: ' . $result . ' + еще текст к переменной', '222', []); } } // пример старта WORKFLOW $workflowService = new WorkflowService( $workflowRepository, $workflowBus ); $workflowService->createWorkflow(new SimpleWorkflowForTest('тест')); $workflowService->start();
Пример настройки под Симфони.
#messenger.yaml framework: messenger: default_bus: sync_bus failure_transport: failed serializer: default_serializer: messenger.transport.symfony_serializer symfony_serializer: format: json context: { } transports: failed: 'doctrine://default?queue_name=failed' activity: dsn: '%env(ACTIVITY_BUS_DSN)%' options: retry_strategy: max_retries: 10 delay: 10000 multiplier: 2 workflow: dsn: '%env(WORKFLOW_BUS_DSN)%' options: retry_strategy: max_retries: 10 delay: 10000 multiplier: 2 sync: 'sync://' buses: sync_bus: ~ activity_bus: middleware: - doctrine_ping_connection - doctrine_close_connection workflow_bus: middleware: - doctrine_ping_connection - doctrine_close_connection routing: MagDv\Orchestrator\Messenger\Message\WorkflowMessage: workflow MagDv\Orchestrator\Messenger\Message\ActivityMessage: activity App\Message\NotifyToMattermostMessage1: sync