medeiroz / laravel-amqp-toolkit
Laravel AMQP Toolkit to publish and consume messages from RabbitMQ
Fund package maintenance!
Medeiroz
Installs: 2 108
Dependents: 0
Suggesters: 0
Security: 0
Stars: 4
Watchers: 2
Forks: 0
Open Issues: 0
Requires
- php: ^8.1
- ext-mbstring: *
- ext-sockets: *
- illuminate/contracts: ^10.0|^11.0
- php-amqplib/php-amqplib: ^3.6
- spatie/laravel-package-tools: ^1.14.0
Requires (Dev)
- larastan/larastan: ^2.0.1
- laravel/pint: ^1.0
- nunomaduro/collision: ^7.8|^8.0
- orchestra/testbench: ^8.8|^9.0
- pestphp/pest: ^2.20
- pestphp/pest-plugin-arch: ^2.0
- pestphp/pest-plugin-laravel: ^2.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan-deprecation-rules: ^1.0
- phpstan/phpstan-phpunit: ^1.0
- spatie/laravel-ray: ^1.26
README
This package was developed to facilitate the integration of Laravel applications with RabbitMQ, providing functionalities to consume and publish messages, in addition to providing a simple way to manage the AMQP / RabbitMQ infrastructure through schema migrations, inspired by Laravel's database migrations.
If you need to consume messages from RabbitMQ queues, publish messages in exchanges or queues, or manage the RabbitMQ infrastructure, this package is for you.
If you have a problems to manage the RabbitMQ Schema same as queues, exchanges and shovels, this package is for you.
Main Features
- Schema Migrations: Use schema migrations to create, delete and manage queues, exchanges and shovels in RabbitMQ, similar to what Laravel offers for databases.
- Consume Queue: Ability to consume messages from a RabbitMQ queue in a simple and efficient way.
- Publish Messages: Ability to publish messages in exchanges or directly in RabbitMQ queues.
- Event Listeners: Ability to listen to messages received from AMQP / RabbitMQ queues and exchanges, similar to Laravel's event listeners.
Requirements
- PHP >= 8.1
- Laravel >= 10
Installation
follow the steps below to install the package:
- Install the
medeiroz/laravel-amqp-toolkit
package via composer - Publish and run migrations
- Publish the configuration file
- Environment variables .env
- Configure the AMQP Queue and Laravel Listeners
1. Install the package
Run the command below to install the package via composer:
composer require medeiroz/laravel-amqp-toolkit
2. Publish migrations
You must publish the database migration with:
php artisan vendor:publish --tag="amqp-toolkit-migrations"
Then run the command below to create the table in the database.
php artisan migrate
3. Publish the configuration file
You can publish the configuration file with:
php artisan vendor:publish --tag="amqp-toolkit-config"
This is the content of the published configuration file:
<?php // config for Medeiroz/AmqpToolkit return [ 'schemas' => base_path('amqp-toolkit-schemas'), 'table_name' => env('AMQP_TABLE_NAME', 'amqp_schemas'), 'max-attempts' => env('AMQP_MAX_ATTEMPTS', 10), 'heartbeat' => env('AMQP_HEARTBEAT', 30), 'keepalive' => env('AMQP_KEEPALIVE', true), /** * The default connection to use when no connection is provided to the AMQP client. */ 'connection' => env('AMQP_CONNECTION', 'rabbitmq'), /** * The default logging channel to use when no channel is provided to the AMQP client. * You can use the same channels as the Laravel logging configuration * Like as 'stack', 'single', 'daily' etc... */ 'logging-channel' => env('AMQP_LOG_CHANNEL', env('LOG_CHANNEL')), /** * The queues to be consumed by the consumer command without arguments. */ 'consumer-queues' => [ // 'payment-received' => \App\Listeners\PaymentReceivedListener::class, ], 'connections' => [ 'rabbitmq' => [ 'host' => env('AMQP_HOST', 'localhost'), 'port' => env('AMQP_PORT', 5672), 'api-port' => env('AMQP_API_PORT', 15672), 'user' => env('AMQP_USER', 'guest'), 'password' => env('AMQP_PASSWORD', ''), 'vhost' => env('AMQP_VHOST', '/'), ], ], ];
4. Environment variables .env
Edit the .env
file and add the environment variables of your AMQP / Rabbitmq server.
AMQP_HOST=your-amqp-host AMQP_PORT=5672 AMQP_API_PORT=15672 AMQP_USER=user AMQP_PASSWORD=password AMQP_VHOST=/
Remember to replace the values according to your environment.
5. Configure the AMQP Queue and Laravel Listeners
Edit the config/amqp-toolkit.php
file and add the queues you want to consume and the listeners that will be executed when a message is received.
The key of the array is the name of the queue and the value is the listener class that will be executed when a message is received.
Example: Attach the listener PaymentReceivedListener
to the queue payment-received
/** * The queues to be consumed by the consumer command without arguments. */ 'consumer-queues' => [ 'payment-received' => \App\Listeners\PaymentReceivedListener::class, ],
Refer to the configuration file for more details.
Usage
Schema Migrations
Allowed schema types are:
- queue
- exchange
- shovel
Creating a Schema
php artisan amqp:make-schema {type} {name}
Examples
php artisan amqp:make-schema queue my-first-queue php artisan amqp:make-schema exchange my-exchange php artisan amqp:make-schema shovel my-shovel
Running Migrations
php artisan amqp:migrate
Reverting Migrations
php artisan amqp:migrate --rollback --step=1
Reverting all migrations
php artisan amqp:migrate --refresh
Publishing a message to a queue or exchange
use Medeiroz\AmqpToolkit\Facades\AmqpPublisher; AmqpPublisher::onQueue(['say' => 'Hello World'], 'my-queue-name'); AmqpPublisher::onExchange(['say' => 'Hello World'], 'my-exchange-name'); AmqpPublisher::onExchange(['say' => 'Hello World with routing key'], 'my-exchange-name', 'my-routing-key');
Start consuming an AMQP / RabbitMQ queue
Running the consumer for all queues
To start consuming all queues you must run the artisan command below:
php artisan amqp:consumer
To start consuming a specific queues you must run the artisan command below:
Where my-first-queue
and payment-received
is the name of the queues you want to consume.
php artisan amqp:consumer my-first-queue payment-received
Listening messages
The package provides a way to listen to messages received from AMQP / RabbitMQ queues and exchanges, similar to Laravel's event listeners.
1. Automatic listener registration
When a queue and listener are configured in the config/amqp-toolkit.php
file, the listener will be automatically registered when the consumer is executed.
2. Manual listener registration
Edit the app/Providers/EventServiceProvider.php
file and add the events you want to listen to.
The name of the event should be amqp.QUEUE_NAME
, where QUEUE_NAME
is the name of the queue you want to listen to.
app/Providers/EventServiceProvider.php
public function boot() { Event::listen( 'amqp:payment-received', \App\Listeners\PaymentReceivedListener::class, ); Event::listen( 'amqp:my-queue', \App\Listeners\MyQueueListener::class, ); Event::listen( 'amqp:*', \App\Listeners\AllListener::class, ); }
Note: The amqp:* event is a special event that listens to all messages received from all queues. The queue event is only called if the consumer is being executed
Creating a listener for the event
You can create a listener for the event you want to listen to, for this run the command below:
php artisan make:listener PaymentReceivedListener
This package provides an event object Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent
that contains the queue name and the message body.
<?php namespace App\Listeners; use Illuminate\Contracts\Queue\ShouldQueue; use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent; class PaymentReceivedListener { public function handle(AmqpReceivedMessageEvent $event): void { \Log::debug('Queue' . $event->queue); \Log::debug('Message Body', $event->messageBody); } }
If you want your events to be executed asynchronously with Laravel Horizon
, you can use the ShouldQueue
interface.
<?php namespace App\Listeners; use Illuminate\Contracts\Queue\ShouldQueue; use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent; class PaymentReceivedListener implements ShouldQueue { /* ... */ }
More about the AMQP Migration Schema
Creating an Exchange Schema
Create a new exchange schema with the command below:
php artisan amqp:make-schema exchange my-exchange
Generated file:
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; return new class extends SchemaMigration { private const NAME = 'my-exchange'; public function up(): void { $this->createExchangeIfNonExists(self::NAME); } public function down(): void { $this->deleteExchangeIfExists(self::NAME); } };
Creating a Queue Schema
Create a new queue schema with the command below:
php artisan amqp:make-schema queue payment-received
Generated file:
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; return new class extends SchemaMigration { private const NAME = 'payment-received'; public function up(): void { $this->createQueueIfNonExists(self::NAME) ->withRetry() ->withTtl(seconds: 5) ->withDlq(); } public function down(): void { $this->deleteQueueIfExists(self::NAME); $this->deleteQueueIfExists(self::NAME . '.retry'); $this->deleteQueueIfExists(self::NAME . '.dlq'); } };
This schema creates a queue named payment-received
with retry, ttl and dlq. RabbitMQ will automatically create the payment-received.retry
and payment-received.dlq
queues.
You can create bind the queue to an exchange by calling the bind
method.
// ... public function up(): void { $this->createQueue('my-queue') ->bind('my-exchange', 'my-route-key'); } // or public function up(): void { $this->createQueue('my-queue'); $this->bind('my-queue', 'my-exchange', 'my-route-key'); }
Creating a Shovel Schema
Create a new shovel schema with the command below:
php artisan amqp:make-schema shovel my-shovel
Generated file:
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource0dot9; use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource1dot0; return new class extends SchemaMigration { private const NAME = 'my-shovel'; public function up(): void { $this->createShovelIfNonExists( name: self::NAME, source: new Resource0dot9( type: 'queue', uri: 'amqp://', queue: 'my-queue-on-amqp-0-9', autoDelete: 'never', addForwardingHeaders: 'No', ), destination: new Resource1dot0( uri: 'amqps://user:password@my-host.servicebus.windows.net:5671/?verify=verify_none', address: 'my-topic-on-service-bus', ), ); } public function down(): void { $this->deleteShovelIfExists(self::NAME); } };
You can create a shovel with the createShovel
or createShovelIfNonExists
method, passing the name of the shovel, the source and destination resources.
The source
and destination
are objects that represent message broker
resources.
Available resources:
- Resource0dot9
- The Message Broker resource for AMQP 0-9-1 same as RabbitMQ, Qpid, etc.
- Resource1dot0
- The Message Broker resource for AMQP 1.0 same as Azure Service Bus, ActiveMQ, etc.
You create a shovel to move messages from Broker A to Broker B, for example.
- Move messages from RabbitMQ to Azure Service Bus
- Move messages from Azure Service Bus to RabbitMQ
- Move messages from RabbitMQ in
AWS Cloud
to RabbitMQ inAzure Cloud
- Move messages from Azure Service Bus to another subscription in Azure Service Bus
More about the shovel configuration can be found in the RabbitMQ Shovel documentation.
Note: The shovel schema is only available for RabbitMQ 3.8.0 or later.
All available methods in schema migration
createQueue(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
- Create a new queue
createQueueIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
- Create a new queue if it does not exist
deleteQueue(string $name): void
- Delete a queue
deleteQueueIfExists(string $name): void
- Delete a queue if it exists
createExchange(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
- Create a new exchange
createExchangeIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
- Create a new exchange if it does not exist
deleteExchange(string $name): void
- Delete an exchange
deleteExchangeIfExists(string $name): void
- Delete an exchange if it exists
bind(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
- Bind a queue to an exchange with a routing key
bindIfExist(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
- Bind a queue to an exchange with a routing key if the queue exists
unbind(string $queue, string $exchange, ?string $routingKey = null): void
- Unbind a queue from an exchange with a routing key
unbindIfExists(string $queue, string $exchange, ?string $routingKey = null): void
- Unbind a queue from an exchange with a routing key if the queue exists
createShovel(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
- Create a new shovel
createShovelIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
- Create a new shovel if it does not exist
deleteShovel(string $name): void
- Delete a shovel
deleteShovelIfExists(string $name): void
- Delete a shovel if it exists
More about the Shovel
The shovel is a feature that allows you to move messages between different brokers, for example, RabbitMQ to Azure Service Bus, Azure Service Bus to RabbitMQ, etc.
If your message broke is Azure Service Bus and your application as code with PHP/Laravel, you can not consume messages directly from the Azure Service Bus. You can create a shovel to move messages from Azure Service Bus to RabbitMQ and consume messages from RabbitMQ.
Forward message from
Azure Service Bus
toRabbitMQ
You need create a specific resources for your shovel, for example, you have the resources below:
- On Azure Service Bus
- Subscription A
(Source)
- Subscription A
- On RabbitMQ
- Exchange
(Destination)
- Queue A (for your PHP application to consume messages)
- Shovel "My First Shovel"
- Exchange
Forward message from
RabbitMQ
toAzure Service Bus
You need create a specific resources for your shovel, for example, you have the resources below:
- On RabbitMQ
- Exchange
- Queue D
(Source)
- Shovel "My Second Shovel"
- On Azure Service Bus
- Topic
(Destination)
- Topic
Testing
composer test
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.