djereg / symfony-rabbitmq
Integrates php-amqplib with Symfony Messenger & RabbitMq.
Installs: 7
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 0
Type:symfony-bundle
Requires
- php: ^8.2
- ext-pcntl: *
- datto/json-rpc: ^6.1
- illuminate/support: ^11.5
- php-amqplib/php-amqplib: ^3.0
- ramsey/uuid: ^4.7
- symfony/messenger: ^7.0
Requires (Dev)
- phpunit/phpunit: ^9.5
- psr/cache: ^3.0
- symfony/config: ^7.0
- symfony/console: ^7.0
- symfony/dependency-injection: ^7.0
- symfony/event-dispatcher: ^7.0
- symfony/framework-bundle: ^7.0
- symfony/http-kernel: ^7.0
- symfony/rate-limiter: ^7.0
- symfony/serializer: ^7.0
- symfony/yaml: ^7.0
Suggests
- symfony/framework-bundle: To use this lib as a full Symfony Bundle and to use the profiler data collector
Conflicts
- symfony/amqp-messenger: *
- symfony/config: <7.0
- symfony/console: <7.0
- symfony/messenger: <7.0
- symfony/yaml: <7.0
This package is auto-updated.
Last update: 2024-11-03 11:25:05 UTC
README
THIS PACKAGE IS PRIMARILY INTENDED FOR INTERNAL/PRIVATE USE IN OWN PROJECTS. IF IT MEETS YOUR NEEDS, FEEL FREE TO USE IT, BUT IN CASE OF ANY MODIFICATION REQUESTS, I WILL CONSIDER MY OWN NEEDS FIRST.
It is still in a very early development phase, so I do not really recommend using it for now, because anything can change on it at any time and previous functions may break.
The package is part of the rabbitmq-multiverse.
Table of Contents
Description
This package is an alternative for Symfony AMQP Messenger Component. Works very similarly, allows you to start a message consumer and send messages to RabbitMQ.
Extends the functionality of the original Symfony Messenger by adding the ability to send and receive events and RPC queries through RabbitMQ messages.
Motivation
Since the microservice architecture has become very popular, I needed a library that provides the possibility of communicating with services written in different programming languages or frameworks.
Symfony has a good messaging system, but it is a closed Symfony-only system. This package allows you to communicate through messages between Symfony and/or other non-Symfony microservices.
On the top of simple JSON messages, utilizes the Symfony Messenger system, which perfectly does the rest of the job.
Usage
Installation
You can install this package via composer using this command:
composer require djereg/symfony-rabbitmq
Configuration
First, you have to define the environment variables.
# Set the queue connection to rabbitmq RABBITMQ_DSN=amqp://guest:guest@rabbitmq:5672/%2f RABBITMQ_QUEUE_NAME=queue-name RABBITMQ_EXCHANGE_NAME=exchange-name RABBITMQ_EXCHANGE_TYPE=direct-name
Then you have to add the configuration to the config/packages/messenger.yaml
file.
# config/packages/messenger.yaml framework: messenger: transports: # The name of the transport must be rabbitmq # If the transport is defined with a different name, # an exception will be thrown at runtime. rabbitmq: dsn: '%env(RABBITMQ_DSN)%' options: queue: name: '%env(RABBITMQ_QUEUE_NAME)%' exchange: name: '%env(RABBITMQ_EXCHANGE_NAME)%' type: '%env(RABBITMQ_EXCHANGE_TYPE)%'
Starting the consumer
To start the consumer, you have to run the following command.
php bin/console rabbitmq:consume
The consumer will start and listen to the queue for incoming messages.
Most of the options are the same as in the original Symfony Messenger consumer.
Start the consumer with the -h
option to see all available options.
Events
Provides an event based asynchronous communication between services.
Dispatching events
Create an event class that extends the MessagePublishEvent
class.
use Djereg\Symfony\RabbitMQ\Event\MessagePublishEvent; class UserCreated extends MessagePublishEvent { // Set the event name protected string $event = 'user.created'; public function __construct(private User $user) { $this->user = $user; } // Create a payload method that returns the data to be sent public function payload(): array { return [ 'user_id' => $this->user->id, ]; } }
And after just dispatch the event like any other Symfony event.
Almost, just a little difference. Instead of the Symfony event dispatcher, you have to use the EventDispatcher
included in this package.
Since the Symfony event system does not support listening to interfaces on top of many events, the EventDispatcher
does the trick by calling the Symfony event dispatcher under the hood and pass the full name of MessagePublishEvent
and a listener
listening to this event will catch all events implementing this interface.
use Djereg\Symfony\RabbitMQ\Service\EventDispatcher; class UserService { public function __construct( private EventDispatcher $dispatcher ) { // } public function createUser(User $user): void { // Dispatch the event $this->dispatcher->dispatch(new UserCreated($user)); } }
That's it, it's not so complicated.
Listening to events
Create an event listener class and add the AsMessageEventListener
attribute like in the example below.
You have to define the event name in the attribute. The event name must be the same as the event name defined in the event object.
The attribute behaves exactly like the Symfony event listener attribute, but adds one more tag to the service, what helps to collect the events listening to. The name differs from the Symfony attribute to avoid the confusion about which event system is used.
use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener; #[AsMessageEventListener('user.created')] class NotifyUserListener { public function __invoke(MessageEvent $event): void { // Do something } }
See more about the event listeners at Symfony docs. The only one thing you have to remember is to define the event name in the listener.
Errors in listeners
When an unhandled error occurs in a listener, the message will be requeued and the event will be dispatched again. This will happen until the message is successfully processed or the maximum number of attempts is reached. If multiple listeners are listening to the same event, the processing will stop at the first listener that throws an exception and the rest of the listeners will not be called.
Preventing this behavior there are two ways. The first one is to catch the exception in the listener and handle it. The second one is to listen to events and put messages to the queue and handle them separately and asynchronously. This way the failed message will not block the rest of the messages.
How to process an event asynchronously?
Oh, it's very simple! You need an intermediate listener that will put a message to the queue automatically and a message handler that will handle the message.
First create a message that extends EventMessage
. This message will be sent to the queue and will be processed by the message handler.
use Djereg\Symfony\RabbitMQ\Message\EventMessage; class UserCreatedMessage extends EventMessage {}
Then create an event listener that extends MessageEventListener
. This listener will listen to the event and put the message to the queue automatically.
use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener; use Djereg\Symfony\RabbitMQ\Listeners\MessageEventListener; #[AsMessageEventListener('user.created')] class UserCreatedListener extends MessageEventListener { protected string $message = UserCreatedMessage::class; }
Finally, create a message handler that will handle the message put to the queue.
#[AsMessageHandler] class UserCreatedMessageHandler { public function __invoke(UserCreatedMessage $message): void { // Get the event name $event = $message->getEvent(); // Get the event payload $payload = $event->getPayload(); // Get the event wrapped in the message $raw = $message->getMessageEvent(); } }
It's pretty simple, right? I know, not really. But it works.
Subscribing to events
The consumer automatically creates the exchange and the queue if they do not exist and registers all listened events as bindings keys to the queue.
RPC
A synchronous-like communication between services.
Uses the JSON-RPC 2.0 protocol for communication.
Registering clients
To call remote procedures, you have to create an instance of the Client
class and register it in the service container.
# config/services.yaml services: users_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: users # Some example client definitions below orders_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: orders products_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: products
Calling remote procedures
Create a service and inject the client into it.
use Djereg\Symfony\RabbitMQ\Contract\ClientInterface; use Symfony\Component\DependencyInjection\Attribute\Autowire; class UserService { public function __construct( #[Autowire('users_client')] private ClientInterface $client, ) { // } public function getUser(int $id): User { // Call the remote procedure $user = $this->client->call('get', ['id' => $id]); // Return the user or do something else with it return $user; } }
Registering remote procedures
Create a service and add the AsRemoteProcedure
attribute.
Works very similarly to the event listeners described above. You can add the attribute to a class or to a method.
use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure; #[AsRemoteProcedure('get')] class GetUser { public function __invoke(int $id): array { // Query the database and return the result } }
Or another example with the attribute added to a method.
use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure; class UserService { #[AsRemoteProcedure('get')] public function getUser(int $id): array { // Query the database and return the result } // When adding the attribute to a method, you can omit the name of the procedure. // In this case, the name will be the same as the method name. #[AsRemoteProcedure] public function update(int $id, array $data): bool { // Update the user and return the result of the operation } }
When registering two or more procedures with the same name, an exception will be thrown at startup.
Symfony Messenger
The functionality of the original Symfony Messenger component is also available. Route the messages to the rabbitmq transport, and they will be sent to the queue and processed by the consumer.
Lifecycle Events
MessagePublishingEvent
Dispatched before the message is sent to the queue.
use Djereg\Symfony\RabbitMQ\Event\MessagePublishingEvent;
MessageReceivedEvent
Dispatched when the message is received from the queue.
use Djereg\Symfony\RabbitMQ\Event\MessageReceivedEvent;
MessageProcessingEvent
Dispatched when the message is being processed.
use Djereg\Symfony\RabbitMQ\Event\MessageProcessingEvent;
MessageProcessedEvent
Dispatched when the message is processed.
use Djereg\Symfony\RabbitMQ\Event\MessageProcessedEvent;
Known Issues
- NO TESTS! I know, I know. I will write them soon.