byjg / redis-queue-client
A minimal redis queue client using the components of byjg/message-queue-client
Requires
- php: >=8.1 <8.4
- ext-curl: *
- ext-redis: *
- byjg/message-queue-client: ^5.0
Requires (Dev)
- phpunit/phpunit: ^9.6
- vimeo/psalm: ^5.9
README
This package provides a simple abstraction layer for publishing messages to and consuming messages from Redis, built on the byjg/message-queue-client component.
For details on how to use the Message Queue Client, see its documentation.
Usage
Publish
<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RedisQueueConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));
Consume
<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RedisQueueConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe, // Queue name
    function (Envelope $envelope) { // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) { // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);
The consume method waits for a message and then calls the callback function to process it. If the queue is empty, the method keeps waiting until a message arrives.
If you want to exit the consume method, just return Message::ACK | Message::EXIT from the callback function.
Possible return values from the callback function:
- Message::ACK - Acknowledge the message and remove it from the queue.
- Message::NACK - Do not acknowledge the message and remove it from the queue. If the queue has a dead letter queue, the message is sent to the dead letter queue.
- Message::REQUEUE - Requeue the message.
- Message::EXIT - Exit the consume method.
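
Because the return values can be combined with `|` (as in Message::ACK | Message::EXIT), they behave like bit flags. The following is a minimal, self-contained sketch of that idea, not the library's implementation: the `Flag` class, its numeric values, and the `stopAfter` and `simulateConsume` helpers are all hypothetical stand-ins, and the loop runs over an in-memory array instead of Redis. In real code the callback would return the `Message::*` constants and receive an `Envelope`.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the Message::* constants. The numeric values
// are assumptions chosen so that each flag occupies its own bit.
final class Flag
{
    public const ACK     = 0b0001;
    public const NACK    = 0b0010;
    public const REQUEUE = 0b0100;
    public const EXIT    = 0b1000;
}

/**
 * Build a callback that acknowledges every message and asks the consumer
 * to stop once $limit messages have been handled.
 */
function stopAfter(int $limit): callable
{
    $handled = 0;
    return function (string $body) use (&$handled, $limit): int {
        $handled++;
        return ($handled >= $limit) ? (Flag::ACK | Flag::EXIT) : Flag::ACK;
    };
}

/**
 * Toy in-memory consume loop (illustration only). It hands each queued body
 * to the callback, interprets the returned flags, and returns the bodies
 * that left the queue. Unlike the real consume method, it returns when the
 * queue is empty instead of waiting for new messages.
 */
function simulateConsume(array $queue, callable $onReceive): array
{
    $removed = [];
    while ($queue !== []) {
        $body = array_shift($queue);
        $result = $onReceive($body);

        if (($result & Flag::REQUEUE) !== 0) {
            $queue[] = $body;      // put the message back at the end
        } else {
            $removed[] = $body;    // ACK or NACK: the message leaves the queue
        }

        if (($result & Flag::EXIT) !== 0) {
            break;                 // the callback asked to stop consuming
        }
    }
    return $removed;
}

$removed = simulateConsume(['a', 'b', 'c', 'd'], stopAfter(2));
echo implode(', ', $removed), PHP_EOL;
```

Here the callback acknowledges the first message, then acknowledges the second and requests an exit in the same return value, so the remaining messages stay in the queue for a later consumer.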