somework / cqrs-bundle
CQRS utilities for Symfony Messenger integration.
Requires
- php: ^8.2
- psr/log: ^3.0
- symfony/config: ^7.2
- symfony/console: ^7.2
- symfony/dependency-injection: ^7.2
- symfony/framework-bundle: ^7.2
- symfony/http-kernel: ^7.2
- symfony/messenger: ^7.2
Requires (Dev)
- doctrine/dbal: ^4.0
- doctrine/orm: ^3.6
- friendsofphp/php-cs-fixer: ^3.58
- open-telemetry/api: ^1.8
- phpstan/extension-installer: ^1.4
- phpstan/phpstan: ^2.1
- phpstan/phpstan-deprecation-rules: ^2.0
- phpstan/phpstan-phpunit: ^2.0
- phpstan/phpstan-strict-rules: ^2.0
- phpstan/phpstan-symfony: ^2.0
- phpunit/phpunit: ^11.5
- symfony/lock: ^7.2
- symfony/rate-limiter: ^7.2
Suggests
- doctrine/dbal: Required for DBAL-backed transactional outbox storage (DbalOutboxStorage)
- open-telemetry/api: Required for OpenTelemetry tracing bridge (^1.8)
- symfony/lock: Required for idempotency deduplication bridge (IdempotencyStamp to DeduplicateStamp)
- symfony/rate-limiter: Required for rate limiting message dispatch (RateLimitStampDecider)
- dev-main / 0.4.x-dev
- v0.4.0
- v0.3.0
- v0.2.4
- v0.2.3
- v0.2.2
- v0.2.1
- v0.2.0
- v0.1.2
- v0.1.1
- v0.1.0
This package is auto-updated.
Last update: 2026-03-23 17:24:22 UTC
README
A Symfony bundle that wires Command, Query, and Event buses on top of Symfony Messenger. It auto-discovers handlers via PHP attributes, provides a configurable stamp pipeline, and ships with testing utilities and production-grade patterns.
Why this bundle?
Symfony Messenger is a powerful transport layer, but it leaves CQRS wiring as an exercise for the developer. This bundle fills the gap:
- Auto-discovery -- Annotate handlers with #[AsCommandHandler], #[AsQueryHandler], or #[AsEventHandler] and they are registered automatically. No YAML tags, no manual wiring.
- Stamp pipeline -- A composable StampDecider pipeline attaches retry policies, transport routing, serializer stamps, metadata, and dispatch-after-current-bus stamps per message type or per individual message class.
- Type-safe buses -- Three dedicated buses (CommandBus, QueryBus, EventBus) with distinct semantics: commands support sync/async dispatch, queries always return a result, events are fire-and-forget with zero-to-many handlers.
- Testing utilities -- FakeCommandBus, FakeQueryBus, FakeEventBus with assertDispatched(), assertNotDispatched(), and callback-based property assertions for fast, isolated unit tests.
Architecture
flowchart LR
A[Your Code] --> B[CommandBus / QueryBus / EventBus]
B --> C[DispatchModeDecider]
C --> D[StampsDecider Pipeline]
D --> E1[RetryPolicy]
D --> E2[Transport]
D --> E3[Serializer]
D --> E4[Metadata]
D --> E5[DispatchAfterCurrentBus]
D --> F[Symfony Messenger]
F --> G[Handler]
How does it compare?
| Capability | Raw Messenger | CQRS Bundle | Ecotone |
|---|---|---|---|
| Handler discovery | Manual YAML tags or #[AsMessageHandler] | #[AsCommandHandler] / #[AsQueryHandler] / #[AsEventHandler] with auto-discovery | Attribute-based with conventions |
| Type safety | Single MessageBusInterface | Separate CommandBus, QueryBus, EventBus with typed dispatch methods | Separate gateway interfaces |
| Bus abstraction | You build it | Three buses with sync/async routing, DispatchMode enum | Command/Query/Event buses built-in |
| Retry configuration | Per-transport YAML only | Per-message-class via RetryPolicy interface + resolver hierarchy | Per-endpoint via attributes |
| Testing support | InMemoryTransport | FakeBus implementations with assertDispatched() + callback assertions | Test support module |
| Async routing | routing YAML config | DispatchMode + #[Asynchronous] attribute + per-message transport mapping | Async via polled endpoints |
| Stamp pipeline | Manual stamp attachment | Composable StampDecider pipeline with priority ordering | Interceptors (before/after/around) |
| Event ordering | Not built-in | SequenceAware interface + AggregateSequenceStamp | Built-in aggregate versioning |
| Transactional outbox | Not built-in | OutboxStorage interface + DBAL implementation | Built-in with Doctrine |
| Sagas / Process managers | Not built-in | Not built-in | Built-in saga support |
| Event sourcing | Not built-in | Not built-in | Built-in event sourcing |
| OpenTelemetry | Not built-in | Bridge middleware with trace spans | Not built-in |
| Learning curve | Low (part of Symfony) | Low (thin layer over Messenger) | Moderate (own conventions) |
| Dependencies | Symfony only | Symfony Messenger | Ecotone framework |
Choose raw Messenger when your app has simple dispatch needs and you want zero additional dependencies. Choose this bundle when you want structured CQRS buses, per-message configuration, and testing utilities while staying close to Messenger. Choose Ecotone when you need sagas, event sourcing, or a full CQRS/ES framework.
Feature matrix
Core
- CommandBus with sync/async dispatch and result extraction
- QueryBus with single-handler validation and typed results
- EventBus with zero-to-many handlers and fire-and-forget semantics
- Attribute-based handler discovery (#[AsCommandHandler], #[AsQueryHandler], #[AsEventHandler])
- Handler interfaces optional -- attributes alone are sufficient
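The difference between query and event semantics listed above can be illustrated with a small, self-contained sketch. It does not use the bundle: QueryBusSketch, EventBusSketch, FindTaskName, and TaskCreated are hypothetical names invented for this example, and the bundle's real buses sit on top of Symfony Messenger rather than a plain handler map.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in, not the bundle's QueryBus.
final class QueryBusSketch
{
    /** @var array<string, list<callable>> handlers keyed by query class */
    private array $handlers = [];

    public function register(string $queryClass, callable $handler): void
    {
        $this->handlers[$queryClass][] = $handler;
    }

    public function ask(object $query): mixed
    {
        $handlers = $this->handlers[$query::class] ?? [];

        // A query must resolve to exactly one handler so the result is unambiguous.
        if (count($handlers) !== 1) {
            throw new LogicException(sprintf(
                'Query %s needs exactly one handler, %d registered.',
                $query::class,
                count($handlers),
            ));
        }

        return $handlers[0]($query);
    }
}

// Hypothetical stand-in, not the bundle's EventBus.
final class EventBusSketch
{
    /** @var array<string, list<callable>> handlers keyed by event class */
    private array $handlers = [];

    public function register(string $eventClass, callable $handler): void
    {
        $this->handlers[$eventClass][] = $handler;
    }

    public function publish(object $event): void
    {
        // Zero handlers is valid for events; return values are ignored.
        foreach ($this->handlers[$event::class] ?? [] as $handler) {
            $handler($event);
        }
    }
}

final class FindTaskName
{
    public function __construct(public readonly string $id) {}
}

final class TaskCreated
{
    public function __construct(public readonly string $id) {}
}

$queries = new QueryBusSketch();
$queries->register(FindTaskName::class, static fn (FindTaskName $q): string => 'Task ' . $q->id);
$name = $queries->ask(new FindTaskName('42'));

$events = new EventBusSketch();
$events->publish(new TaskCreated('42')); // no handlers registered, no error
```

Registering a second handler for FindTaskName, or none at all, makes ask() throw in this sketch, while publish() accepts any number of handlers.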
Stamp Pipeline
- Composable StampDecider system with priority ordering (@api -- extend it yourself)
- Per-message retry policies via RetryPolicy interface
- Per-message transport routing with TransportNamesStamp or SendMessageToTransportsStamp
- Per-message serializer stamps
- Per-message metadata stamps with correlation ID support
- DispatchAfterCurrentBusStamp control per message
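As a rough illustration of what a priority-ordered decider pipeline does, the sketch below chains two deciders that each contribute a stamp for a message. Every type here (StampDeciderSketch, StampPipelineSketch, the two stamp classes, SendInvoice) is hypothetical and written for this example only. The bundle's actual StampDecider contract, its stamp classes, and its priority direction may differ, so treat this as a picture of the idea rather than the bundle's API.

```php
<?php

declare(strict_types=1);

// Hypothetical contract; the bundle's own decider interface may look different.
interface StampDeciderSketch
{
    /** Assumption for this sketch: higher priority runs first. */
    public function priority(): int;

    /**
     * @param list<object> $stamps stamps collected so far
     * @return list<object> stamps to carry forward
     */
    public function decide(object $message, array $stamps): array;
}

final class TransportStampSketch
{
    public function __construct(public readonly string $transport) {}
}

final class MetadataStampSketch
{
    public function __construct(public readonly string $correlationId) {}
}

final class TransportDeciderSketch implements StampDeciderSketch
{
    /** @param array<string, string> $map message class => transport name */
    public function __construct(private readonly array $map) {}

    public function priority(): int
    {
        return 100;
    }

    public function decide(object $message, array $stamps): array
    {
        $transport = $this->map[$message::class] ?? null;
        if ($transport !== null) {
            $stamps[] = new TransportStampSketch($transport);
        }

        return $stamps;
    }
}

final class MetadataDeciderSketch implements StampDeciderSketch
{
    public function priority(): int
    {
        return 50;
    }

    public function decide(object $message, array $stamps): array
    {
        // Random correlation ID, generated only for the sake of the example.
        $stamps[] = new MetadataStampSketch(bin2hex(random_bytes(8)));

        return $stamps;
    }
}

final class StampPipelineSketch
{
    /** @var list<StampDeciderSketch> */
    private array $deciders;

    public function __construct(StampDeciderSketch ...$deciders)
    {
        // Sort descending so the highest priority decider runs first.
        usort($deciders, static fn ($a, $b) => $b->priority() <=> $a->priority());
        $this->deciders = $deciders;
    }

    /** @return list<object> */
    public function stampsFor(object $message): array
    {
        $stamps = [];
        foreach ($this->deciders as $decider) {
            $stamps = $decider->decide($message, $stamps);
        }

        return $stamps;
    }
}

final class SendInvoice {}

$pipeline = new StampPipelineSketch(
    new MetadataDeciderSketch(),
    new TransportDeciderSketch([SendInvoice::class => 'async']),
);
$stamps = $pipeline->stampsFor(new SendInvoice());
```

Although MetadataDeciderSketch is passed first, the transport stamp ends up first in $stamps because the pipeline sorts by priority before running. Passing the accumulated stamps into each decider lets a later decider inspect or drop what earlier ones added.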
Patterns
- Causation ID propagation across nested dispatches
- Idempotency bridge (IdempotencyStamp to DeduplicateStamp)
- Event ordering with SequenceAware and AggregateSequenceStamp
SequenceAwareandAggregateSequenceStamp - Rate limiting via Symfony Rate Limiter integration
- Transactional outbox with DBAL storage and relay command
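To make the first pattern in the list concrete, here is a minimal sketch of causation ID propagation across nested dispatches. It assumes the common convention that a nested message inherits its parent's correlation ID and records the parent's message ID as its causation ID. TracingBusSketch and EnvelopeMetaSketch are hypothetical names, and the sequential msg-N identifiers are used only to keep the output predictable; the bundle's stamps and middleware are not shown here.

```php
<?php

declare(strict_types=1);

// Hypothetical metadata carrier, standing in for a metadata stamp.
final class EnvelopeMetaSketch
{
    public function __construct(
        public readonly string $messageId,
        public readonly string $correlationId,
        public readonly ?string $causationId,
    ) {}
}

// Hypothetical bus that tracks which message is currently being handled.
final class TracingBusSketch
{
    /** @var list<EnvelopeMetaSketch> messages currently being handled, innermost last */
    private array $stack = [];

    /** @var list<EnvelopeMetaSketch> every dispatched message, in dispatch order */
    public array $log = [];

    private int $next = 0;

    /** @param callable(self): void $handler stands in for the message handler */
    public function dispatch(callable $handler): void
    {
        $parent = $this->stack === [] ? null : $this->stack[array_key_last($this->stack)];
        $id = 'msg-' . ++$this->next;

        $meta = new EnvelopeMetaSketch(
            messageId: $id,
            // A root message starts a new correlation; nested ones inherit it.
            correlationId: $parent?->correlationId ?? $id,
            // The message being handled right now caused this one.
            causationId: $parent?->messageId,
        );

        $this->log[] = $meta;
        $this->stack[] = $meta;
        try {
            $handler($this);
        } finally {
            // Pop even if the handler throws, so later dispatches are not mislabeled.
            array_pop($this->stack);
        }
    }
}

$bus = new TracingBusSketch();
$bus->dispatch(static function (TracingBusSketch $bus): void {
    // Dispatched while msg-1 is being handled, so msg-1 is the cause.
    $bus->dispatch(static function (): void {});
});
```

After this runs, $bus->log holds two entries: msg-1 with no causation ID, and msg-2 with causation ID msg-1. Both share the correlation ID msg-1.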
Developer Experience
- FakeCommandBus, FakeQueryBus, FakeEventBus for unit testing
- assertDispatched() / assertNotDispatched() with callback-based property assertions
- somework:cqrs:generate scaffold command for messages and handlers
- somework:cqrs:list handler catalogue
- somework:cqrs:debug-transports transport diagnostics
- somework:cqrs:health-check for monitoring
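The idea behind a fake bus with callback-based assertions can be sketched without the bundle. RecordingCommandBusSketch below is a hypothetical stand-in: the real FakeCommandBus may use different method signatures and failure types, so check the Testing Guide before relying on any of these shapes. The CreateTask and DeleteTask classes are plain objects defined only for this example.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a fake bus: records commands instead of handling them.
final class RecordingCommandBusSketch
{
    /** @var list<object> */
    private array $dispatched = [];

    public function dispatch(object $command): mixed
    {
        $this->dispatched[] = $command;

        return null;
    }

    /** @param (callable(object): bool)|null $matcher optional check on the command's properties */
    public function assertDispatched(string $class, ?callable $matcher = null): void
    {
        foreach ($this->dispatched as $command) {
            if ($command instanceof $class && ($matcher === null || $matcher($command))) {
                return;
            }
        }

        throw new LogicException(sprintf('Expected a dispatched %s, none matched.', $class));
    }

    public function assertNotDispatched(string $class): void
    {
        foreach ($this->dispatched as $command) {
            if ($command instanceof $class) {
                throw new LogicException(sprintf('Unexpected dispatch of %s.', $class));
            }
        }
    }
}

final class CreateTask
{
    public function __construct(
        public readonly string $id,
        public readonly string $name,
    ) {}
}

final class DeleteTask
{
    public function __construct(public readonly string $id) {}
}

$bus = new RecordingCommandBusSketch();
$bus->dispatch(new CreateTask('task-1', 'Write docs'));

// Passes: a CreateTask with the expected name was dispatched.
$bus->assertDispatched(
    CreateTask::class,
    static fn (CreateTask $command): bool => $command->name === 'Write docs',
);

// Passes: no DeleteTask was dispatched.
$bus->assertNotDispatched(DeleteTask::class);
```

Because nothing is handled, a test built this way exercises only the code that dispatches, which is what keeps it fast and isolated.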
Observability
- OpenTelemetry bridge middleware (trace spans for dispatch and handling)
- PSR-3 structured logging across buses, deciders, and resolvers
Integration
- Symfony Flex recipe for zero-touch installation
- CommandBusInterface, QueryBusInterface, EventBusInterface for DI and testing
- #[Asynchronous] attribute for transport routing without YAML config
Installation
Requirements
- PHP 8.2 or newer.
- Symfony 7.2 or newer.
With Symfony Flex (recommended)
composer require somework/cqrs-bundle
Flex automatically registers the bundle in config/bundles.php and creates a
commented config/packages/somework_cqrs.yaml with all available options.
Without Symfony Flex
Install the bundle via Composer:
composer require somework/cqrs-bundle
Then register it manually in config/bundles.php:
return [
    // ...
    SomeWork\CqrsBundle\SomeWorkCqrsBundle::class => ['all' => true],
];
Create config/packages/somework_cqrs.yaml (see docs/flex-recipe/ for a
template with all available options).
Verify the installation
Run the bundled console tooling to verify the bundle is registered:
bin/console somework:cqrs:list
Flex Recipe: The recipe files are in docs/flex-recipe/ and are pending submission to symfony/recipes-contrib. Until published, manual bundle registration is required.
Quick start
Step 1 -- Define a command message
namespace App\Application\Command;

use SomeWork\CqrsBundle\Contract\Command;

final class CreateTask implements Command
{
    public function __construct(
        public readonly string $id,
        public readonly string $name,
    ) {}
}
Step 2 -- Create the handler
namespace App\Application\Command;

use SomeWork\CqrsBundle\Attribute\AsCommandHandler;
use SomeWork\CqrsBundle\Contract\CommandHandler;

#[AsCommandHandler(command: CreateTask::class)]
final class CreateTaskHandler implements CommandHandler
{
    public function __invoke(CreateTask $command): mixed
    {
        // Save task to database...
        return null;
    }
}
Step 3 -- Inject the bus and dispatch
namespace App\Controller;

use App\Application\Command\CreateTask;
use SomeWork\CqrsBundle\Contract\CommandBusInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Routing\Attribute\Route;

final class TaskController
{
    #[Route('/tasks', methods: ['POST'])]
    public function create(Request $request, CommandBusInterface $commandBus): JsonResponse
    {
        // Decode the JSON request body into an array.
        $data = $request->toArray();

        $commandBus->dispatch(new CreateTask(
            // uuid_create() is provided by the PECL uuid extension.
            id: uuid_create(),
            name: $data['name'],
        ));

        return new JsonResponse(['status' => 'ok'], 201);
    }
}
Documentation
Full documentation is available at somework.github.io/cqrs.
- Getting Started -- progressive tutorial from install to advanced patterns
- Usage Guide -- core patterns, dispatch modes, console commands
- Configuration Reference -- every somework_cqrs option explained
- Testing Guide -- FakeBus, assertions, integration testing
- Production Guide -- deployment, workers, monitoring
- Troubleshooting -- common issues and solutions
- Upgrade Guide -- migration between versions
- Changelog
License
MIT. See LICENSE.