brash-creative / rabbit-queue
A basic package to publish and/or consume AMQP messages via RabbitMQ
Requires
- php: >=7.0
- php-amqplib/php-amqplib: ~2.7
Requires (Dev)
- mockery/mockery: ~1.0
- phpunit/phpunit: ~6.4
This package is auto-updated.
Last update: 2024-12-05 01:44:07 UTC
README
Basic RabbitMQ interface
First, create your queue class. It extends the Brash\RabbitQueue\RabbitQueue class and sets the $queue property to your desired queue name.
    <?php

    use Brash\RabbitQueue\RabbitQueue;
    use PhpAmqpLib\Connection\AMQPStreamConnection;

    class MyQueue extends RabbitQueue
    {
        // Name of the queue this class publishes to and consumes from
        protected $queue = 'EXAMPLE_QUEUE';

        public function __construct(AMQPStreamConnection $connection)
        {
            parent::__construct($connection);
        }

        public function getQueue(): string
        {
            return $this->queue;
        }
    }
This class can then be used to push/pull all messages to that queue.
Usage Example - Publish
    <?php

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use Brash\RabbitQueue\QueueException;

    try {
        $message = "This is my message";

        // AMQPStreamConnection(host, port, username, password)
        $amqp = new AMQPStreamConnection('myrabbithost', 5672, 'guest', 'guest');

        $publish = new MyQueue($amqp);
        $publish->push($message);
    } catch (QueueException $e) {
        // Catch publish errors
    } catch (\Exception $e) {
        // Catch all other errors
    }
The consumer will retrieve messages from the queue and pass them to the chosen processor.
A processor can be passed as a callable (such as a closure), an object/method pair, or the class name constant of an invokable class, e.g.
    // Object/method pair
    $consume->pull([$class, 'method']);

    // Closure
    $consume->pull(function (AMQPMessage $message) {
        // Some code
    });

    // Class name of an invokable class
    $consume->pull(ExampleConsumer::class);
In the final example, the ExampleConsumer class would look like:
    use PhpAmqpLib\Message\AMQPMessage;

    class ExampleConsumer
    {
        public function __invoke(AMQPMessage $message)
        {
            // Code
        }
    }
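To make the three processor forms concrete, here is a minimal sketch of how a handler in any of those forms could be normalised into a single callable. The resolveHandler function and the UpperCaseHandler and Reverser classes are hypothetical names invented for this illustration. This is not the package's actual implementation, and the handlers take plain strings rather than AMQPMessage objects so the snippet runs without RabbitMQ.

```php
<?php

/**
 * Hypothetical helper (not part of rabbit-queue): turns any of the three
 * handler forms into something that can be called with a message.
 */
function resolveHandler($handler): callable
{
    // Closures and [object, 'method'] pairs are already callable.
    if (is_callable($handler)) {
        return $handler;
    }

    // A class name such as ExampleConsumer::class: instantiate it and
    // accept it only if the instance defines __invoke().
    if (is_string($handler) && class_exists($handler)) {
        $instance = new $handler();
        if (is_callable($instance)) {
            return $instance;
        }
    }

    throw new InvalidArgumentException('Handler is not callable or an invokable class');
}

// Stand-in handler classes for the demonstration (hypothetical).
class UpperCaseHandler
{
    public function __invoke(string $body): string
    {
        return strtoupper($body);
    }
}

class Reverser
{
    public function handle(string $body): string
    {
        return strrev($body);
    }
}

$fromClass   = resolveHandler(UpperCaseHandler::class);
$fromPair    = resolveHandler([new Reverser(), 'handle']);
$fromClosure = resolveHandler(function (string $body): string {
    return $body . '!';
});

echo $fromClass('abc'), PHP_EOL;   // ABC
echo $fromPair('abc'), PHP_EOL;    // cba
echo $fromClosure('abc'), PHP_EOL; // abc!
```

Note that a class passed by name is constructed without arguments in this sketch, so a consumer that needs dependencies would be better passed as an object/method pair or a closure.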
You can also choose to pull messages down with or without acknowledgement.
Usage Example - Consume (acknowledgement)
    <?php

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use Brash\RabbitQueue\QueueException;

    try {
        $amqp = new AMQPStreamConnection('myrabbithost', 5672, 'guest', 'guest');
        $consume = new MyQueue($amqp);

        // ExampleConsumer is an invokable class that receives each retrieved message
        $consume->pull(ExampleConsumer::class);

        // Keep listening to the queue...
        $consume->poll();
    } catch (QueueException $e) {
        // Catch consume errors
    } catch (\Exception $e) {
        // Catch all other errors
    }
When using this method, you must send the acknowledgement yourself, from the method you defined, once the message has been processed.
In this example:
    <?php

    use PhpAmqpLib\Message\AMQPMessage;

    class ExampleConsumer
    {
        public function __invoke(AMQPMessage $message)
        {
            $body = $message->body;

            // Do something with the message

            // Acknowledge the message on the channel it was delivered on
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        }
    }
Alternatively, you can extend the included AcknowledgableConsumer abstract class and call the acknowledge method:
    <?php

    use PhpAmqpLib\Message\AMQPMessage;
    use Brash\RabbitQueue\AcknowledgableConsumer;

    class ExampleConsumer extends AcknowledgableConsumer
    {
        public function __invoke(AMQPMessage $message)
        {
            $body = $message->body;

            // Do something with the message

            $this->acknowledge($message);
        }
    }
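Because acknowledgement is left to the consumer, it can also be tied to whether processing succeeded. The following is a minimal sketch, assuming php-amqplib 2.x, where the message exposes its channel through delivery_info as in the example above. The SafeConsumer class and its process method are hypothetical names, and rejecting without requeueing is only one possible failure policy.

```php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class SafeConsumer
{
    public function __invoke(AMQPMessage $message)
    {
        $channel = $message->delivery_info['channel'];
        $tag     = $message->delivery_info['delivery_tag'];

        try {
            $this->process($message->body);

            // Processing succeeded: acknowledge so RabbitMQ can discard the message.
            $channel->basic_ack($tag);
        } catch (\Exception $e) {
            // Processing failed: negatively acknowledge this one message
            // (arguments: delivery tag, multiple = false, requeue = false).
            $channel->basic_nack($tag, false, false);
        }
    }

    // Hypothetical placeholder for the application's own logic.
    private function process(string $body)
    {
        if ($body === '') {
            throw new \InvalidArgumentException('Empty message body');
        }

        // Do something with the message
    }
}
```

It would be registered like any other invokable consumer, for example with $consume->pull(SafeConsumer::class). Passing true as the third argument to basic_nack would put the message back on the queue instead, which risks an endless redelivery loop if the failure is permanent.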
Usage Example - Consume (no acknowledgement)
    <?php

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use Brash\RabbitQueue\QueueException;

    try {
        $amqp = new AMQPStreamConnection('myrabbithost', 5672, 'guest', 'guest');
        $consume = new MyQueue($amqp);

        // ExampleConsumer is an invokable class that receives each retrieved message
        $consume->pullNoAck(ExampleConsumer::class);

        // Keep listening to the queue...
        $consume->poll();
    } catch (QueueException $e) {
        // Catch consume errors
    } catch (\Exception $e) {
        // Catch all other errors
    }