toalett / react-amqp-stream
Treat AMQP messages as a stream in ReactPHP
Requires
- php: ^7.4
- php-amqplib/php-amqplib: ^2.0
- toalett/react-stream-adapter: ^1.0
README
Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.
What is toalett/react-amqp-stream?
This is a library that lets you interact with an AMQP message queue as if it were a readable stream in ReactPHP.
It is very lightweight: its only dependencies are toalett/react-stream-adapter and php-amqplib/php-amqplib.
The class AMQPSource implements the Toalett\React\Stream\Source interface from toalett/react-stream-adapter. It needs an instance of PhpAmqpLib\Channel\AMQPChannel and the name of the queue to read from. You may provide additional options for the call to AMQPChannel::basic_consume() by passing in an Options object as the third parameter of the constructor.
The Toalett\React\Stream\StreamAdapter wraps a Source and exposes it as a readable stream in an event loop.
Installation
It is available on Packagist:
composer require toalett/react-amqp-stream
Motivation
Note: this motivation is the same as the one given in toalett/react-stream-adapter.
I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:
In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.
This definition suits a message queue.
In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others might enjoy it too.
How do I use this?
The library tries to stay out of your way as much as possible. Going from an AMQP connection to a readable stream only takes a few lines of code! 😀
- Create a connection to an AMQP host with PhpAmqpLib, and grab a channel.
- Pass the channel instance and the name of the queue to consume, optionally with an instance of Options, to the constructor of AMQPSource.
- Create an event loop using the React\EventLoop\Factory.
- Create a StreamAdapter, and pass the AMQPSource and the event loop to the constructor.
- Interact with the adapter as if it were any other ReadableInputStream.
Let us see this in action with two examples.

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use React\EventLoop\Factory as EventLoopFactory;
    use Toalett\React\AMQP\AMQPSource;
    use Toalett\React\Stream\StreamAdapter;

    // Open a channel on an AMQP connection (connection arguments elided).
    $channel = (new AMQPStreamConnection(/* ... */))->channel();
    $queueName = 'my-app.work-queue';

    // Wrap the queue in a source and expose it as a readable stream.
    $amqpSource = new AMQPSource($channel, $queueName);
    $eventLoop = EventLoopFactory::create();
    $stream = new StreamAdapter($amqpSource, $eventLoop);

    $stream->on('data', fn(AMQPMessage $m) => /* ... */);
    $stream->on('error', fn(RuntimeException $e) => /* ... */);

    $eventLoop->run();

As you can see, it takes only 2 SLOC to go from an AMQP connection or channel to a readable stream.
The Options class provides a way to pass arguments to the call to AMQPChannel::basic_consume().
Once you have passed the Options instance to the constructor of AMQPSource, you can no longer change the options it uses; the Options instance is cloned to prevent unexpected behaviour.

    use Toalett\React\AMQP\AMQPSource;
    use Toalett\React\AMQP\Options;

    // ...

    $options = (new Options)
        ->setConsumerTag('worker.1')
        ->setNoAck(true);

    $amqpSource = new AMQPSource($channel, $queueName, $options);

    // ...

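To illustrate why changes made after construction have no effect, here is a minimal sketch of defensive cloning in plain PHP. DemoOptions and DemoSource are hypothetical stand-ins written for this example; they are not the library's classes, and the real AMQPSource may be implemented differently.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an options object; not the library's Options class.
final class DemoOptions
{
    private string $consumerTag = '';

    public function setConsumerTag(string $consumerTag): self
    {
        $this->consumerTag = $consumerTag;
        return $this;
    }

    public function getConsumerTag(): string
    {
        return $this->consumerTag;
    }
}

// Hypothetical stand-in for a source that keeps a private copy of its options.
final class DemoSource
{
    private DemoOptions $options;

    public function __construct(DemoOptions $options)
    {
        // Clone so that later changes to the caller's object do not leak in.
        $this->options = clone $options;
    }

    public function getConsumerTag(): string
    {
        return $this->options->getConsumerTag();
    }
}

$options = (new DemoOptions())->setConsumerTag('worker.1');
$source = new DemoSource($options);

// Mutating the original after construction has no effect on the source.
$options->setConsumerTag('worker.2');

echo $source->getConsumerTag(), PHP_EOL; // prints "worker.1"
```

If you need different options, the practical consequence is to build a new Options instance and a new source rather than mutating the old one.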
If you don't provide a consumer tag, the server will assign one. You can retrieve this consumer tag from an AMQPSource with getConsumerTag().
Check out the examples folder for some simple implementations. They are not much different from the ones given in this readme. Feel free to play around with them.
Note: There are some trade-offs with regard to latency because the stream adapter library uses polling under the hood. Please refer to toalett/react-stream-adapter for more information about this subject.
Questions
Q: How do I handle stream errors?
A: The RuntimeException that is passed to the on('error', ...) callback contains the exception that was actually thrown by the Source. Calling getPrevious() on the RuntimeException gives you the original exception.
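The unwrapping step can be tried out in plain PHP without a broker. In this sketch the wrapped exception is built by hand; the exception type, both messages, and the $onError handler are assumptions made for illustration, not what the library actually emits.

```php
<?php

declare(strict_types=1);

// Hypothetical failure raised somewhere inside a source.
$original = new InvalidArgumentException('queue name must not be empty');

// Wrap it as described above: the original becomes the "previous" exception.
$wrapped = new RuntimeException('Source failed', 0, $original);

// Hypothetical error handler, shaped like a callback for on('error', ...).
$onError = function (RuntimeException $e): string {
    // Fall back to the wrapper itself if nothing was wrapped.
    $cause = $e->getPrevious() ?? $e;
    return get_class($cause) . ': ' . $cause->getMessage();
};

echo $onError($wrapped), PHP_EOL;
// prints "InvalidArgumentException: queue name must not be empty"
```

A handler written this way can be passed to $stream->on('error', ...) so that logs show the underlying cause rather than only the wrapper.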
Q: Where are the tests?
A: Tests might be added later. Feel free to create an issue if the lack of tests bothers you!