spryker / symfony-messenger-extension
SymfonyMessengerExtension module
Installs: 2
Dependents: 2
Suggesters: 0
Security: 0
Stars: 0
Watchers: 0
Forks: 0
pkg:composer/spryker/symfony-messenger-extension
Requires
- php: >=8.3
Requires (Dev)
This package is auto-updated.
Last update: 2026-02-17 19:59:23 UTC
README
This module provides plugin interfaces for SymfonyMessenger module.
Installation
composer require spryker/symfony-messenger-extension
Usage
MessageMappingProviderPluginInterface
This plugin interface allows you to provide message handlers and transport mappings for message classes.
Example Plugin Implementation
<?php namespace Pyz\Zed\MyModule\Communication\Plugin\SymfonyMessenger; use Pyz\Zed\MyModule\Business\Message\OrderCreatedMessage; use Pyz\Zed\MyModule\Business\Message\ProductUpdatedMessage; use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\MessageMappingProviderPluginInterface; use Spryker\Zed\Kernel\Communication\AbstractPlugin; /** * @method \Pyz\Zed\MyModule\Business\MyModuleFacadeInterface getFacade() */ class MyModuleMessageMappingPlugin extends AbstractPlugin implements MessageMappingProviderPluginInterface { /** * @return array<string, array<callable>> */ public function getMessageToHandlerMap(): array { return [ OrderCreatedMessage::class => [[$this, 'handleOrderCreated']], ProductUpdatedMessage::class => [function (ProductUpdatedMessage $message): void { $this->getFacade()->updateProductSearch($message->getProductId()); }], ]; } /** * @return array<string, string|array> */ public function getMessageToTransportMap(): array { return [ OrderCreatedMessage::class => 'async_transport', ProductUpdatedMessage::class => ['sync_transport', 'logging_transport'], ]; } /** * @param \Pyz\Zed\MyModule\Business\Message\OrderCreatedMessage $message * * @return void */ public function handleOrderCreated(OrderCreatedMessage $message): void { $this->getFacade()->processOrderCreated($message); } }
Registering Handler Plugins
Register your plugins in the appropriate DependencyProvider (typically Zed or Client layer):
<?php namespace Pyz\Zed\SymfonyMessenger; use Pyz\Zed\MyModule\Communication\Plugin\SymfonyMessenger\MyModuleMessageMappingPlugin; use Spryker\Zed\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider; class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider { /** * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\MessageMappingProviderPluginInterface> */ protected function getMessageMappingProviderPlugins(): array { return [ new MyModuleMessageMappingPlugin(), ]; } }
TransportFactoryProviderPluginInterface
This plugin interface allows you to provide custom Symfony Messenger transport factories to extend the list of available transports.
Example Plugin Implementation
<?php namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger; use Spryker\Client\Kernel\AbstractPlugin; use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface; class MyTransportFactoryProviderPlugin extends AbstractPlugin implements TransportFactoryProviderPluginInterface { /** * @return array<\Symfony\Component\Messenger\Transport\TransportFactoryInterface> */ public function getTransportFactories(): array { return [ new MyCustomTransportFactory(), new AnotherTransportFactory(), ]; } }
Creating a Custom Transport Factory
<?php namespace Pyz\Client\MyModule\Transport; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportInterface; class MyCustomTransportFactory implements TransportFactoryInterface { public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface { return new MyCustomTransport($dsn, $options, $serializer); } public function supports(string $dsn, array $options): bool { // Return true if this factory can handle the given DSN return str_starts_with($dsn, 'mycustom://'); } }
Registering the Plugin
Register your plugin in SymfonyMessengerDependencyProvider:
<?php namespace Pyz\Client\SymfonyMessenger; use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyTransportFactoryProviderPlugin; use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider; class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider { /** * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface> */ protected function getTransportFactoryProviderPlugins(): array { return [ new MyTransportFactoryProviderPlugin(), ]; } }
AvailableTransportProviderPluginInterface
This plugin interface allows you to define available transports by providing their names and DSN strings.
Example Plugin Implementation
<?php namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger; use Spryker\Client\Kernel\AbstractPlugin; use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\AvailableTransportProviderPluginInterface; class MyAvailableTransportProviderPlugin extends AbstractPlugin implements AvailableTransportProviderPluginInterface { /** * @return array<string, string> */ public function getTransportDSNByTransportName(): array { return [ 'schedule_queue' => 'schedule://queue', 'async_queue' => 'amqp://localhost:5672/messages', 'sync_transport' => 'sync://', ]; } }
Registering the Plugin
<?php namespace Pyz\Client\SymfonyMessenger; use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyAvailableTransportProviderPlugin; use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider; class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider { /** * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\AvailableTransportProviderPluginInterface> */ protected function getAvailableTransportProviderPlugins(): array { return [ new MyAvailableTransportProviderPlugin(), ]; } }
GroupAwareTransportsPluginInterface
This plugin interface allows you to organize transports into groups for easier management and consumption.
Example Plugin Implementation
<?php namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger; use Spryker\Client\Kernel\AbstractPlugin; use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\GroupAwareTransportsPluginInterface; class MyGroupAwareTransportsPlugin extends AbstractPlugin implements GroupAwareTransportsPluginInterface { /** * @return array<string, array<string>> */ public function getGroupMapping(): array { return [ 'compiled-cron-scheduler' => ['queue-worker-start', 'data-backup-task'], 'order-processing' => ['order-created-queue', 'order-updated-queue', 'order-shipped-queue'], 'notification-system' => ['email-queue', 'sms-queue', 'push-notification-queue'], ]; } }
Registering the Plugin
<?php namespace Pyz\Client\SymfonyMessenger; use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyGroupAwareTransportsPlugin; use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider; class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider { /** * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\GroupAwareTransportsPluginInterface> */ protected function getGroupAwareTransportsPlugins(): array { return [ new MyGroupAwareTransportsPlugin(), ]; } }
Use Case: Consumer Groups
The consumer command can use group names instead of individual transport names:
# Instead of running multiple consumers: console symfonymessenger:consume queue-worker-start data-backup-task # Use a group name: console symfonymessenger:consume compiled-cron-scheduler
The consumer will automatically unpack the group mapping and consume from all transports in that group.
Real-World Example: Scheduler Transport
The SymfonyScheduler module can provide a scheduler transport factory:
<?php namespace Spryker\Client\SymfonyScheduler\Plugin\SymfonyMessenger; use Spryker\Client\Kernel\AbstractPlugin; use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface; class SchedulerTransportFactoryProviderPlugin extends AbstractPlugin implements TransportFactoryProviderPluginInterface { /** * @return array<\Symfony\Component\Messenger\Transport\TransportFactoryInterface> */ public function getTransportFactories(): array { return [ $this->getFactory()->createSchedulerTransportFactory(), ]; } }
Then in SymfonyMessenger, these factories are collected and used:
public function getInstalledTransportFactories(): array { $transportFactories = [ new AmqpTransportFactory(), ]; foreach ($this->getTransportFactoryProviderPlugins() as $plugin) { $transportFactories = array_merge( $transportFactories, $plugin->getTransportFactories() ); } return $transportFactories; }
How Transport Factories Work
- DSN Matching: When creating a transport, Symfony checks each factory's
supports()method - Factory Selection: The first factory that returns
trueforsupports()is used - Transport Creation: The selected factory's
createTransport()method creates the transport instance
How Message Handlers Work
- Handler Registration: Plugins provide message class to handler mappings via
getMessageToHandlerMap() - Transport Routing: Plugins define message-to-transport mappings via
getMessageToTransportMap() - Message Dispatch: When a message is dispatched, the message bus looks up the handler and transport
- Handler Execution: The callable is invoked with the message as parameter
- Result Processing: Handler processes the message (side effects, state changes, etc.)
How Transport Groups Work
- Group Definition: Plugins provide group-to-transport mappings via
getGroupMapping() - Consumer Resolution: When starting a consumer with a group name, the system unpacks it
- Multi-Transport Consumption: All transports in the group are consumed simultaneously
- Simplified Operations: Operators work with logical groups instead of individual transport names
DSN Examples
amqp://localhost:5672/messages- AMQP transportscheduler://default- Scheduler transport (from SymfonyScheduler)redis://localhost:6379/messages- Redis transportdoctrine://default- Doctrine transport