duyler / event-bus
Duyler action bus
dev-main
2024-05-09 04:44 UTC
Requires
- php: ^8.3
- duyler/dependency-injection: dev-main
- psr/event-dispatcher: ^1.0.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.30
- phpunit/phpunit: ^10.0
- squizlabs/php_codesniffer: ^4.0
- vimeo/psalm: ^5.0
This package is auto-updated.
Last update: 2024-05-09 04:45:10 UTC
README
Action Bus
The action bus implements cooperative multitasking between the actions performed within it. Each action is performed within a separate thread (Fiber) in an isolated DI container. You can control execution using state handlers, triggers, and subscriptions to events generated by actions.
For full documentation, see duyler.com/en/docs/action-bus
Example: AMQP worker
Create a state handler that connects to the AMQP queue
<?php

declare(strict_types=1);

use AMQPChannel;
use AMQPConnection;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainBeginStateHandlerInterface;
use Duyler\ActionBus\State\Service\StateMainBeginService;
use Duyler\ActionBus\State\StateContext;
use AccountEventQueueConfig;
use Override;

/**
 * State handler that opens the AMQP connection, declares the configured
 * queue and stores the queue object in the state context under the key
 * 'queue', where the other handlers of this example read it from.
 */
class ConnectToQueueStateHandler implements MainBeginStateHandlerInterface
{
    /**
     * @param AccountEventQueueConfig $queueConfig Connection settings: host, port,
     *                                             login, password and queue name.
     */
    public function __construct(
        private AccountEventQueueConfig $queueConfig,
    ) {}

    /**
     * Connects to the broker and publishes the declared queue into the context.
     *
     * @param StateMainBeginService $stateService Not used by this handler.
     * @param StateContext          $context      Receives the queue under 'queue'.
     */
    #[Override]
    public function handle(StateMainBeginService $stateService, StateContext $context): void
    {
        $settings = $this->queueConfig;

        // Credentials are applied first; the connection is opened only afterwards.
        $amqp = new AMQPConnection();
        $amqp->setHost($settings->host);
        $amqp->setPort($settings->port);
        $amqp->setLogin($settings->login);
        $amqp->setPassword($settings->password);
        $amqp->connect();

        // The channel is only needed to construct the queue, so it is created inline.
        $accountQueue = new AMQPQueue(new AMQPChannel($amqp));
        $accountQueue->setName($settings->queueName);
        $accountQueue->declareQueue();

        $context->write('queue', $accountQueue);
    }
}
Create a state handler that listens to the queue
<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainCyclicStateHandlerInterface;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Trigger;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainCyclicService;
use Duyler\ActionBus\State\StateContext;
use Override;

/**
 * State handler that fetches at most one message from the queue per call
 * and dispatches it to an action dedicated to the message's account.
 */
class ListeningQueueStateHandler implements MainCyclicStateHandlerInterface
{
    /**
     * Reads the queue stored in the context under 'queue', fetches one message
     * and, if there is one, triggers the action named 'account_id_<account_id>'.
     * The action is registered first when the bus does not know it yet.
     *
     * @param StateMainCyclicService $stateService Used to register actions and fire triggers.
     * @param StateContext           $context      Must hold the AMQPQueue under 'queue'.
     */
    #[Override]
    public function handle(StateMainCyclicService $stateService, StateContext $context): void
    {
        /** @var AMQPQueue $queue */
        $queue = $context->read('queue');

        $message = $queue->get();

        // "No message available" is reported as null by current php-amqp releases
        // and as false by older ones; accept only a real envelope so that neither
        // value reaches getBody() below.
        if (!$message instanceof AMQPEnvelope) {
            return;
        }

        $content = json_decode($message->getBody(), true);

        // One action per account: the same id serves as action id and trigger id.
        $actionId = 'account_id_' . $content['account_id'];

        if ($stateService->actionIsExists($actionId) === false) {
            $stateService->addAction(
                new Action(
                    id: $actionId,
                    handler: HandleAccountEventAction::class,
                    triggeredOn: $actionId,
                    argument: AMQPEnvelope::class,
                    contract: AMQPEnvelope::class,
                    repeatable: true,
                ),
            );
        }

        $stateService->doTrigger(
            new Trigger(
                id: $actionId,
                data: $message,
                contract: AMQPEnvelope::class,
            ),
        );
    }
}
Create a state handler that sends an ack to the queue
<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainAfterStateHandlerInterface;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainAfterService;
use Duyler\ActionBus\State\StateContext;
use Override;

/**
 * State handler that acknowledges a queue message once the result reported
 * by the state service has the status Success.
 */
class AckMessageStateHandler implements MainAfterStateHandlerInterface
{
    /**
     * Acks the envelope carried as result data; does nothing for any
     * status other than ResultStatus::Success.
     *
     * @param StateMainAfterService $stateService Supplies the result status and data.
     * @param StateContext          $context      Must hold the AMQPQueue under 'queue'.
     */
    #[Override]
    public function handle(StateMainAfterService $stateService, StateContext $context): void
    {
        // Anything but a successful result leaves the message unacknowledged.
        if ($stateService->getStatus() !== ResultStatus::Success) {
            return;
        }

        /** @var AMQPEnvelope $envelope */
        $envelope = $stateService->getResultData();

        /** @var AMQPQueue $accountQueue */
        $accountQueue = $context->read('queue');

        $accountQueue->ack($envelope->getDeliveryTag());
    }

    /**
     * Returns an empty list.
     * NOTE(review): presumably an empty list means "observe every action" - confirm
     * against the library documentation.
     *
     * @return array
     */
    #[Override]
    public function observed(StateContext $context): array
    {
        return [];
    }
}
Create the action handler
<?php

declare(strict_types=1);

use AMQPEnvelope;
use Duyler\ActionBus\Dto\Result;
use Duyler\ActionBus\Enum\ResultStatus;
use Fiber;

/**
 * Action handler for a single account event delivered as an AMQP envelope.
 */
class HandleAccountEventAction
{
    /**
     * Decodes the envelope body, suspends the current fiber with a closure that
     * builds a description line, echoes whatever value the fiber is resumed
     * with, and returns the envelope as a successful result.
     *
     * @param AMQPEnvelope $message Envelope whose JSON body carries 'account_id' and 'event_id'.
     *
     * @return Result Success result with the original envelope as its data.
     */
    public function __invoke(AMQPEnvelope $message): Result
    {
        $payload = json_decode($message->getBody(), true);

        // The closure is handed to whoever resumes this fiber; it is not invoked here.
        $describeEvent = fn() => 'Account id: ' . $payload['account_id'] . '. Event id: ' . $payload['event_id'] . PHP_EOL;

        $resumedWith = Fiber::suspend($describeEvent);
        echo $resumedWith;

        return new Result(
            status: ResultStatus::Success,
            data: $message,
        );
    }
}
Build and run
<?php

// run.php
//
// Builds the bus in loop mode, registers the three AMQP state handlers of
// this example (connect, listen, ack) and starts the bus.

declare(strict_types=1);

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Enum\Mode;
use AccountEventQueueConfig;

$busBuilder = new BusBuilder(
    new BusConfig(
        mode: Mode::Loop,
    ),
);

// The argument must be named `login`: ConnectToQueueStateHandler reads
// $queueConfig->login, and an unknown named argument is a runtime error.
$config = new AccountEventQueueConfig(
    host: 'localhost',
    port: 5672,
    login: 'user',
    password: 'password',
    queueName: 'account_events_queue',
);

$busBuilder->addStateHandler(
    new ConnectToQueueStateHandler($config),
);

$busBuilder->addStateHandler(
    new ListeningQueueStateHandler(),
);

$busBuilder->addStateHandler(
    new AckMessageStateHandler(),
);

$bus = $busBuilder
    ->build()
    ->run();
$ php run.php
Example: receiving content
<?php

// Wires three actions into a chain: the request action feeds the blog post
// action, which in turn feeds the comment list action. Each link is expressed
// both as a `required` dependency and as a Subscription on a successful result.

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Subscription;
use Duyler\ActionBus\Enum\ResultStatus;
use Psr\Http\Message\ServerRequestInterface;

$requestAction = new Action(
    id: 'Request.GetRequest',
    handler: GetRequestAction::class,
    contract: ServerRequestInterface::class,
);

$blogAction = new Action(
    id: 'Blog.GetPostById',
    handler: GetPostByIdAction::class,
    required: [
        'Request.GetRequest',
    ],
    argument: PostId::class,
    // Builds the handler argument from the request's 'id' attribute.
    argumentFactory: fn(ServerRequestInterface $request): PostId => new PostId($request->getAttribute('id')),
    externalAccess: true,
    contract: Post::class,
);

$blogCommentListAction = new Action(
    id: 'Blog.GetPostComments',
    handler: GetCommentsByPostAction::class,
    required: [
        'Blog.GetPostById',
    ],
    // Class-string, like every other `argument`/`contract` here; a bare `Post`
    // would be resolved as an undefined constant.
    argument: Post::class,
    externalAccess: true,
    contract: CommentList::class,
);

$blogActionSubscription = new Subscription(
    subject: 'Request.GetRequest',
    actionId: 'Blog.GetPostById',
    status: ResultStatus::Success,
);

$blogCommentListActionSubscription = new Subscription(
    subject: 'Blog.GetPostById',
    actionId: 'Blog.GetPostComments',
    status: ResultStatus::Success,
);

$busBuilder = new BusBuilder(new BusConfig());

$bus = $busBuilder
    ->addAction($blogAction)
    ->addAction($blogCommentListAction)
    ->addSubscription($blogActionSubscription)
    ->addSubscription($blogCommentListActionSubscription)
    ->doAction($requestAction)
    ->build()
    ->run();

// Both actions set externalAccess: true and their results are read back by id.
$blogPost = $bus->getResult('Blog.GetPostById');
$blogPostComments = $bus->getResult('Blog.GetPostComments');