phphd / pipeline-bundle
Chain of Responsibility on top of Symfony Messenger
Type: symfony-bundle
Requires
- php: >=8.0.2
- symfony/dependency-injection: ^6.0 | ^7.0
- symfony/http-kernel: ^6.0 | ^7.0
- symfony/messenger: ^6.3.5 | ^7.0
Requires (Dev)
- nyholm/symfony-bundle-test: ^3.0
- phphd/coding-standard: ~0.5.0
- phpstan/phpstan: ^1.10
- phpstan/phpstan-phpunit: ^1.3
- phpunit/phpunit: ^10.1
- psalm/plugin-phpunit: ^0.18.4
- symfony/var-dumper: ^6.0 | ^7.0
- tomasvotruba/type-coverage: ^0.2.1
README
🧰 Provides Symfony Messenger middleware for basic per-bus pipelining. It chains the messages returned by message handlers: when a handler (hdl1) processes a message (msg1) and returns a subsequent message (msg2), the next handler (hdl2) is invoked with it. That handler may in turn produce yet another message, and the cycle continues.
Installation 📥
- Install via composer:
      composer require phphd/pipeline-bundle
- Enable the bundle in bundles.php:
      PhPhD\PipelineBundle\PhdPipelineBundle::class => ['all' => true],
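For orientation, here is a minimal sketch of what a complete bundles.php might look like once the line above is added. The FrameworkBundle entry is an assumption about a typical Symfony application, not something this bundle requires you to add; your file will list whatever bundles your project already registers.

```php
<?php
// config/bundles.php (typical location in a Symfony application)

return [
    // Assumed to be present already in a standard Symfony project.
    Symfony\Bundle\FrameworkBundle\FrameworkBundle::class => ['all' => true],
    // The line added for this bundle, enabled in all environments.
    PhPhD\PipelineBundle\PhdPipelineBundle::class => ['all' => true],
];
```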
Configuration ⚒️
To use a chain of pipelined handlers on your command/query buses, add the phd_pipeline.forward_chain middleware to each bus's middleware list:
  framework:
      messenger:
          buses:
              command.bus:
                  middleware:
                      - doctrine_transaction
+                     - phd_pipeline.forward_chain
                      - validation
              query.bus:
                  middleware:
+                     - phd_pipeline.forward_chain
                      - validation
Usage 🚀
Consider this original message, which is initially dispatched to the message bus:
    use Symfony\Component\Validator\Constraints as Assert;

    final readonly class CreateVacationRequestCommandDto
    {
        public function __construct(
            public int $userId,
            public int $vacationTypeId,
            #[Assert\DateTime]
            public string $startDate,
            #[Assert\DateTime]
            public string $endDate,
        ) {
        }
    }
The first message handler returns a new message that will be used for the subsequent redispatch:
    use Symfony\Component\Messenger\Attribute\AsMessageHandler;

    #[AsMessageHandler(bus: 'command.bus')]
    final readonly class ConvertVacationRequestCommandHandler
    {
        // The repositories are assumed to be injected through a constructor, omitted here.

        public function __invoke(CreateVacationRequestCommandDto $dto): CreateVacationRequestCommand
        {
            $employee = $this->employeeRepository->find($dto->userId);
            $vacationType = $this->vacationTypeRepository->find($dto->vacationTypeId);
            $vacationPeriod = VacationPeriod::fromStringDates($dto->startDate, $dto->endDate);

            return new CreateVacationRequestCommand($employee, $vacationType, $vacationPeriod);
        }
    }
The newly created message conveys the same business concept, but at a higher level of abstraction. Instead of scalar types, it holds business objects (e.g. a VacationType entity instead of the $vacationTypeId scalar). The new class is no longer merely a DTO; it now represents a complete domain object.
Add the #[NextForwarded] attribute to this new message class to enable forwarding it to the next handler:
    use PhPhD\Pipeline\NextForwarded;

    #[NextForwarded]
    final readonly class CreateVacationRequestCommand
    {
        public function __construct(
            public Employee $employee,
            public VacationType $vacationType,
            public VacationPeriod $vacationPeriod,
        ) {
        }
    }
Messages lacking the #[NextForwarded] attribute will not be forwarded. The attribute must be placed on every message class that is expected to be redispatched.
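To illustrate the opt-in rule, the following is a minimal sketch of how a class-level marker attribute can be detected with PHP reflection. It does not reproduce the bundle's internals: NextForwardedStandIn, MarkedMessage, PlainMessage and isMarkedForForwarding() are hypothetical names introduced only so that the example runs without the bundle installed.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bundle's PhPhD\Pipeline\NextForwarded attribute.
#[Attribute(Attribute::TARGET_CLASS)]
final class NextForwardedStandIn
{
}

// A message class that opts in to forwarding.
#[NextForwardedStandIn]
final class MarkedMessage
{
}

// A message class without the marker; it would not be forwarded.
final class PlainMessage
{
}

/**
 * Returns true when the message's class carries the marker attribute.
 * This is an assumed detection strategy, not the bundle's actual code.
 */
function isMarkedForForwarding(object $message): bool
{
    $attributes = (new ReflectionClass($message))->getAttributes(NextForwardedStandIn::class);

    return $attributes !== [];
}

var_dump(isMarkedForForwarding(new MarkedMessage())); // bool(true)
var_dump(isMarkedForForwarding(new PlainMessage()));  // bool(false)
```

Because the check is made per class, a handler that returns an unmarked object simply ends the chain in this model.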
Finally, the last handler in the chain implements the core business logic. It may or may not return a result to the calling code.
    use Symfony\Component\Messenger\Attribute\AsMessageHandler;

    #[AsMessageHandler(bus: 'command.bus')]
    final readonly class CreateVacationRequestHandler
    {
        public function __invoke(CreateVacationRequestCommand $command)
        {
            // The core business logic that deals with domain entities rather than primitives...
        }
    }
You may chain as many message handlers as needed, even in a recursive manner, by returning an instance of the same class as the original message, provided that it has the forwarding attribute enabled.
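The recursive case can be modelled with a small stand-alone sketch. It is a simplified in-memory loop, not the bundle's middleware: ForwardMarker stands in for #[NextForwarded], and Countdown and dispatchChain() are hypothetical names invented for this illustration. The handler keeps returning a new instance of its own message class until a counter reaches zero, at which point it returns a plain string and the chain stops.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bundle's #[NextForwarded] attribute.
#[Attribute(Attribute::TARGET_CLASS)]
final class ForwardMarker
{
}

// A message whose handler may return another instance of the same class.
#[ForwardMarker]
final class Countdown
{
    public function __construct(public int $remaining)
    {
    }
}

/**
 * Simplified in-memory dispatcher: calls the handler registered for the
 * message class, then re-dispatches the result for as long as it is an
 * object carrying the marker and a handler exists for its class.
 *
 * @param array<class-string, callable(object): mixed> $handlers
 */
function dispatchChain(object $message, array $handlers): mixed
{
    $result = $handlers[$message::class]($message);

    while (
        is_object($result)
        && (new ReflectionClass($result))->getAttributes(ForwardMarker::class) !== []
        && isset($handlers[$result::class])
    ) {
        $result = $handlers[$result::class]($result);
    }

    return $result;
}

$handlers = [
    // Returns a new Countdown until the counter reaches zero, then a plain string.
    Countdown::class => static fn (Countdown $m): Countdown|string => $m->remaining > 0
        ? new Countdown($m->remaining - 1)
        : 'done',
];

$final = dispatchChain(new Countdown(3), $handlers);
var_dump($final); // string(4) "done"
```

In this model a recursive chain terminates only when a handler returns something that is not marked for forwarding, so a recursive handler should always have such an exit condition.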