agluh / outbox-bundle
Implements Outbox pattern for DDD-based Symfony applications
Installs: 16
Dependents: 0
Suggesters: 0
Security: 0
Stars: 8
Watchers: 2
Forks: 0
Open Issues: 0
Type:symfony-bundle
Requires
- php: ^7.4
- doctrine/doctrine-bundle: ^1.0 || ^2.0
- doctrine/orm: ^2.6
- ramsey/uuid-doctrine: ^1.6
- symfony/console: ^4.4 || ^5.0
- symfony/framework-bundle: ^4.4 || ^5.0
- symfony/lock: ^4.4 || ^5.0
- webmozart/assert: ^1.9
Requires (Dev)
- dama/doctrine-test-bundle: ^6.3
- doctrine/doctrine-fixtures-bundle: ^3.3
- matthiasnoback/symfony-dependency-injection-test: ^4.1
- nyholm/symfony-bundle-test: ^1.6
- phpstan/phpstan: ^0.12.40
- roave/security-advisories: dev-master
- symfony/dotenv: ^4.4 || ^5.0
- symfony/phpunit-bridge: ^4.4 || ^5.0
- symplify/easy-coding-standard: ^8.1
This package is auto-updated.
Last update: 2024-11-17 03:36:36 UTC
README
Implements Outbox pattern for DDD-based Symfony applications.
How it works:
- Bundle collects domain events from aggregate being persisted and save them in a separate table within a single database transaction.
- After a successful commit those domain events are enqueued for publication.
- If bundle configured with
auto_publish=true
option, then domain events from outbox table will be processed using a Symfony event listener in the kernel.TERMINATE or console.TERMINATE events. - If bundle configured with
auto_publish=false
option, then you should use CLI interface described below to periodically run worker for processing of stored events.
- If bundle configured with
Important note: events are enqueued for publishing on-by-one in ascending order sorted by expected publication date (witch by default is date of registration domain event in outbox). If for some reason on DomainEventEnqueuedForPublishingEvent you did not marked domain event as published (i.e. publication date not set) then next time outbox will try to enqueue for publishing the same domain event until it succeeded. This ensures time consistency of published domain events.
Note: you can combine auto publishing with CLI-based publication at the same time. Locking mechanism ensure all events will be published in the right order.
Acknowledgments
Inspired by Domain Event Bundle. Worker class mainly based on Worker from symfony/messenger component.
Installation
Requires Symfony 4.4, or Symfony 5.x and PHP 7.4 and above
Make sure Composer is installed globally, as explained in the installation chapter of the Composer documentation.
Applications that use Symfony Flex
Open a command console, enter your project directory and execute:
$ composer require agluh/outbox-bundle
Applications that don't use Symfony Flex
Step 1: Download the Bundle
Open a command console, enter your project directory and execute the following command to download the latest stable version of this bundle:
$ composer require agluh/outbox-bundle
Step 2: Enable the Bundle
Then, enable the bundle by adding it to the list of registered bundles
in the config/bundles.php
file of your project:
// config/bundles.php return [ // ... AGluh\Bundle\OutboxBundle\OutboxBundle::class => ['all' => true], ];
Usage
Outbox bundle integrates into your Symfony application mostly by application events. You should implement listeners for them as shown below.
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use AGluh\Bundle\OutboxBundle\Event\AggregateRootPreparedForPersistenceEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventPreparedForPersistenceEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventEnqueuedForPublishingEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventPublishedEvent; class OutboxIntegrationEventsListener implements EventSubscriberInterface { /** * In this method you should collect domain events from your aggregate root * and than call $event->addDomainEvent() to persist them to outbox. */ public function onAggregateRootPreparedForPersistence (AggregateRootPreparedForPersistenceEvent $event): void { $aggregateRoot = $event->aggregateRoot(); /** * Example: DomainEventPublisher is a base class or an interface * of your aggregate. */ if($aggregateRoot instanceof DomainEventPublisher) { /** * Here DomainEventPublisher::popEvents() is the method * that returns array of domain events for aggregate. */ foreach ($aggregateRoot->popEvents() as $domainEvent) { /** * Basically domain event can be any PHP type that supports serialization. * See Serialization section below in docs. */ $event->addDomainEvent($domainEvent); } } } /** * In this method you can alter date of expected publication for domain event. * By default it will be the date of registration of the event in outbox. */ public function onDomainEventPreparedForPersistence(DomainEventPreparedForPersistenceEvent $event): void { $domainEvent = $event->domainEvent(); /** * Here DomainEvent is an interface or base class for your domain event. */ if($domainEvent instanceof DomainEvent) { /** * In this example we use event occurrence date as date of expected publication. */ $event->changeExpectedPublicationDate($domainEvent->occurrenceDate()); } } /** * This function will be called by outbox bundle for each domain event should be published. */ public function onDomainEventEnqueuedForPublishing(DomainEventEnqueuedForPublishingEvent $event): void { // It makes sense to stop propagation for event $event->stopPropagation(); $domainEvent = $event->domainEvent(); // Do whatever you mention under 'publish event' here. For example, send message to RabbitMQ. // You MUST set publication date here to mark event as published in outbox table. $event->setPublicationDate(new \DateTimeImmutable()); } /** * This function will be called after outbox bundle persists domain event as published. */ public function onDomainEventPublished(DomainEventPublishedEvent $event): void { // Do something if you want } public static function getSubscribedEvents(): array { return [ AggregateRootPreparedForPersistenceEvent::class => 'onAggregateRootPreparedForPersistence', DomainEventPreparedForPersistenceEvent::class => 'onDomainEventPreparedForPersistence', DomainEventEnqueuedForPublishingEvent::class => 'onDomainEventEnqueuedForPublishing', DomainEventPublishedEvent::class => 'onDomainEventPublished' ]; } }
Console commands
Publish domain events
outbox:publish [options] Options: -l, --limit=LIMIT Limit the number of published events -m, --memory-limit=MEMORY-LIMIT The memory limit the worker can consume -t, --time-limit=TIME-LIMIT The time limit in seconds the worker can run -s, --sleep=SLEEP Seconds to sleep before asking for new unpublished events after no unpublished events were found. Applicable only for demonized worker [default: 1] -d, --daemonize Daemonize worker -b, --batch-size=BATCH-SIZE Limit the number of events worker can query at every iteration [default: 20] -h, --help Display this help message -q, --quiet Do not output any message -V, --version Display this application version --ansi Force ANSI output --no-ansi Disable ANSI output -n, --no-interaction Do not ask any interactive question -e, --env=ENV The Environment name. [default: "dev"] --no-debug Switches off debug mode. -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
The php process is not designed to work for a long time. So it has to quit periodically. Or, the command may exit because of error or exception. Something has to bring it back and continue events publication (if not used auto publication). You can use Supervisord for that. It starts processes and keep an eye on them while they are working.
Here an example of supervisord configuration.
[program:outbox_worker] command=/path/to/bin/console outbox:publish --env=prod --no-debug --time-limit=3600 --daemonize process_name=%(program_name)s_%(process_num)02d numprocs=1 # There is no reason to use multiple workers here because of locking nature of events publication process autostart=true autorestart=true startsecs=0 redirect_stderr=true
Note: Pay attention to --time-limit it tells the command to exit after 60 minutes.
Prune published domain events
outbox:prune-published
Stop workers
outbox:stop-workers
Custom serializer
By default, outbox bundle uses serialize/unserialize functions from PHP to convert domain event to a string during persistence. You can use a custom serializer for that purpose. For example, how to use symfony/serializer is shown below.
namespace App\Service; use AGluh\Bundle\OutboxBundle\Serialization\SerializerInterface; use AGluh\Bundle\OutboxBundle\Exception\DomainEventDecodingFailedException; class CustomSerializer implements SerializerInterface { private \Symfony\Component\Serializer\SerializerInterface $serializer; // Constructor skipped public function encode($domainEvent): string { return $this->serializer->serialize($domainEvent, 'json'); } /** * @throws DomainEventDecodingFailedException */ public function decode(string $data) { // In this example we don't convert json back to an object and simply use it further return $data; } }
Then register new serializer as a service.
# config\services.yaml services: agluh_outbox_bundle.serializer: class: App\Service\CustomSerializer
Default configuration
agluh_outbox: table_name: outbox # Name of outbox table for Doctrine mapping auto_publish: false # Publish domain events on kernel.TERMINATE or console.TERMINATE
Contributing
Contributions are welcome. Composer scripts are configured for your convenience:
> composer test # Run test suite (you should set accessible MySQL server with DATABASE_URL env variable)
> composer cs # Run coding standards checks
> composer cs-fix # Fix coding standards violations
> composer static # Run static analysis with Phpstan