spryker/symfony-messenger-extension

SymfonyMessengerExtension module

Installs: 2

Dependents: 2

Suggesters: 0

Security: 0

Stars: 0

Watchers: 0

Forks: 0

pkg:composer/spryker/symfony-messenger-extension

1.0.0 2026-02-17 17:18 UTC

This package is auto-updated.

Last update: 2026-02-17 19:59:23 UTC


README

Latest Stable Version Minimum PHP Version

This module provides plugin interfaces for SymfonyMessenger module.

Installation

composer require spryker/symfony-messenger-extension

Usage

MessageMappingProviderPluginInterface

This plugin interface allows you to provide message handlers and transport mappings for message classes.

Example Plugin Implementation

<?php

namespace Pyz\Zed\MyModule\Communication\Plugin\SymfonyMessenger;

use Pyz\Zed\MyModule\Business\Message\OrderCreatedMessage;
use Pyz\Zed\MyModule\Business\Message\ProductUpdatedMessage;
use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\MessageMappingProviderPluginInterface;
use Spryker\Zed\Kernel\Communication\AbstractPlugin;

/**
 * Example plugin that maps message classes to their handlers and to the transports they are sent through.
 *
 * @method \Pyz\Zed\MyModule\Business\MyModuleFacadeInterface getFacade()
 */
class MyModuleMessageMappingPlugin extends AbstractPlugin implements MessageMappingProviderPluginInterface
{
    /**
     * Returns, per message class, the list of callables that handle it.
     *
     * @return array<string, array<callable>>
     */
    public function getMessageToHandlerMap(): array
    {
        // A handler can be an array callable pointing at a method of this plugin ...
        $orderCreatedHandlers = [[$this, 'handleOrderCreated']];

        // ... or a closure that delegates straight to the facade.
        $productUpdatedHandlers = [
            function (ProductUpdatedMessage $message): void {
                $this->getFacade()->updateProductSearch($message->getProductId());
            },
        ];

        return [
            OrderCreatedMessage::class => $orderCreatedHandlers,
            ProductUpdatedMessage::class => $productUpdatedHandlers,
        ];
    }

    /**
     * Returns, per message class, either a single transport name or a list of transport names.
     *
     * @return array<string, string|array>
     */
    public function getMessageToTransportMap(): array
    {
        $messageToTransportMap = [];
        $messageToTransportMap[OrderCreatedMessage::class] = 'async_transport';
        $messageToTransportMap[ProductUpdatedMessage::class] = ['sync_transport', 'logging_transport'];

        return $messageToTransportMap;
    }

    /**
     * Handles an order-created message by passing it on to the module facade.
     *
     * @param \Pyz\Zed\MyModule\Business\Message\OrderCreatedMessage $message
     *
     * @return void
     */
    public function handleOrderCreated(OrderCreatedMessage $message): void
    {
        $facade = $this->getFacade();
        $facade->processOrderCreated($message);
    }
}

Registering Handler Plugins

Register your plugins in the appropriate DependencyProvider (typically Zed or Client layer):

<?php

namespace Pyz\Zed\SymfonyMessenger;

use Pyz\Zed\MyModule\Communication\Plugin\SymfonyMessenger\MyModuleMessageMappingPlugin;
use Spryker\Zed\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider;

class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider
{
    /**
     * Returns the message mapping provider plugins registered for this project.
     *
     * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\MessageMappingProviderPluginInterface>
     */
    protected function getMessageMappingProviderPlugins(): array
    {
        $messageMappingProviderPlugins = [];
        $messageMappingProviderPlugins[] = new MyModuleMessageMappingPlugin();

        return $messageMappingProviderPlugins;
    }
}

TransportFactoryProviderPluginInterface

This plugin interface allows you to provide custom Symfony Messenger transport factories to extend the list of available transports.

Example Plugin Implementation

<?php

namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger;

use Pyz\Client\MyModule\Transport\AnotherTransportFactory;
use Pyz\Client\MyModule\Transport\MyCustomTransportFactory;
use Spryker\Client\Kernel\AbstractPlugin;
use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;

/**
 * Example plugin that contributes additional transport factories to the list of available ones.
 */
class MyTransportFactoryProviderPlugin extends AbstractPlugin implements TransportFactoryProviderPluginInterface
{
    /**
     * Returns the transport factories provided by this plugin.
     *
     * @return array<\Symfony\Component\Messenger\Transport\TransportFactoryInterface>
     */
    public function getTransportFactories(): array
    {
        return [
            // Declared in Pyz\Client\MyModule\Transport, hence the explicit import above.
            new MyCustomTransportFactory(),
            // Placeholder for a second factory; assumed to live in the same namespace as MyCustomTransportFactory.
            new AnotherTransportFactory(),
        ];
    }
}

Creating a Custom Transport Factory

<?php

namespace Pyz\Client\MyModule\Transport;

use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
 * Example transport factory that builds MyCustomTransport instances for `mycustom://` DSNs.
 */
class MyCustomTransportFactory implements TransportFactoryInterface
{
    /**
     * Creates the custom transport for the given DSN.
     *
     * The DSN may embed credentials, so it is marked sensitive to keep it out of stack traces.
     *
     * @param string $dsn
     * @param array $options
     * @param \Symfony\Component\Messenger\Transport\Serialization\SerializerInterface $serializer
     *
     * @return \Symfony\Component\Messenger\Transport\TransportInterface
     */
    public function createTransport(
        #[\SensitiveParameter] string $dsn,
        array $options,
        SerializerInterface $serializer
    ): TransportInterface {
        return new MyCustomTransport($dsn, $options, $serializer);
    }

    /**
     * Tells whether this factory can handle the given DSN.
     *
     * @param string $dsn
     * @param array $options
     *
     * @return bool
     */
    public function supports(#[\SensitiveParameter] string $dsn, array $options): bool
    {
        // Only DSNs using the custom scheme are handled by this factory.
        return str_starts_with($dsn, 'mycustom://');
    }
}

Registering the Plugin

Register your plugin in SymfonyMessengerDependencyProvider:

<?php

namespace Pyz\Client\SymfonyMessenger;

use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyTransportFactoryProviderPlugin;
use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider;

class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider
{
    /**
     * Returns the transport factory provider plugins registered for this project.
     *
     * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface>
     */
    protected function getTransportFactoryProviderPlugins(): array
    {
        $transportFactoryProviderPlugins = [];
        $transportFactoryProviderPlugins[] = new MyTransportFactoryProviderPlugin();

        return $transportFactoryProviderPlugins;
    }
}

AvailableTransportProviderPluginInterface

This plugin interface allows you to define available transports by providing their names and DSN strings.

Example Plugin Implementation

<?php

namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger;

use Spryker\Client\Kernel\AbstractPlugin;
use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\AvailableTransportProviderPluginInterface;

/**
 * Example plugin that declares available transports as a transport name to DSN map.
 */
class MyAvailableTransportProviderPlugin extends AbstractPlugin implements AvailableTransportProviderPluginInterface
{
    /**
     * Returns the DSN string of each transport, keyed by transport name.
     *
     * @return array<string, string>
     */
    public function getTransportDSNByTransportName(): array
    {
        $transportDsnByName = [];
        $transportDsnByName['schedule_queue'] = 'schedule://queue';
        $transportDsnByName['async_queue'] = 'amqp://localhost:5672/messages';
        $transportDsnByName['sync_transport'] = 'sync://';

        return $transportDsnByName;
    }
}

Registering the Plugin

<?php

namespace Pyz\Client\SymfonyMessenger;

use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyAvailableTransportProviderPlugin;
use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider;

class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider
{
    /**
     * Returns the available transport provider plugins registered for this project.
     *
     * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\AvailableTransportProviderPluginInterface>
     */
    protected function getAvailableTransportProviderPlugins(): array
    {
        $availableTransportProviderPlugins = [];
        $availableTransportProviderPlugins[] = new MyAvailableTransportProviderPlugin();

        return $availableTransportProviderPlugins;
    }
}

GroupAwareTransportsPluginInterface

This plugin interface allows you to organize transports into groups for easier management and consumption.

Example Plugin Implementation

<?php

namespace Pyz\Client\MyModule\Plugin\SymfonyMessenger;

use Spryker\Client\Kernel\AbstractPlugin;
use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\GroupAwareTransportsPluginInterface;

/**
 * Example plugin that bundles transport names into named groups.
 */
class MyGroupAwareTransportsPlugin extends AbstractPlugin implements GroupAwareTransportsPluginInterface
{
    /**
     * Returns the transport names belonging to each group, keyed by group name.
     *
     * @return array<string, array<string>>
     */
    public function getGroupMapping(): array
    {
        $cronSchedulerTransports = ['queue-worker-start', 'data-backup-task'];
        $orderProcessingTransports = ['order-created-queue', 'order-updated-queue', 'order-shipped-queue'];
        $notificationTransports = ['email-queue', 'sms-queue', 'push-notification-queue'];

        return [
            'compiled-cron-scheduler' => $cronSchedulerTransports,
            'order-processing' => $orderProcessingTransports,
            'notification-system' => $notificationTransports,
        ];
    }
}

Registering the Plugin

<?php

namespace Pyz\Client\SymfonyMessenger;

use Pyz\Client\MyModule\Plugin\SymfonyMessenger\MyGroupAwareTransportsPlugin;
use Spryker\Client\SymfonyMessenger\SymfonyMessengerDependencyProvider as SprykerSymfonyMessengerDependencyProvider;

class SymfonyMessengerDependencyProvider extends SprykerSymfonyMessengerDependencyProvider
{
    /**
     * Returns the group-aware transports plugins registered for this project.
     *
     * @return array<\Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\GroupAwareTransportsPluginInterface>
     */
    protected function getGroupAwareTransportsPlugins(): array
    {
        $groupAwareTransportsPlugins = [];
        $groupAwareTransportsPlugins[] = new MyGroupAwareTransportsPlugin();

        return $groupAwareTransportsPlugins;
    }
}

Use Case: Consumer Groups

The consumer command can use group names instead of individual transport names:

# Instead of running multiple consumers:
console symfonymessenger:consume queue-worker-start data-backup-task

# Use a group name:
console symfonymessenger:consume compiled-cron-scheduler

The consumer will automatically unpack the group mapping and consume from all transports in that group.

Real-World Example: Scheduler Transport

The SymfonyScheduler module can provide a scheduler transport factory:

<?php

namespace Spryker\Client\SymfonyScheduler\Plugin\SymfonyMessenger;

use Spryker\Client\Kernel\AbstractPlugin;
use Spryker\Shared\SymfonyMessengerExtension\Dependency\Plugin\TransportFactoryProviderPluginInterface;

/**
 * Provides the scheduler transport factory created by the module factory.
 *
 * NOTE: the factory class name below follows the usual `<Module>Factory` naming; adjust it if the module differs.
 *
 * @method \Spryker\Client\SymfonyScheduler\SymfonySchedulerFactory getFactory()
 */
class SchedulerTransportFactoryProviderPlugin extends AbstractPlugin implements TransportFactoryProviderPluginInterface
{
    /**
     * Returns a list holding the single scheduler transport factory.
     *
     * @return array<\Symfony\Component\Messenger\Transport\TransportFactoryInterface>
     */
    public function getTransportFactories(): array
    {
        $schedulerTransportFactory = $this->getFactory()->createSchedulerTransportFactory();

        return [$schedulerTransportFactory];
    }
}

Then in SymfonyMessenger, these factories are collected and used:

/**
 * Returns the built-in AMQP transport factory followed by every factory contributed by the
 * registered transport factory provider plugins, in plugin order.
 *
 * @return array<\Symfony\Component\Messenger\Transport\TransportFactoryInterface>
 */
public function getInstalledTransportFactories(): array
{
    // Collect one list per source and merge once at the end; calling array_merge()
    // inside the loop would re-copy the accumulated array on every iteration.
    $transportFactoryLists = [
        [new AmqpTransportFactory()],
    ];

    foreach ($this->getTransportFactoryProviderPlugins() as $plugin) {
        $transportFactoryLists[] = $plugin->getTransportFactories();
    }

    return array_merge(...$transportFactoryLists);
}

How Transport Factories Work

  1. DSN Matching: When creating a transport, Symfony checks each factory's supports() method
  2. Factory Selection: The first factory that returns true for supports() is used
  3. Transport Creation: The selected factory's createTransport() method creates the transport instance

How Message Handlers Work

  1. Handler Registration: Plugins provide message class to handler mappings via getMessageToHandlerMap()
  2. Transport Routing: Plugins define message-to-transport mappings via getMessageToTransportMap()
  3. Message Dispatch: When a message is dispatched, the message bus looks up the handler and transport
  4. Handler Execution: The callable is invoked with the message as parameter
  5. Result Processing: Handler processes the message (side effects, state changes, etc.)

How Transport Groups Work

  1. Group Definition: Plugins provide group-to-transport mappings via getGroupMapping()
  2. Consumer Resolution: When starting a consumer with a group name, the system unpacks it
  3. Multi-Transport Consumption: All transports in the group are consumed simultaneously
  4. Simplified Operations: Operators work with logical groups instead of individual transport names

DSN Examples

  • amqp://localhost:5672/messages - AMQP transport
  • schedule://default - Scheduler transport (from SymfonyScheduler)
  • redis://localhost:6379/messages - Redis transport
  • doctrine://default - Doctrine transport

Documentation

Spryker Documentation