gdx / p-service-bus
PServiceBus
Requires
- php: >=8.2
- ext-json: *
- ext-pcntl: *
- doctrine/instantiator: ^1.4 || ^2.0
- prewk/result: ^3.1.0
- psr/log: ^3.0|^2.0|^1.0
- ramsey/uuid: ^4.5
- symfony/console: *
Requires (Dev)
- alexeyshockov/guzzle-psalm-plugin: ^v1.0.0
- aws/aws-sdk-php: ^3.285.0
- bunny/bunny: ^0.5.0
- doctrine/orm: ^2.10.1
- enqueue/dsn: ^0.10.8
- guzzlehttp/guzzle: ^7.3
- php-standard-library/psalm-plugin: ^2.2.1
- phpunit/phpunit: ^10.2
- react/async: ^v3.0 || ^4.0
- rector/rector: ^1.0.0
- roave/security-advisories: dev-master
- symfony/cache: *
- symfony/expression-language: *
- vimeo/psalm: ^5.12
Suggests
- aws/aws-sdk-php: Allows the use of SQS and/or SNS
- bunny/bunny: Allows the use of RabbitMQ as a transport
- doctrine/orm: If you want to use it with Doctrine.
- enqueue/dsn: Allows the use of SQS and/or SNS
- react/async: Allows the use of RabbitMQ as a transport
- symfony/expression-language: Allows the use of a custom VisibilityTimeout for SQS
This package is auto-updated.
Last update: 2024-11-02 06:46:27 UTC
README
Service Bus for PHP inspired by NServiceBus.
You can read about the principles of usage, and why a service bus is needed, in their documentation :)
Documentation is bad. Ask in issues if you need help.
Telegram group: https://t.me/PServiceBus
Symfony: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle
Laravel: https://packagist.org/packages/gdx/p-service-bus-laravel-package
Simple concept
Extended concept
Installation
composer require gdx/p-service-bus
Usage
So far there are no great examples.
Please look for examples in https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp
Or see the Symfony bundle: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle
How to start using it in your project:
- Initialization example https://gitlab.com/GDXbsv/pservicebus/-/blob/master/src/Setup.php
- How I do it in tests https://gitlab.com/GDXbsv/pservicebus/-/blob/master/tests/Integration/IntegrationTestCase.php#L45
Features
- Saga/Aggregate consumes a command/event and produces an event.
- Bus allows you to send a command or publish an event.
- CoroutineBus allows you to send multiple commands or publish multiple events.
- Doctrine integration (Saga persistence, transactional messages (Outbox pattern), onlyOnce control)
- Automatically initializes all the resources for you
- ServiceBus as the main entry point
Init
p-service-bus:init
Send/Publish command/event
\GDXbsv\PServiceBus\Bus\ServiceBus
implements all the Bus interfaces.
If you have a couple of messages, use Bus
#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());
If you have many messages use CoroutineBus
#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new TestCommand(1), CommandOptions::record());
$coroutine->send(new TestCommand(2), CommandOptions::record());
$coroutine->finish();
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->publish(new TestEvent(1), EventOptions::record());
$coroutine->publish(new TestEvent(2), EventOptions::record());
$coroutine->finish();
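The README does not show the message classes used in the snippets above. As a minimal sketch, assuming messages are plain PHP objects (as the ExternalOutEvent example further down suggests), TestCommand and TestEvent could look like this. The property name `number` is hypothetical and exists only for illustration.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical command carrying a single integer.
 * The default value allows both `new TestCommand()` and `new TestCommand(1)`.
 */
final class TestCommand
{
    public function __construct(public readonly int $number = 0)
    {
    }
}

/**
 * Hypothetical event with the same shape as the command above.
 */
final class TestEvent
{
    public function __construct(public readonly int $number = 0)
    {
    }
}
```

With classes like these, the `$bus->send(new TestCommand())` and `$coroutine->publish(new TestEvent(1), EventOptions::record())` calls above have concrete objects to work with.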
Consume
To start consuming run the command
p-service-bus:transport:consume memory
where:
- memory is transport name
Handlers
You can make any method a handler by adding the Handle PHP attribute. You can set:
- retries: number of retries before the message goes to the DLQ (not supported by SQS; for SQS, configure the queue itself with the DSN string)
- timeoutSec: the initial delay before the message is processed for the first time (max 15 min for SQS)
- retriesTimeoutExpression: custom formula to calculate the delay for each retry. The variable retries_count is passed into the expression. For the syntax, see https://symfony.com/doc/current/reference/formats/expression_language.html
You can also set all of these options with MessageOption; the handler's values will override them. This is not recommended, so it is not documented.
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;
/**
* @internal
*/
final class Handlers
{
public string $result = '';
#[Handle('memory')]
public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
{
$this->result .= '||' . $command->name;
}
/**
* 5 tries = initial try + 4 retries
*
* Retry no | Delay
* --------- | -------------
* 1 | 0 h 5 min
* 2 | 0 h 25 min
* 3 | 2 h 5 min
* 4 | 10 h 25 min
*
* after all retries -> push to DLQ after 10s
*/
#[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
{
$this->result .= '||' . $event->name;
}
}
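To check the delay table in the docblock above, the expression can be mirrored in plain PHP. This is only a sketch: the function names are hypothetical, it does not use symfony/expression-language, and it assumes retries_count starts at 1 for the first retry, as the table implies.

```php
<?php declare(strict_types=1);

/**
 * Plain-PHP equivalent of the expression
 * '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))'.
 * Returns the delay in seconds.
 */
function retryDelaySeconds(int $retriesCount): int
{
    return $retriesCount > 4 ? 10 : 60 * (5 ** $retriesCount);
}

/** Formats a number of seconds as hours, minutes and seconds. */
function formatDelay(int $seconds): string
{
    return sprintf(
        '%d h %d min %d s',
        intdiv($seconds, 3600),
        intdiv($seconds % 3600, 60),
        $seconds % 60
    );
}

// Print one row per retry; the fifth row is the 10 s delay before the DLQ.
foreach (range(1, 5) as $retriesCount) {
    echo $retriesCount, ' | ', formatDelay(retryDelaySeconds($retriesCount)), PHP_EOL;
}
```

For retries 1 to 4 this gives 300, 1500, 7500 and 37500 seconds, which matches the 5 min, 25 min, 2 h 5 min and 10 h 25 min rows of the table.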
External events
Send events out to subscribed clients (for example via SNS), or receive events from an outside source that we have subscribed to (for example an SNS topic).
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;
/**
* @internal
* @immutable
* @psalm-immutable
*/
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}
#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}
Replay
Sometimes something goes wrong and you want to replay certain events. For this, use the Replay attribute.
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;
/**
* @internal
* @immutable
* @psalm-immutable
*
* @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
*/
final class ReplayForEvent
{
/**
* @return ReplayOutput
*/
#[Replay(replayName: 'testReplay')]
public function anyName(): \Traversable {
for ($i=1; $i<=5; ++$i) {
yield new Message(new Test1Event(), EventOptions::record());
}
}
}
Then use command to start replay
p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory
where:
- testReplay is replay name from Attribute
- "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" is className + :: + methodName
- memory is transport name
Saga
Inspired: https://docs.particular.net/nservicebus/sagas/
A saga is a long-lived process for cases where you have to react to multiple events to make a decision. It is also useful when your data and messages must be bound together transactionally with the outbox pattern.
Example with doctrine:
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;
/**
* @final
*/
#[ORM\Entity]
final class TestSaga extends Saga
{
#[ORM\Column(type: 'id', nullable: false)]
#[ORM\Id]
private Id $id;
#[ORM\Column(type: 'string', nullable: false)]
private string $string;
#[ORM\Column(type: 'string', nullable: true)]
private ?string $value;
/**
* @param Id<static> $id
*/
private function __construct(Id $id, string $string)
{
$this->id = $id;
$this->string = $string;
}
public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
{
$mapper
->toMessage(
// do not forget to create a handler for the case where the saga already exists, and to let the saga know that we wait for this message
function (TestSagaCreateCommand $command, MessageSagaContext $context) {
return new self(new Id($command->id), $command->string);
}
);
}
public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
{
$mapper
// Find saga by id
->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
->toMessage(
function (TestSagaCommand $command, MessageSagaContext $context) {
return new Id($command->id);
}
)
->toMessage(
function (TestsSagaInEvent $message, MessageSagaContext $context) {
return new Id($message->string);
}
);
$mapper
// Find saga by string property
->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
->toMessage(
function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
return $command->string;
}
);
}
/** We have to tell the saga that we wait for this message, because the saga may already exist */
#[Handle('memory', 3)]
public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
{
$this->string = $command->string;
}
/** We can remove saga after all */
#[Handle('memory', 3)]
public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
{
$this->markAsComplete();
}
#[Handle('memory', 3)]
public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
{
$this->string = $command->string;
$context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
}
#[Handle('memory', 3)]
public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
{
$this->string = $event->string;
$this->value = $event->value;
$context->publish(new TestsSagaOutputEvent('testListeningFunction'));
}
#[Handle('memory', 3)]
public function handleTestSagaMapStringCommand(
TestSagaMapStringCommand $command,
SagaContext $context
) {
$context->publish(new TestsSagaOutputEvent($this->id->toString()));
}
}
where:
- configureHowToCreateSaga describes how to create a saga (WARNING: Handlers should exist for all creation messages)
- configureHowToFindSaga describes how to find a saga
- #[Handle('memory', 3)] marks methods as handlers
You can create custom finders by using the #[SagaFind] attribute, for example:
<?php declare(strict_types=1);
use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;
/**
* @internal
* @immutable
* @psalm-immutable
*/
final class CustomDoctrineSagaFinder
{
public function __construct(
private EntityManager $em
) {
}
#[SagaFind]
public function findByMultipleFields(
CustomDoctrineSearchEvent $event,
MessageOptions $messageOptions
): TestSaga {
$qb = $this->em->createQueryBuilder();
$qb
->from(TestSaga::class, 'saga')
->select('saga')
->where($qb->expr()->eq('saga.string', ':propertyValue'))
->setParameter(':propertyValue', $event->string);
$saga = $qb->getQuery()->getSingleResult();
return $saga;
}
}
Transport
So far only InMemoryTransport, RabbitMQ (BunnyTransport), SQS, and SNS are implemented, but you can adapt any transport of your own by implementing 2 interfaces:
interface Transport
{
/**
* @return \Generator<int, void, Envelope|null, void>
*/
public function sending(): \Generator;
/**
* @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
*/
public function receive(int $limit = 0): \Generator;
public function stop(): void;
}
interface TransportSynchronisation
{
public function sync(): void;
}
Please note that the sync() method for an external bus MUST subscribe you to the external message name if you want the subscription to happen automatically.
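The return type of sending() describes a generator that receives values through send() rather than yielding them. The sketch below shows that PHP mechanism in isolation. It does not implement the library's Transport interface: SketchEnvelope and ArrayCollectingSender are hypothetical stand-ins, and treating null as the "no more messages" signal is an assumption read from the `Envelope|null` send type.

```php
<?php declare(strict_types=1);

/** Hypothetical stand-in for the library's Envelope, to keep the sketch runnable. */
final class SketchEnvelope
{
    public function __construct(public readonly string $payload)
    {
    }
}

/** Hypothetical sender that stores payloads in an array instead of a real queue. */
final class ArrayCollectingSender
{
    /** @var list<string> */
    public array $sent = [];

    /**
     * @return \Generator<int, void, SketchEnvelope|null, void>
     */
    public function sending(): \Generator
    {
        while (true) {
            // Pause here until the caller passes something in with send().
            $envelope = yield;
            // Assumption: null means the caller has nothing more to send.
            if ($envelope === null) {
                return;
            }
            $this->sent[] = $envelope->payload;
        }
    }
}

$sender = new ArrayCollectingSender();
$coroutine = $sender->sending();

// The first send() advances the generator to its first yield before delivering the value.
$coroutine->send(new SketchEnvelope('a'));
$coroutine->send(new SketchEnvelope('b'));
$coroutine->send(null);
```

After the final send(null) the generator has returned, so a real transport would have a natural point to flush any batched messages before the loop exits.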
SQS transport
Example DSN or config: sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName
Options:
- region -- AWS region
- retries -- retries before DLQ (applied during queue creation)
- visibilityTimeout -- how long consumed messages are hidden before the next attempt
- waitSeconds -- long polling: how long AWS waits to collect a batch of messages
- waitBetweenLoopsSeconds -- how long we sleep if there are no messages
- messagesBatch -- how many messages we consume from AWS with one request
- preload -- asynchronously load the next messages while working on the previous ones
- tags[name] -- add tags to the queue (applied during queue creation)
- assume -- the AWS role that we want to assume
- queue -- the actual queue name
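To see how the options above map onto the example DSN, it can be split with PHP's built-in parse_url() and parse_str(). This is only an illustration of the DSN layout. It is not how the library reads the DSN (the package suggests enqueue/dsn for that), and the variable names are arbitrary.

```php
<?php declare(strict_types=1);

$dsn = 'sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30'
    . '&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true'
    . '&tags[name]=value&tags[name2]=value2'
    . '&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName';

// Split the DSN into scheme, credentials, host, port, path and query string.
$parts = parse_url($dsn);

// Decode the query string into an array; tags[...] entries become a nested array.
parse_str($parts['query'], $options);

echo 'scheme: ', $parts['scheme'], PHP_EOL;
echo 'host:   ', $parts['host'], ':', $parts['port'], PHP_EOL;
echo 'path:   ', $parts['path'], PHP_EOL;
echo 'queue:  ', $options['queue'], PHP_EOL;
echo 'assume: ', $options['assume'], PHP_EOL;
echo 'tags:   ', json_encode($options['tags']), PHP_EOL;
```

Note that all query values arrive as strings (for example `preload` is the string `true`), and that the percent-encoded `assume` value is decoded to a plain role ARN.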
Bunny transport
For the Bunny transport, the internal and external transports differ: the external one uses exchanges and pub/sub.
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny
SQS-SNS transport
For the SQS-SNS transports, SNS is used only for external messages.
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns