bongrun / rsmq
Redis Simple Message Queue.
Requires
- php: ^7.4|^8.0
- ext-mbstring: *
- predis/predis: ^1.1
Requires (Dev)
- mockery/mockery: ^1.3
- phpstan/phpstan: ^0.11.12 || ^0.12.0 || ^1.0.0
- phpunit/phpunit: ^8.3 || ^9.0
- roave/security-advisories: dev-master
- squizlabs/php_codesniffer: ^3.5
- vimeo/psalm: ^3.12 || ^4.0
- dev-master
- 3.0.1
- 3.0.0
- 2.0.1
- 2.0.0
- 1.2.0
- 1.0.2
- 1.0.1
- 1.0.0
- 0.2.1
- 0.2.0
- v0.1.0
- dev-dependabot/composer/phpstan/phpstan-1.5.3
- dev-dependabot/npm_and_yarn/minimist-1.2.6
- dev-dependabot/composer/phpunit/phpunit-9.5.19
- dev-dependabot/composer/vimeo/psalm-4.22.0
- dev-dependabot/composer/predis/predis-1.1.10
- dev-cross-client-tests
This package is not auto-updated.
Last update: 2025-01-04 02:27:12 UTC
README
A lightweight message queue for PHP that requires no dedicated queue server. Just a Redis server. See smrchy/rsmq for more information.
This is a fork of eislambey/php-rsmq with the following changes:
- Uses predis instead of the Redis extension
- Has some OO wrappers for QueueAttributes and Message
- Provides a simple QueueWorker
Table of Contents
Installation
composer require andrewbreksa/rsmq
Methods
Construct
Creates a new instance of RSMQ.
Parameters:
$predis
(\Predis\ClientInterface): *required The Predis instance$ns
(string): optional (Default: "rsmq") The namespace prefix used for all keys created by RSMQ$realtime
(Boolean): optional (Default: false) Enable realtime PUBLISH of new messages
Example:
<?php use Predis\Client; use AndrewBreksa\RSMQ\RSMQClient; $predis = new Client( [ 'host' => '127.0.0.1', 'port' => 6379 ] ); $this->rsmq = new RSMQClient($predis);
Queue
createQueue
Create a new queue.
Parameters:
$name
(string): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.$vt
(int): optional (Default: 30) The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)$delay
(int): optional (Default: 0) The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)$maxsize
(int): optional (Default: 65536) The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Returns:
true
(Bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $rsmq->createQueue('myqueue');
listQueues
List all queues
Returns an array:
["qname1", "qname2"]
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queues = $rsmq->listQueues();
deleteQueue
Deletes a queue and all messages.
Parameters:
$name
(string): The Queue name.
Returns:
true
(Bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $rsmq->deleteQueue('myqueue');
getQueueAttributes
Get queue attributes, counter and stats
Parameters:
$queue
(string): The Queue name.
Returns a \AndrewBreksa\RSMQ\QueueAttributes
object with the following properties:
vt
(int): The visibility timeout for the queue in secondsdelay
(int): The delay for new messages in secondsmaxSize
(int): The maximum size of a message in bytestotalReceived
(int): Total number of messages received from the queuetotalSent
(int): Total number of messages sent to the queuecreated
(float): Timestamp (epoch in seconds) when the queue was createdmodified
(float): Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributes
messageCount
(int): Current number of messages in the queuehiddenMessageCount
(int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avt
parameter or when sent with adelay
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $attributes = $rsmq->getQueueAttributes('myqueue'); echo "visibility timeout: ", $attributes->getVt(), "\n"; echo "delay for new messages: ", $attributes->getDelay(), "\n"; echo "max size in bytes: ", $attributes->getMaxSize(), "\n"; echo "total received messages: ", $attributes->getTotalReceived(), "\n"; echo "total sent messages: ", $attributes->getTotalSent(), "\n"; echo "created: ", $attributes->getCreated(), "\n"; echo "last modified: ", $attributes->getModified(), "\n"; echo "current n of messages: ", $attributes->getMessageCount(), "\n"; echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";
setQueueAttributes
Sets queue parameters.
Parameters:
$queue
(string): The Queue name.$vt
(int): optional * The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)$delay
(int): optional The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)$maxsize
(int): optional The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.
Returns a \AndrewBreksa\RSMQ\QueueAttributes
object with the following properties:
vt
(int): The visibility timeout for the queue in secondsdelay
(int): The delay for new messages in secondsmaxSize
(int): The maximum size of a message in bytestotalReceived
(int): Total number of messages received from the queuetotalSent
(int): Total number of messages sent to the queuecreated
(float): Timestamp (epoch in seconds) when the queue was createdmodified
(float): Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributes
messageCount
(int): Current number of messages in the queuehiddenMessageCount
(int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avt
parameter or when sent with adelay
Throws:
\AndrewBreksa\RSMQ\QueueAttributes
\AndrewBreksa\RSMQ\QueueParametersValidationException
\AndrewBreksa\RSMQ\QueueNotFoundException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $vt = 50; $delay = 10; $maxsize = 2048; $rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);
Messages
sendMessage
Sends a new message.
Parameters:
$queue
(string)$message
(string)$delay
(int): optional (Default: queue settings) The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days)
Returns:
$id
(string): The internal message id.
Throws:
\AndrewBreksa\RSMQ\Exceptions\MessageToLongException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('myqueue', 'a message'); echo "Message Sent. ID: ", $id;
receiveMessage
Receive the next message from the queue.
Parameters:
$queue
(string): The Queue name.$vt
(int): optional (Default: queue settings) The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days)
Returns a \AndrewBreksa\RSMQ\Message
object with the following properties:
message
(string): The message's contents.id
(string): The internal message id.sent
(int): Timestamp of when this message was sent / created.firstReceived
(int): Timestamp of when this message was first received.receiveCount
(int): Number of times this message was received.
Note: Will return an empty array if no message is there
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->receiveMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
deleteMessage
Parameters:
$queue
(string): The Queue name.$id
(string): message id to delete.
Returns:
true
if successful,false
if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('queue', 'a message'); $rsmq->deleteMessage('queue', $id);
popMessage
Receive the next message from the queue and delete it.
Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.
Parameters:
$queue
(string): The Queue name.
Returns a \AndrewBreksa\RSMQ\Message
object with the following properties:
message
(string): The message's contents.id
(string): The internal message id.sent
(int): Timestamp of when this message was sent / created.firstReceived
(int): Timestamp of when this message was first received.receiveCount
(int): Number of times this message was received.
Note: Will return an empty object if no message is there
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->popMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
changeMessageVisibility
Change the visibility timer of a single message. The time when the message will be visible again is calculated from the
current time (now) + vt
.
Parameters:
qname
(string): The Queue name.id
(string): The message id.vt
(int): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days)
Returns:
true
if successful,false
if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $id = $rsmq->sendMessage($queue, 'a message'); if($rsmq->changeMessageVisibility($queue, $id, 60)) { echo "Message hidden for 60 secs"; }
QueueWorker
The QueueWorker class provides an easy way to consume RSMQ messages, to use it:
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ use AndrewBreksa\RSMQ\ExecutorInterface; use AndrewBreksa\RSMQ\Message; use AndrewBreksa\RSMQ\QueueWorker; use AndrewBreksa\RSMQ\WorkerSleepProvider; $executor = new class() implements ExecutorInterface{ public function __invoke(Message $message) : bool { //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish" return true; } }; $sleepProvider = new class() implements WorkerSleepProvider{ public function getSleep() : ?int { /** * This allows you to return null to stop the worker, which can be used with something like redis to mark. * * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject * before we process a message. */ return 1; } }; $worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue'); $worker->work(); // here we can optionally pass true to only process one message
LICENSE
The MIT LICENSE. See LICENSE