duyler/event-bus

dev-main 2024-05-09 04:44 UTC

This package is auto-updated.

Last update: 2024-05-09 04:45:10 UTC


README

Build status type-coverage psalm-level codecov

Action Bus

The action bus implements cooperative multitasking between the actions performed within it. Each action is performed within a separate thread (Fiber) in an isolated DI container. You can control execution using state handlers, triggers, and subscriptions to events generated by actions.

Full documentation see duyler.com/en/docs/action-bus

Example amqp worker

Create state handler for connect to amqp queue

<?php

declare(strict_types=1);

use AMQPChannel;
use AMQPConnection;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainBeginStateHandlerInterface;
use Duyler\ActionBus\State\Service\StateMainBeginService;
use Duyler\ActionBus\State\StateContext;
use AccountEventQueueConfig;
use Override;

class ConnectToQueueStateHandler implements MainBeginStateHandlerInterface
{
    public function __construct(
        private AccountEventQueueConfig $queueConfig,
    ) {}

    #[Override]
    public function handle(StateMainBeginService $stateService, StateContext $context): void
    {
        $connection = new AMQPConnection();
        $connection->setHost($this->queueConfig->host);
        $connection->setPort($this->queueConfig->port);
        $connection->setLogin($this->queueConfig->login);
        $connection->setPassword($this->queueConfig->password);
        $connection->connect();
        
        $channel = new AMQPChannel($connection);

        $queue = new AMQPQueue($channel);
        $queue->setName($this->queueConfig->queueName);
        $queue->declareQueue();

        $context->write('queue', $queue);
    }
}

Create state handler for listening queue

<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainCyclicStateHandlerInterface;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Trigger;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainCyclicService;
use Duyler\ActionBus\State\StateContext;
use Override;

class ListeningQueueStateHandler implements MainCyclicStateHandlerInterface
{
    #[Override]
    public function handle(StateMainCyclicService $stateService, StateContext $context): void
    {
        /** @var AMQPQueue $queue */
        $queue = $context->read('queue');

        $message = $queue->get();
        
        if ($message === null) {
            return;
        }

        $content = json_decode($message->getBody(), true);

        $actionId = 'account_id_' . $content['account_id'];

        if ($stateService->actionIsExists($actionId) === false) {
            $stateService->addAction(
                new Action(
                    id: $actionId,
                    handler: HandleAccountEventAction::class,
                    triggeredOn: $actionId,
                    argument: AMQPEnvelope::class,
                    contract: AMQPEnvelope::class,
                    repeatable: true,
                )
            );
        }

        $stateService->doTrigger(
            new Trigger(
                id: $actionId,
                data: $message,
                contract: AMQPEnvelope::class,
            )
        );
    }
}

Create state handler for send ack into queue

<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainAfterStateHandlerInterface;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainAfterService;
use Duyler\ActionBus\State\StateContext;
use Override;

class AckMessageStateHandler implements MainAfterStateHandlerInterface
{
    #[Override]
    public function handle(StateMainAfterService $stateService, StateContext $context): void
    {
        if ($stateService->getStatus() === ResultStatus::Success) {
            /** @var AMQPEnvelope $message */
            $message = $stateService->getResultData();
            /** @var AMQPQueue $queue */
            $queue = $context->read('queue');
            $queue->ack($message->getDeliveryTag());
        }
    }

    #[Override]
    public function observed(StateContext $context): array
    {
        return [];
    }
}

Create action handler

<?php

declare(strict_types=1);

use AMQPEnvelope;
use Duyler\ActionBus\Dto\Result;
use Duyler\ActionBus\Enum\ResultStatus;
use Fiber;

class HandleAccountEventAction
{
    public function __invoke(AMQPEnvelope $message): Result
    {
        $content = json_decode($message->getBody(), true);

        echo Fiber::suspend(
            fn() => 'Account id: ' . $content['account_id'] . '. Event id: ' . $content['event_id'] . PHP_EOL
        );
        
        return new Result(
            status: ResultStatus::Success,
            data: $message,
        );
    }
}

Build and run

// run.php

<?php

declare(strict_types=1);

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Enum\Mode;
use AccountEventQueueConfig;

$busBuilder = new BusBuilder(
    new BusConfig(
        mode: Mode::Loop,
    )
);

$config = new AccountEventQueueConfig(
        host: 'localhost',
        port: 5672,
        logi: 'user',
        password: 'password',
        queueName: 'account_events_queue',
);

$busBuilder->addStateHandler(
    new ConnectToQueueStateHandler($config),
);

$busBuilder->addStateHandler(
    new ListeningQueueStateHandler(),
);

$busBuilder->addStateHandler(
    new AckMessageStateHandler(),
);

$bus = $busBuilder
    ->build()
    ->run();
    $ php run.php

Example content receive

<?php

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Subscription;
use Duyler\ActionBus\Enum\ResultStatus;
use Psr\Http\Message\ServerRequestInterface;

$requestAction = new Action(
    id: 'Request.GetRequest',
    handler: GetRequestAction::class,
    contract: ServerRequestInterface::class,
);

$blogAction = new Action(
    id: 'Blog.GetPostById',
    handler: GetPostByIdAction::class,
    required: [
        'Request.GetRequest',
    ],
    argument: PostId::class,
    argumentFactory: fn(ServerRequestInterface $request): PostId => new PostId($request->getAttribute('id')),
    externalAccess: true,
    contract: Post::class,
);

$blogCommentListAction = new Action(
    id: 'Blog.GetPostComments',
    handler: GetCommentsByPostAction::class,
    required: [
        'Blog.GetPostById',
    ],
    argument: Post,
    externalAccess: true,
    contract: CommentList::class,
);

$blogActionSubscription = new Subscription(
    subject: 'Request.GetRequest',
    actionId: 'Blog.GetPostById',
    status: ResultStatus::Success,
);

$blogCommentListActionSubscription = new Subscription(
    subject: 'Blog.GetPostById',
    actionId: 'Blog.GetPostComments',
    status: ResultStatus::Success,
);

$busBuilder = new BusBuilder(new BusConfig());

$bus = $busBuilder
    ->addAction($blogAction)
    ->addAction($blogCommentListAction)
    ->addSubscription($blogActionSubscription)
    ->addSubscription($blogCommentListActionSubscription)
    ->doAction($requestAction)
    ->build()
    ->run();

$blogPost = $bus->getResult('Blog.GetPostById');
$blogPostComments = $bus->getResult('Blog.GetPostComments');