freyr / message-broker
Reliable Inbox & Outbox Patterns for Symfony Messenger with transactional guarantees and automatic deduplication
Installs: 18
Dependents: 0
Suggesters: 0
Security: 0
Stars: 1
Watchers: 1
Forks: 1
Open Issues: 0
Type:symfony-bundle
pkg:composer/freyr/message-broker
Requires
- php: >=8.4
- ext-amqp: *
- ext-curl: *
- doctrine/dbal: ^3.0|^4.0
- doctrine/orm: ^3.0
- freyr/identity: ^0.2 | ^0.3
- nesbot/carbon: ^2.0|^3.0
- php-amqplib/php-amqplib: ^3.7
- symfony/amqp-messenger: ^6.4|^7.0
- symfony/config: ^6.4|^7.0
- symfony/console: ^6.4|^7.0
- symfony/dependency-injection: ^6.4|^7.0
- symfony/doctrine-messenger: ^6.4|^7.0
- symfony/http-kernel: ^6.4|^7.0
- symfony/messenger: ^6.4|^7.0
- symfony/property-access: ^6.4|^7.0
- symfony/serializer: ^6.4|^7.0
Requires (Dev)
- doctrine/doctrine-bundle: ^2.0
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^12.0
- symfony/framework-bundle: ^7.0
- symfony/phpunit-bridge: ^7.0
- symfony/yaml: ^7.0
- symplify/easy-coding-standard: ^13.0
This package is auto-updated.
Last update: 2026-02-02 06:52:36 UTC
README
Inbox & Outbox Patterns for Symfony Messenger
IMPORTANT This library is in a very early stage of development! Not production ready!
A Symfony bundle providing reliable event publishing and consumption with transactional guarantees, automatic deduplication, and seamless AMQP integration.
Features
- ✅ Transactional Outbox - Publish events reliably within your business transactions
- ✅ Automatic Deduplication at the Inbox - Binary UUID v7 primary key prevents duplicate processing
- ✅ Typed Message Handlers - Type-safe event consumption with IDE autocomplete
- ✅ Horizontal Scaling - Multiple workers with database-level SKIP LOCKED
Restrictions
- Zero Configuration - (in progress) Symfony Flex recipe automates installation
- AMQP support only - There is no plan do add Kafka/SQS etc.
Quick Start
Installation
composer require freyr/message-broker
** Flex is not registered yet ** That's it! Symfony Flex automatically:
- ✅ Registers the bundle
- ✅ Creates configuration files
- ✅ Adds database migrations
- ✅ Sets up environment variables
Setup Database
The package uses a 3-table architecture with mixed management:
Auto-Managed by Symfony (no manual setup needed):
messenger_outbox- Dedicated outbox table for publishing eventsmessenger_messages- Standard table for failed messages
Application-Managed (manual setup required):
3. message_broker_deduplication - Deduplication tracking (binary UUID v7 PK)
Setup:
# Run migrations to create deduplication table (messenger tables auto-created)
php bin/console doctrine:migrations:migrate
First-Run Note: The messenger_outbox and messenger_messages tables will be automatically created when you first start the worker. This is expected behaviour with auto_setup: true.
See docs/database-schema.md for complete schema details and rationale.
How it works
-
You emit your event (e.g.,
inventory.stock.received) from Application A (Inventory management) -
Messenger routing directs it to the outbox transport (Doctrine/database), inserting it in the same transaction as your business logic
-
php bin/console messenger:consume outbox -vvfetches events from outbox and publishes them to AMQP- Default routing: exchange =
inventory.stock, routing key =inventory.stock.received - Transactional outbox provides at-least-once delivery (events may be sent multiple times)
- Deduplication must happen at the receiving end
- Default routing: exchange =
-
Application B (Procurement) sets up AMQP infrastructure:
- Queue:
inventory_stock - Binds to exchange:
inventory.stock - Binding key:
inventory.stock.*
- Queue:
-
php bin/console messenger:consume amqp_orders -vvfetches events from AMQP and passes them to messenger worker- Transactional middleware starts transaction
- Deduplication middleware checks for duplicated event_id
- Skips event when id exists
- Inserts event_id if not
- Command handler executes business logic, performs database operation
- Transaction is committed, ack is sent to AMQP
- If ack fails, event can be redelivered, but deduplication middleware will skip it
Start Workers
# Process outbox (publish events to AMQP) php bin/console messenger:consume outbox -vv # Consume messages from AMQP (example: orders queue) php bin/console messenger:consume amqp_orders -vv
Usage
Publishing Events via Outbox
Step 1: Implement OutboxEventInterface
All events published through the outbox pattern must implement OutboxEventInterface:
use Freyr\MessageBroker\Outbox\MessageName; use Freyr\MessageBroker\Outbox\EventBridge\OutboxMessage; #[MessageName('order.placed')] final class OrderPlaced implements OutboxMessage { public function __construct( public string $orderId, public float $amount, ) { } }
Requirements:
- Must have #[MessageName('semantic.name')] attribute
- Must implement OutboxMessage marker interface
Domain Layer Alternative
To avoid coupling your domain to infrastructure, extend the interface:
// Your domain layer namespace App\Shared\Integration; use Freyr\MessageBroker\Outbox\OutboxEventInterface; interface IntegrationEvent extends OutboxEventInterface { // Your domain-specific contracts } // Your events #[MessageName('order.placed')] final class OrderPlaced implements IntegrationEvent { // ... }
This keeps your events referencing your own interface, not the infrastructure one.
AMQP Routing:
By default, events are published using convention-based routing:
- Exchange: First 2 parts of message name (
order.placed→order.placed) - Routing Key: Full message name (
order.placed)
You can override this with attributes:
use Freyr\MessageBroker\Outbox\Routing\MessengerTransport; use Freyr\MessageBroker\Outbox\Routing\AmqpRoutingKey; #[MessageName('order.placed')] #[MessengerTransport('commerce')] // Custom exchange #[AmqpRoutingKey('commerce.order.placed')] // Custom routing key final readonly class OrderPlaced { // ... }
See AMQP Routing for complete documentation.
2. Configure Routing
Edit config/packages/messenger.yaml:
framework: messenger: routing: 'App\Domain\Event\OrderPlaced': outbox
3. Dispatch the Event
use Symfony\Component\Messenger\MessageBusInterface; use Freyr\Identity\Id; class OrderService { public function __construct( private MessageBusInterface $messageBus, ) {} public function placeOrder(Order $order): void { // Save order (transaction started) $this->entityManager->persist($order); // Dispatch event (saved to outbox in same transaction) // Note: messageId is auto-generated by OutboxToAmqpBridge $this->messageBus->dispatch(new OrderPlaced( orderId: $order->getId(), customerId: $order->getCustomerId(), totalAmount: $order->getTotalAmount(), placedAt: CarbonImmutable::now() )); // Commit (order + event saved atomically) $this->entityManager->flush(); } }
The event is now stored in the outbox. Workers will publish it to AMQP asynchronously.
Consuming Events (Inbox Pattern)
1. Define Consumer Message
Create a message class matching the event structure:
<?php namespace App\Message; use Freyr\Identity\Id; use Carbon\CarbonImmutable; final readonly class OrderPlaced { public function __construct( public Id $orderId, public Id $customerId, public float $totalAmount, public CarbonImmutable $placedAt, ) {} }
Important: Consumer messages contain only business data (no messageId - it's transport metadata).
2. Configure Message Type Mapping
Edit config/packages/message_broker.yaml:
message_broker: inbox: message_types: 'order.placed': 'App\Message\OrderPlaced'
3. Create Handler
use Symfony\Component\Messenger\Attribute\AsMessageHandler; #[AsMessageHandler] final readonly class OrderPlacedHandler { public function __invoke(OrderPlaced $message): void { // Type-safe with IDE autocomplete! $orderId = $message->orderId; $amount = $message->totalAmount; // Process the event... } }
4. Consume from AMQP
Prerequisites: RabbitMQ queue must already exist with proper bindings.
# Consume directly from AMQP transport
php bin/console messenger:consume amqp_orders -vv
Messages are automatically:
- Deserialised by InboxSerialiser into typed PHP objects
- Deduplicated by DeduplicationMiddleware
- Routed to handlers based on PHP class
Message Format
AMQP messages use this structure:
Headers:
type: order.placed- Semantic message nameX-Message-Stamp-MessageIdStamp: [{"messageId":"..."}]- For deduplication
Body (messageId stripped):
{
"orderId": "550e8400-e29b-41d4-a716-446655440000",
"customerId": "01234567-89ab-cdef-0123-456789abcdef",
"totalAmount": 99.99,
"placedAt": "2024-01-01T12:00:00+00:00"
}
Note: messageId is transport metadata (for deduplication), not business data. It's sent via MessageIdStamp header, not in the payload.
Configuration
After Installation
Symfony Flex creates these configuration files:
config/packages/message_broker.yaml:
message_broker: inbox: message_types: {} # Add your mappings here
config/packages/messenger.yaml:
framework: messenger: failure_transport: failed transports: # Outbox transport - AUTO-MANAGED (auto_setup: true) outbox: dsn: 'doctrine://default?table_name=messenger_outbox&queue_name=outbox' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' options: auto_setup: true # Symfony creates table automatically # AMQP publish transport - MANUAL MANAGEMENT (auto_setup: false) amqp: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' options: auto_setup: false # Infrastructure managed by ops # AMQP consumption transport - MANUAL MANAGEMENT (auto_setup: false) amqp_orders: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\InboxSerializer' options: auto_setup: false # Infrastructure managed by ops queues: orders_queue: ~ # Failed transport - AUTO-MANAGED (auto_setup: true) failed: dsn: 'doctrine://default?queue_name=failed' options: auto_setup: true # Symfony creates table automatically routing: # Add your domain events here: # 'App\Domain\Event\OrderPlaced': outbox
See CLAUDE.md for complete configuration documentation.
.env:
MESSENGER_AMQP_DSN=amqp://guest:guest@localhost:5672/%2f
Customisation
You can customise:
- Transport DSNs in
messenger.yaml - AMQP connection in
.env - Failed transport name
Production Deployment
Docker Compose
services: # Process outbox database and publish to AMQP worker-outbox: image: your-app:latest command: php bin/console messenger:consume outbox --time-limit=3600 restart: always deploy: replicas: 2 # Consume from AMQP and process with handlers worker-amqp-orders: image: your-app:latest command: php bin/console messenger:consume amqp_orders --time-limit=3600 restart: always deploy: replicas: 3
Monitoring
# View queue statistics php bin/console messenger:stats # View failed messages php bin/console messenger:failed:show # Retry failed messages php bin/console messenger:failed:retry
Documentation
Core Architecture:
- Database Schema - Complete 3-table architecture, migrations, and cleanup strategies
- Outbox Pattern - Transactional consistency for event publishing
- Inbox Deduplication - Preventing duplicate message processing
- Message Serialisation - Semantic naming and cross-language compatibility
- AMQP Routing - Convention-based routing with attribute overrides
Developer Guide:
- See
CLAUDE.mdfor complete configuration examples and implementation details
Manual Installation (Without Symfony Flex)
If Symfony Flex is not available in your project, follow these manual installation steps:
1. Install via Composer
composer require freyr/message-broker --no-scripts
2. Register the Bundle
Edit config/bundles.php:
return [ // ... other bundles Freyr\MessageBroker\FreyrMessageBrokerBundle::class => ['all' => true], ];
3. Create Configuration Files
config/packages/message_broker.yaml:
message_broker: inbox: # Message type mapping: message_name => PHP class # Used by InboxSerializer to translate semantic names to FQN during deserialization message_types: # Examples: # 'order.placed': 'App\Message\OrderPlaced' # 'user.registered': 'App\Message\UserRegistered'
config/packages/messenger.yaml:
framework: messenger: # Failure transport for handling failed messages failure_transport: failed # Middleware configuration # DeduplicationMiddleware runs AFTER doctrine_transaction (priority -10) # This ensures deduplication INSERT is within the transaction default_middleware: enabled: true allow_no_handlers: false buses: messenger.bus.default: middleware: - doctrine_transaction # Priority 0 (starts transaction) # DeduplicationMiddleware (priority -10) registered via service tag # Runs after transaction starts, before handlers transports: outbox: dsn: 'doctrine://default?table_name=messenger_outbox&queue_name=outbox' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # AMQP transport - external message broker # For publishing from outbox: uses OutboxSerializer # For consuming to inbox: uses InboxSerializer amqp: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' options: auto_setup: false retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # AMQP consumption transport (example) - uses InboxSerializer amqp_orders: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\InboxSerializer' options: auto_setup: false queue: name: 'orders_queue' retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # Failed transport - for all failed messages failed: dsn: 'doctrine://default?queue_name=failed' options: auto_setup: false routing: # Outbox messages - route domain events to outbox transport # Example: # 'App\Domain\Event\OrderPlaced': outbox # 'App\Domain\Event\UserRegistered': outbox # Inbox messages (consumed from AMQP transports) # Messages are deserialized by InboxSerializer into typed objects # DeduplicationMiddleware automatically prevents duplicate processing # Handlers execute synchronously (no routing needed - AMQP transport handles delivery) # Example handlers: # #[AsMessageHandler] # class OrderPlacedHandler { public function __invoke(OrderPlaced $message) {} }
config/packages/doctrine.yaml:
doctrine: dbal: types: id_binary: Freyr\MessageBroker\Doctrine\Type\IdType
config/services.yaml:
services: # Doctrine Integration Freyr\MessageBroker\Doctrine\Type\IdType: tags: - { name: 'doctrine.dbal.types', type: 'id_binary' } # Auto-register all Normalizers using Symfony's native tag # These will be automatically added to the @serializer service Freyr\MessageBroker\Serializer\Normalizer\: resource: '../vendor/freyr/message-broker/src/Serializer/Normalizer/' tags: ['serializer.normalizer'] # Custom ObjectNormalizer with property promotion support # This overrides Symfony's default ObjectNormalizer with propertyTypeExtractor # Lower priority (-1000) ensures it runs as fallback after specialized normalizers Freyr\MessageBroker\Serializer\Normalizer\PropertyPromotionObjectNormalizer: autowire: true class: Symfony\Component\Serializer\Normalizer\ObjectNormalizer arguments: $propertyTypeExtractor: '@property_info' tags: - { name: 'serializer.normalizer', priority: -1000 } # Inbox Serializer - for AMQP consumption # Injects native @serializer service with all registered normalizers Freyr\MessageBroker\Serializer\InboxSerializer: arguments: $messageTypes: '%message_broker.inbox.message_types%' $serializer: '@serializer' # Outbox Serializer - for AMQP publishing # Injects native @serializer service with all registered normalizers Freyr\MessageBroker\Serializer\OutboxSerializer: arguments: $serializer: '@serializer' # Deduplication Store (DBAL implementation) Freyr\MessageBroker\Inbox\DeduplicationStore: class: Freyr\MessageBroker\Inbox\DeduplicationDbalStore arguments: $connection: '@doctrine.dbal.default_connection' $logger: '@logger' # Deduplication Middleware (inbox pattern) # Runs AFTER doctrine_transaction middleware (priority -10) Freyr\MessageBroker\Inbox\DeduplicationMiddleware: arguments: $store: '@Freyr\MessageBroker\Inbox\DeduplicationStore' tags: - { name: 'messenger.middleware', priority: -10 } # AMQP Routing Strategy (default convention-based routing) Freyr\MessageBroker\Outbox\Routing\AmqpRoutingStrategyInterface: class: Freyr\MessageBroker\Outbox\Routing\DefaultAmqpRoutingStrategy # Outbox Bridge (publishes outbox events to AMQP) Freyr\MessageBroker\Outbox\EventBridge\OutboxToAmqpBridge: autoconfigure: true arguments: $eventBus: '@messenger.default_bus' $routingStrategy: '@Freyr\MessageBroker\Outbox\Routing\AmqpRoutingStrategyInterface' $logger: '@logger' # Deduplication Store Cleanup Command (optional maintenance) Freyr\MessageBroker\Command\DeduplicationStoreCleanup: arguments: $connection: '@doctrine.dbal.default_connection' tags: ['console.command']
5. Create Database Migration
Create migrations/VersionYYYYMMDDHHIISS.php:
<?php declare(strict_types=1); namespace DoctrineMigrations; use Doctrine\DBAL\Schema\Schema; use Doctrine\Migrations\AbstractMigration; /** * Auto-generated Migration: Message Broker Deduplication table * * Creates deduplication tracking table with binary UUID v7 for middleware-based deduplication. */ final class Version20250103000001 extends AbstractMigration { public function getDescription(): string { return 'Create message_broker_deduplication table for middleware-based deduplication'; } public function up(Schema $schema): void { // Create message_broker_deduplication table with binary UUID v7 $this->addSql(" CREATE TABLE message_broker_deduplication ( message_id BINARY(16) NOT NULL PRIMARY KEY COMMENT '(DC2Type:id_binary)', message_name VARCHAR(255) NOT NULL, processed_at DATETIME NOT NULL, INDEX idx_message_name (message_name), INDEX idx_processed_at (processed_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci "); } public function down(Schema $schema): void { $this->addSql('DROP TABLE message_broker_deduplication'); } }
6. Add Environment Variables
Edit .env:
###> freyr/message-broker ### MESSENGER_AMQP_DSN=amqp://guest:guest@localhost:5672/%2f ###< freyr/message-broker ###
7. Run Migrations
php bin/console doctrine:migrations:migrate
8. Verify Installation
# Check bundle is registered php bin/console debug:container | grep MessageBroker # Check transports are available php bin/console debug:messenger # Start workers php bin/console messenger:consume outbox -vv # Outbox database → AMQP php bin/console messenger:consume amqp_orders -vv # AMQP → handlers