gdx/p-service-bus

PServiceBus

2.0.1 2024-10-02 08:37 UTC

README

Service Bus for PHP inspired by NServiceBus.

You can read about the principles of usage, and why a service bus is needed, in the NServiceBus documentation :)

The documentation is incomplete. Ask in the issues if you need help.

Telegram group: https://t.me/PServiceBus

Symfony: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle

Laravel: https://packagist.org/packages/gdx/p-service-bus-laravel-package

Simple concept

[Image: PServiceBus Simple]

Extended concept

[Image: PServiceBus]

Installation

composer require gdx/p-service-bus

Usage

There are no great examples so far.

Please look for examples in https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp

Or in the Symfony bundle: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle

How to start using it in your project:

Features

  • A Saga/Aggregate consumes commands/events and produces events.
  • Bus allows you to send a command or publish an event.
  • CoroutineBus allows you to send multiple commands or publish multiple events.
  • Doctrine integration (Saga persistence, transactional messages (Outbox pattern), onlyOnce control).
  • Automatically initializes all the resources for you.
  • ServiceBus as the main entry point.

Init

p-service-bus:init

Send/Publish command/event

\GDXbsv\PServiceBus\Bus\ServiceBus implements all the Bus interfaces.

If you have only a couple of messages, use Bus:

#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());

If you have many messages, use CoroutineBus:

#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new TestCommand(1), CommandOptions::record());
$coroutine->send(new TestCommand(2), CommandOptions::record());
$coroutine->finish();
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->publish(new TestEvent(1), EventOptions::record());
$coroutine->publish(new TestEvent(2), EventOptions::record());
$coroutine->finish();

Consume

To start consuming, run the command:

p-service-bus:transport:consume memory

where:

  • memory is the transport name

Handlers

You can turn any method into a handler by adding the PHP attribute #[Handle]. You can set:

  • retries -- the number of retries before the message goes to the DLQ (not supported by SQS; for SQS, configure the queue itself with the DSN string)
  • timeoutSec -- the initial delay before the message is processed for the first time (max 15 min for SQS)
  • retriesTimeoutExpression -- a custom formula to calculate the delay for each retry. The variable retries_count is passed into the expression. For the syntax, see https://symfony.com/doc/current/reference/formats/expression_language.html

You can also set all of these options with MessageOption. Values set on the handler override them. This is not recommended, so it is not documented.

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;

/**
 * @internal
 */
final class Handlers
{
    public string $result = '';

    #[Handle('memory')]
    public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
    {
        $this->result .= '||' . $command->name;
    }

    /**
     * 5 tries = initial try + 4 retries
     *
     * Retry no  | Delay
     * --------- | -------------
     *      1    |  0 h  5 min
     *      2    |  0 h 25 min
     *      3    |  2 h  5 min
     *      4    | 10 h 25 min
     *
     * after all retries -> push to DLQ after 10s
     */
    #[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
    public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
    {
        $this->result .= '||' . $event->name;
    }
}
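The delay table in the docblock above can be checked by evaluating the same formula by hand. The following is a minimal sketch in plain PHP, assuming the expression is evaluated once per retry with retries_count starting at 1; retryDelaySeconds() and formatDelay() are hypothetical helpers written for this illustration and are not part of PServiceBus (the library itself evaluates the expression string).

```php
<?php declare(strict_types=1);

// Plain-PHP equivalent of the expression
// '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))'.
// Hypothetical helper, not part of PServiceBus.
function retryDelaySeconds(int $retriesCount): int
{
    return $retriesCount > 4 ? 10 : 60 * (5 ** $retriesCount);
}

// Format seconds as "H h M min" to compare with the docblock table.
function formatDelay(int $seconds): string
{
    return sprintf('%d h %d min', intdiv($seconds, 3600), intdiv($seconds % 3600, 60));
}

foreach (range(1, 4) as $retry) {
    echo $retry, ' | ', formatDelay(retryDelaySeconds($retry)), PHP_EOL;
}
// Prints:
// 1 | 0 h 5 min
// 2 | 0 h 25 min
// 3 | 2 h 5 min
// 4 | 10 h 25 min
```

For any retries_count above 4 the formula returns 10, which matches the "push to DLQ after 10s" note in the docblock.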

External events

Events can be sent out to external subscribers (for example via SNS), or received from an external source that we are subscribed to (for example SNS).

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}

#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}

Replay

Sometimes something goes wrong and you want to replay certain events. For this, use the #[Replay] attribute.

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 *
 * @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
 */
final class ReplayForEvent
{
    /**
     * @return ReplayOutput
     */
    #[Replay(replayName: 'testReplay')]
    public function anyName(): \Traversable {
        for ($i=1; $i<=5; ++$i) {
            yield new Message(new Test1Event(), EventOptions::record());
        }
    }
}

Then use the command to start the replay:

p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory

where:

  • testReplay is the replay name from the attribute
  • "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" is className + :: + methodName
  • memory is the transport name

Saga

Inspired by: https://docs.particular.net/nservicebus/sagas/

A saga is a long-living process in which you have to react to multiple events to make a decision. It is also useful when your data and messages should be bound together transactionally with the Outbox pattern.

Example with Doctrine:

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;


/**
 * @final
 */
#[ORM\Entity]
final class TestSaga extends Saga
{
    #[ORM\Column(type: 'id', nullable: false)]
    #[ORM\Id]
    private Id $id;
    #[ORM\Column(type: 'string', nullable: false)]
    private ?string $string;
    #[ORM\Column(type: 'string', nullable: true)]
    private ?string $value;

    /**
     * @param Id<static> $id
     */
    private function __construct(Id $id, string $string)
    {
        $this->id = $id;
        $this->string = $string;
    }
    
    public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
    {
        $mapper
            ->toMessage(
                // do not forget to also add a handler method for this message: the saga may already exist,
                // and the saga has to declare that it expects this message
                function (TestSagaCreateCommand $command, MessageSagaContext $context) {
                    return new self(new Id($command->id), $command->string);
                }
            );
    }

    public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
    {
        $mapper
            // Find saga by id
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
            ->toMessage(
                function (TestSagaCommand $command, MessageSagaContext $context) {
                    return new Id($command->id);
                }
            )
            ->toMessage(
                function (TestsSagaInEvent $message, MessageSagaContext $context) {
                    return new Id($message->string);
                }
            );
        $mapper
            // Find saga by string property
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
            ->toMessage(
                function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
                    return $command->string;
                }
            );
    }
    
    /** We have to tell the saga that it expects this message, because the saga could already exist */
    #[Handle('memory', 3)]
    public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
    }
    
    /** We can remove saga after all */
    #[Handle('memory', 3)]
    public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
    {
        $this->markAsComplete();
    }

    #[Handle('memory', 3)]
    public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
        $context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
    }

    #[Handle('memory', 3)]
    public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
    {
        $this->string = $event->string;
        $this->value = $event->value;
        $context->publish(new TestsSagaOutputEvent('testListeningFunction'));
    }

    #[Handle('memory', 3)]
    public function handleTestSagaMapStringCommand(
        TestSagaMapStringCommand $command,
        SagaContext $context
    ) {
        $context->publish(new TestsSagaOutputEvent($this->id->toString()));
    }
}

where:

  • configureHowToCreateSaga describes how to create the saga (WARNING: handlers must exist for all creation messages)
  • configureHowToFindSaga describes how to find the saga
  • #[Handle('memory', 3)] marks methods as handlers

You can create custom finders by using the #[SagaFind] attribute, for example:

<?php declare(strict_types=1);

use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
final class CustomDoctrineSagaFinder
{
    public function __construct(
        private EntityManager $em
    ) {
    }

    #[SagaFind]
    public function findByMultipleFields(
        CustomDoctrineSearchEvent $event,
        MessageOptions $messageOptions
    ): TestSaga {
        $qb = $this->em->createQueryBuilder();
        $qb
            ->from(TestSaga::class, 'saga')
            ->select('saga')
            ->where($qb->expr()->eq('saga.string', ':propertyValue'))
            ->setParameter(':propertyValue', $event->string);
        $saga = $qb->getQuery()->getSingleResult();

        return $saga;
    }
}

Transport

So far only InMemoryTransport, RabbitMQ (BunnyTransport), SQS and SNS are implemented. But you can adapt any transport of your own by implementing 2 interfaces:

interface Transport
{
    /**
     * @return \Generator<int, void, Envelope|null, void>
     */
    public function sending(): \Generator;

    /**
     * @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
     */
    public function receive(int $limit = 0): \Generator;

    public function stop(): void;
}
interface TransportSynchronisation
{
    public function sync(): void;
}

Please note that the sync() method for an external bus MUST subscribe you to the external message names if you want the subscription to happen automatically.
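The generator signatures in the Transport interface may be easier to read next to a toy implementation. The following is a minimal sketch only, not the library's InMemoryTransport and not a real implementation of the interface: SketchEnvelope and ArrayTransportSketch are hypothetical names, a plain bool stands in for Result\Ok / Result\Err, and treating null as "nothing more to send" is an assumption made for this illustration.

```php
<?php declare(strict_types=1);

// Hypothetical stand-in for the library's Envelope; used only in this sketch.
final class SketchEnvelope
{
    public function __construct(public string $payload) {}
}

final class ArrayTransportSketch
{
    /** @var list<SketchEnvelope> */
    public array $queue = [];
    /** @var list<SketchEnvelope> */
    public array $failed = [];

    /** @return \Generator<int, void, SketchEnvelope|null, void> */
    public function sending(): \Generator
    {
        // Each send() from the caller arrives as the result of `yield`.
        // Assumption: null means "nothing more to send".
        while (($envelope = yield) !== null) {
            $this->queue[] = $envelope;
        }
    }

    /** @return \Generator<int, SketchEnvelope, bool|null, void> */
    public function receive(int $limit = 0): \Generator
    {
        $handled = 0;
        while ($this->queue !== [] && ($limit === 0 || $handled < $limit)) {
            $envelope = array_shift($this->queue);
            // Hand the envelope to the consumer and wait for its verdict.
            $ok = yield $envelope;
            if ($ok === false) {
                $this->failed[] = $envelope;
            }
            ++$handled;
        }
    }
}

$transport = new ArrayTransportSketch();

$sending = $transport->sending();
$sending->send(new SketchEnvelope('a'));
$sending->send(new SketchEnvelope('b'));
$sending->send(null); // finish sending

$received = [];
$receiving = $transport->receive();
while ($receiving->valid()) {
    $received[] = $receiving->current()->payload;
    $receiving->send(true); // report success and advance to the next envelope
}
```

The point of the generator shape is that the caller pushes values in with send() and the transport decides when to flush or acknowledge, so batching can stay inside the transport.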

SQS transport

Example DSN or config:

sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName

Options:

  • region -- AWS region
  • retries -- retries before DLQ (applied during queue creation)
  • visibilityTimeout -- how long consumed messages are hidden before the next attempt
  • waitSeconds -- long polling: how long AWS waits to collect a batch of messages
  • waitBetweenLoopsSeconds -- how long we sleep if there are no messages
  • messagesBatch -- how many messages we consume from AWS with one request
  • preload -- asynchronously load the next messages while working on the previous ones
  • tags[name] -- add tags to the queue (applied during queue creation)
  • assume -- the AWS role that we want to assume
  • queue -- the actual queue name

Bunny transport

For the Bunny transport, the internal and external transports are different. The external transport uses exchanges and pub/sub.

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny

SQS-SNS transport

For the SQS-SNS transports, SNS is used only for external messages.

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns