m6web / redis-message-broker
message broker component backed with redis
Installs: 31 317
Dependents: 0
Suggesters: 0
Security: 0
Stars: 10
Watchers: 49
Forks: 0
Open Issues: 1
Requires
- php: >=7.1
- predis/predis: ^1.1.1
Requires (Dev)
- atoum/atoum: ~3.0.0
- m6web/php-cs-fixer-config: ^1.0
- m6web/redis-mock: ^3.3.1
README
This component will help you to build a messages brocker system over a redis backend. It will take advantage of the redis cluster capabilities with the possibility to shard messages into several redis lists while producing messages. Consumer, in no auto-ack mode, implements a working list to be sure not to loose any messages when processing fail.
By design, producing is super fast (one Redis command) and consuming can be slow if you want to ack messages manually.
You should use it with Redis >= 2.8.
usage
producer
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $message = new RedisMessageBroker\MessageEnvelope(uniqid(), 'un message'); $producer = new RedisMessageBroker\MessageHandler\Producer($queue, $redisClient); $producer->publishMessage($message);
consumer
Consumer should be wrapped in a worker. A unique Id should be pass to the consumer constructor. If you work with a worker, uniqId has to be constant per worker.
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid()); $message = $consumer->getMessageEnvelope();
inspector
Inspector methods allow you to count the messages in ready or processing in a queue.
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $inspector = new RedisMessageBroker\Queue\Inspector($queue, $redisClient); $countInProgress = $inspector->countInProgressMessages(); $countReady = $inspector->countReadyMessages();
cleanup
Cleanup methods let you perform a cleanup in the message queue. Cleanup is very slow as all the message in the queue will be scanned.
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $cleanup = new RedisMessageBroker\Queue\Cleanup($queue, $redisClient); $cleanup->cleanOldMessages( 3600, // erase messages older than 3600 seconds true // clean message in the ready queue too. Mandatory use if you are in no-autoack mode );
queue option
To avoid hotpsots you can shard a queue on several lists :
$queue = new RedisMessageBroker\Queue\Definition('raoul', 10); // shard on 10 lists
In this mode, messages will be written and read among the 10 lists. FIFO is no more guaranteed.
consumer options
manual message acknowledgment
with setNoAutoAck()
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid()); $consumer->setNoAutoAck(); $message = $consumer->getMessageEnvelope(); if ($message) { // do something with the message $consumer->ack($message); // erase the message from the working list }
look for old messages not acked by consumers
Each consumer got an unique Id defined during the construction of the object. This Id allow the consumer to define a unique working list where a message is stored between the getMessage
and the ack
.
Is it possible to use LostMessageConsumer class to look on consumer working lists and move message more than x second old (messageTtl
parameter) from those lists to a queue list.
The maxRetry
parameter will put a message in a dead letter list when the retry number is reached.
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\LostMessagesConsumer($queue, $redisClient, 360, 3); $consumer->requeueOldMessages();
messageTtl
parameter need to be superior of your max time to process a message. (Otherwise it will consider a message old when its processing)