Treat AMQP messages as a stream in ReactPHP
Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.
This is a library that allows you to interact with an AMQP message queue as if it were a readable stream in ReactPHP.
It's very lightweight - its only dependencies
AMQPSource implements the Source interface from toalett/react-stream-adapter. It needs an instance of PhpAmqpLib\Channel\AMQPChannel and the name of the queue to read from. You may provide additional options for the call to AMQPChannel::basic_consume() by passing in an Options object as the third parameter of the constructor.
Toalett\React\Stream\StreamAdapter wraps a Source and makes it approachable as a readable stream in an event loop.
It is available on Packagist:
composer require toalett/react-amqp-stream
Note: this motivation is the same as given
I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:
In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.
This definition suits a message queue.
In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others might enjoy it too.
How do I use this?
The library tries to stay out of your way as much as possible. Going from an AMQP connection to a readable stream only takes a few lines of code! 😀
- Create a connection to an AMQP host with PhpAmqpLib, and grab a channel.
- Pass the channel instance and the name of the queue to consume, optionally with an instance of Options, to the constructor of AMQPSource.
- Create an event loop using the React\EventLoop\Factory.
- Create a StreamAdapter, and pass the AMQPSource and the event loop to the constructor.
- Interact with the adapter as if it were any other readable stream.
Let us see this in action with two examples.
```php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use React\EventLoop\Factory as EventLoopFactory;
use Toalett\React\AMQP\AMQPSource;
use Toalett\React\Stream\StreamAdapter;

// Connect to the AMQP host and grab a channel.
$channel = (new AMQPStreamConnection(/* ... */))->channel();
$queueName = 'my-app.work-queue';

// Wrap the queue in a source and expose it as a readable stream.
$amqpSource = new AMQPSource($channel, $queueName);
$eventLoop = EventLoopFactory::create();
$stream = new StreamAdapter($amqpSource, $eventLoop);

// React to incoming messages and to errors.
$stream->on('data', fn(AMQPMessage $m) => /* ... */);
$stream->on('error', fn(RuntimeException $e) => /* ... */);

$eventLoop->run();
```
As you can see, it takes only 2 SLOC to go from an AMQP connection or channel to a readable stream.
The Options class provides a way to pass arguments to the call to AMQPChannel::basic_consume(). Once you have passed the Options instance to the constructor of AMQPSource, you lose the ability to change these options; the Options instance is cloned to prevent unexpected behaviour.
```php
use Toalett\React\AMQP\AMQPSource;
use Toalett\React\AMQP\Options;

// ...

$options = (new Options)
    ->setConsumerTag('worker.1')
    ->setNoAck(true);

$amqpSource = new AMQPSource($channel, $queueName, $options);

// ...
```
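The effect of cloning can be illustrated with a small, dependency-free sketch. The classes `SketchOptions` and `SketchSource` are hypothetical stand-ins, not the library's actual implementation; they only show why changes made after construction do not reach the source.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an options object with a fluent setter.
final class SketchOptions
{
    private string $consumerTag = '';

    public function setConsumerTag(string $tag): self
    {
        $this->consumerTag = $tag;
        return $this;
    }

    public function getConsumerTag(): string
    {
        return $this->consumerTag;
    }
}

// Hypothetical stand-in for a source that keeps a private copy of its options.
final class SketchSource
{
    private SketchOptions $options;

    public function __construct(SketchOptions $options)
    {
        // Cloning decouples the source from the caller's instance.
        $this->options = clone $options;
    }

    public function getConsumerTag(): string
    {
        return $this->options->getConsumerTag();
    }
}

$options = (new SketchOptions())->setConsumerTag('worker.1');
$source = new SketchSource($options);

// Mutating the original afterwards does not affect the source's copy.
$options->setConsumerTag('worker.2');

echo $source->getConsumerTag(), PHP_EOL; // worker.1
```

If you need different options, the straightforward route under this model is to build a new Options instance and construct a new source with it.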
If you don't provide a consumer tag, the server will assign
one. You can retrieve this consumer tag from an
Check out the examples folder for some simple implementations. They are not much different from the ones given in this readme. Feel free to play around with them.
Note: There are some concessions with regard to latency because the stream adapter library uses polling under the hood. Please refer to toalett/react-stream-adapter for more information about this subject.
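To get a feel for the latency cost of polling, here is a rough model that assumes the queue is checked at a fixed interval starting at time zero. The function name `deliveryDelay` and the 10-second interval (borrowed from the motivation above) are illustrative assumptions; the adapter's actual scheduling may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical model: the queue is polled at t = 0, $interval, 2 * $interval, ...
// Returns how long a message arriving at $arrival waits until the next poll.
function deliveryDelay(float $arrival, float $interval): float
{
    // First poll time at or after the arrival.
    $nextPoll = ceil($arrival / $interval) * $interval;

    return $nextPoll - $arrival;
}

echo deliveryDelay(0.5, 10.0), PHP_EOL;  // 9.5
echo deliveryDelay(12.5, 10.0), PHP_EOL; // 7.5
echo deliveryDelay(10.0, 10.0), PHP_EOL; // 0
```

Under this model the worst case approaches one full interval, so a shorter interval trades lower latency for more frequent checks.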
Q: How do I handle stream errors?
A: The RuntimeException that is passed to the
on('error', ...) callback contains the exception that was actually
thrown by the
Calling getPrevious() on the RuntimeException gives you the original exception.
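A minimal sketch of that unwrapping, using only PHP's built-in exception chaining. The `LogicException`, the messages, and the `$handler` closure are illustrative assumptions; the type of the wrapped exception in practice depends on what actually went wrong.

```php
<?php
declare(strict_types=1);

// Illustrative original failure; in practice this would come from the AMQP layer.
$original = new LogicException('channel closed');

// Wrap it as described above: the original becomes the "previous" exception.
$wrapped = new RuntimeException('stream error', 0, $original);

// A handler of the kind you might pass to on('error', ...).
$handler = function (RuntimeException $e): string {
    $cause = $e->getPrevious();

    // Fall back to the wrapper's own message when nothing was wrapped.
    return $cause !== null
        ? get_class($cause) . ': ' . $cause->getMessage()
        : $e->getMessage();
};

echo $handler($wrapped), PHP_EOL; // LogicException: channel closed
```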
Q: Where are the tests?
A: Tests might be added later. Feel free to create an issue if the lack of tests bothers you!