comlaude / laravel-amqp
AMQP wrapper for Laravel and Lumen to publish and consume messages
Installs: 11 368
Dependents: 0
Suggesters: 0
Security: 0
Stars: 2
Watchers: 8
Forks: 0
Open Issues: 0
Requires
- php: ^7.4|^8.0
- php-amqplib/php-amqplib: >=3.2
Requires (Dev)
- comlaude/php-formatter: ^1.0
- illuminate/support: >=v5.5.28
- mockery/mockery: ^1.0
- php-mock/php-mock-phpunit: ^2.6
- phpunit/phpunit: ^9.2
- tightenco/tlint: ^6.0
README
Simple PhpAmqpLib wrapper for interaction with RabbitMQ
Installation
Composer
Add the following to your require part within the composer.json:
"comlaude/laravel-amqp": "^1.0.0"
$ php composer update
or
$ php composer require comlaude/laravel-amqp
Integration
Lumen
Create a config folder in the root directory of your Lumen application and copy the content from vendor/comlaude/laravel-amqp/config/amqp.php to config/amqp.php.
Adjust the properties to your needs.
return [ 'use' => 'production', 'properties' => [ 'production' => [ 'host' => 'localhost', 'port' => 5672, 'username' => '', 'password' => '', 'vhost' => '/', 'connect_options' => [], 'ssl_options' => [], 'exchange' => 'amq.topic', 'exchange_type' => 'topic', 'exchange_passive' => false, 'exchange_durable' => true, 'exchange_auto_delete' => false, 'exchange_internal' => false, 'exchange_nowait' => false, 'exchange_properties' => [], 'queue_force_declare' => false, 'queue_passive' => false, 'queue_durable' => true, // only change when not using quorum queues 'queue_exclusive' => false, 'queue_auto_delete' => false, // only change when not using quorum queues 'queue_nowait' => false, 'queue_properties' => [ 'x-ha-policy' => ['S', 'all'], 'x-queue-type' => ['S', 'quorum'], // 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // if provided an exchange and queue (queue_name-dlx) will be automatically declared // 'x-delivery-limit' => ['I', 5], // the delivery limit will be set on the relevant queue but not the DLX queue itself ], 'queue_acknowledge_is_final' => true, // if important work is done inside a consumer after the acknowledge call, this should be false 'queue_reject_is_final' => true, // if important work is done inside a consumer after the reject call, this should be false 'consumer_tag' => '', 'consumer_no_local' => false, 'consumer_no_ack' => false, 'consumer_exclusive' => false, 'consumer_nowait' => false, 'timeout' => 0, // seconds 'persistent' => false, 'persistent_restart_period' => 0, // seconds 'request_accepted_timeout' => 0.5, // seconds in decimal accepted 'request_handled_timeout' => 5, // seconds in decimal accepted 'qos' => true, 'qos_prefetch_size' => 0, 'qos_prefetch_count' => 1, 'qos_a_global' => false, /* |-------------------------------------------------------------------------- | An example binding set up when declaring exchange and queues |-------------------------------------------------------------------------- |'bindings' => [ | [ | 'queue' => 'example', | 'routing' => 'example.route.key', | ], |], */ ], ], ];
Register the Lumen Service Provider in bootstrap/app.php:
/* |-------------------------------------------------------------------------- | Register Service Providers |-------------------------------------------------------------------------- */ //... $app->configure('amqp'); $app->register(ComLaude\Amqp\LumenServiceProvider::class); //...
Add Facade Support for Lumen 5.2+
//... $app->withFacades(true, [ 'ComLaude\Amqp\Facades\Amqp' => 'Amqp', ]); //...
Laravel
Open config/app.php and add the service provider and alias:
'ComLaude\Amqp\AmqpServiceProvider',
'Amqp' => 'ComLaude\Amqp\Facades\Amqp',
Publishing a message
Push message with routing key
Amqp::publish('routing-key', 'message');
Push message with routing key and overwrite properties
Amqp::publish('routing-key', 'message' , ['exchange' => 'amq.direct']);
Consuming messages
Consume messages forever
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); });
Consume messages, with custom settings
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); }, [ 'timeout' => 2, 'vhost' => 'vhost3', 'queue' => 'queue-name', 'persistent' => true // required if you want to listen forever ]);
Fanout example
Publishing a message
Amqp::publish('', 'message' , [ 'exchange_type' => 'fanout', 'exchange' => 'amq.fanout', ]);
Disable publishing
This is useful for development and sync requirements, if you are using observers or events to trigger messages over AMQP you may want to temporarily disable the publishing of messages. When turning the publishing off the publish method will silently drop the message and return.
Check state
if(Amqp::isEnabled()) { // It is going to publish }
Disable
Amqp::disable();
Enable
Amqp::enable();
Remote procedure call server and client
RPC is potentially an anti-pattern in a microservices world so do not use it carelessly, nevertheless sometimes you just need that request-response behaviour and you're willing to accept its limitations. Simply return a response from within a consumer handler, if the message is a request from a client, the response will automatically be routed to the correct requestor. There are 2 configurable timeouts to prevent infinite-blocking waits.
request_accepted_timeout
- time to wait for confirmation from server that a job is being worked on, this is a check if anybody is listening at all and should be quite small
request_handled_timeout
- time to wait for the full request to be completed (all messages), be careful to ensure this is large enough if your job is long-lasting or if the number of messages to be handled is large
Server
Amqp::consume(function ($message) { Amqp::acknowledge($message); return "I handled this message " . $message->getBody(); });
Client
Amqp::request('example.routing.key', [ 'message1', 'message2', ], function ($message) { echo("The remote server said " . $message->getBody()); }); // Or for single message requests you can just do $response = Amqp::requestWithResponse('example.routing.key', 'quickly'); // $response is already the message content as a string "I handled this message quickly"
Consume messages, with dead letter exchange configured
When using the x-dead-letter-exchange
parameter in queue properties the package will additionally:
- declare the <queue_name>-dlx queue
- declare the exchange itself
When the consumer fails or requeues the message for 5 times the message will instead be routed to this new queue via the dead letter exchange.
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); }, [ 'timeout' => 2, 'vhost' => 'vhost3', 'queue' => 'my-example-queue', 'persistent' => true // required if you want to listen forever 'queue_properties' => [ 'x-ha-policy' => ['S', 'all'], 'x-queue-type' => ['S', 'quorum'], 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // will auto-declare queue named my-example-queue-dlx 'x-delivery-limit' => ['I', 5], // after 5 deliveries the message is routed to my-example-queue-dlx ], ]);
Credits
- Some concepts were used from https://github.com/bschmitt/laravel-amqp
License
This package is open-sourced software licensed under the MIT license