Treat AMQP messages as a stream in ReactPHP

1.0 2021-01-31 12:52 UTC

Last update: 2024-03-29 04:06:10 UTC


Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.

What is toalett/react-amqp-stream?

This is a library that allows you to interact with an AMQP message queue as if it were a readable stream in ReactPHP. It's very lightweight - its only dependencies are toalett/react-stream-adapter and php-amqplib/php-amqplib.

The class AMQPSource implements the Toalett\React\Stream\Source interface from toalett/react-stream-adapter. It needs an instance of PhpAmqpLib\Channel\AMQPChannel and the name of the queue to read from. You may provide additional options for the call to AMQPChannel::basic_consume() by passing in an Options object as the third parameter of the constructor.

The Toalett\React\Stream\StreamAdapter wraps a Source and makes it approachable as a readable stream in an event loop.


It is available on Packagist:

composer require toalett/react-amqp-stream


Note: this motivation is the same as the one given in toalett/react-stream-adapter.

I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:

In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.

Stream (computing) on Wikipedia

This definition suits a message queue.

In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others might enjoy it too.

How do I use this?

The library tries to stay out of your way as much as possible. Going from an AMQP connection to a readable stream only takes a few lines of code! 😀

  1. Create a connection to an AMQP host with PhpAmqpLib, and grab a channel.
  2. Pass the channel instance and the name of the queue to consume, optionally with an instance of Options, to the constructor of AMQPSource.
  3. Create an event loop using the React\EventLoop\Factory.
  4. Create a StreamAdapter, and pass the AMQPSource and the event loop to the constructor.
  5. Interact with the adapter as if it were any other ReadableStreamInterface.

Let us see this in action with two examples.

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use React\EventLoop\Factory as EventLoopFactory;
use Toalett\React\AMQP\AMQPSource;
use Toalett\React\Stream\StreamAdapter;

$channel = (new AMQPStreamConnection(/* ... */))->channel();
$queueName = '';
$amqpSource = new AMQPSource($channel, $queueName);

$eventLoop = EventLoopFactory::create();
$stream = new StreamAdapter($amqpSource, $eventLoop);

$stream->on('data', fn(AMQPMessage $m) => /* ... */);
$stream->on('error', fn(RuntimeException $e) => /* ... */);

// Start the event loop; no events are delivered until it runs.
$eventLoop->run();


As you can see, it takes only 2 SLOC to go from an AMQP connection or channel to a readable stream.
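What happens inside the 'data' callback is up to your application. As a minimal sketch, assuming your messages carry JSON bodies (an assumption; the library does not prescribe a payload format), a small helper could decode the body before you act on it. The function decodeBody() is hypothetical and not part of this library.

```php
<?php

/**
 * Decodes a JSON message body into an array.
 * decodeBody() is a hypothetical helper; JSON payloads are an assumption.
 *
 * @throws JsonException when the body is not valid JSON
 */
function decodeBody(string $body): array
{
    $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    // Scalars such as "42" are valid JSON but not useful as a payload here.
    if (!is_array($decoded)) {
        throw new UnexpectedValueException('Expected a JSON object or array.');
    }

    return $decoded;
}

// The literal string stands in for $m->getBody() inside the 'data' callback.
$payload = decodeBody('{"task":"resize","id":42}');

echo $payload['task'], ' #', $payload['id'], PHP_EOL; // resize #42
```

Inside the callback you would call it as decodeBody($m->getBody()), where getBody() is the payload accessor on php-amqplib's AMQPMessage.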

The Options class provides a way to pass arguments to the call to AMQPChannel::basic_consume(). Once you have passed the Options instance to the constructor of AMQPSource, you can no longer change the options it uses; the instance is cloned to prevent unexpected behaviour.

use Toalett\React\AMQP\AMQPSource;
use Toalett\React\AMQP\Options;

// ...
// Configure the arguments for basic_consume() on this instance before passing it on.
$options = new Options();

$amqpSource = new AMQPSource($channel, $queueName, $options);
// ...

If you don't provide a consumer tag, the server will assign one. You can retrieve this consumer tag from an AMQPSource with getConsumerTag().
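To see why changes made to the Options instance after construction have no effect, here is a minimal sketch of the defensive-clone pattern. ExampleOptions and ExampleSource are hypothetical stand-ins written for this illustration; they are not the library's Options and AMQPSource classes, and the real classes may differ in detail.

```php
<?php

// ExampleOptions and ExampleSource are hypothetical stand-ins, not the
// library's Options and AMQPSource classes.
final class ExampleOptions
{
    public string $consumerTag = '';
}

final class ExampleSource
{
    private ExampleOptions $options;

    public function __construct(ExampleOptions $options)
    {
        // Defensive copy: later changes to the caller's object do not leak in.
        $this->options = clone $options;
    }

    public function consumerTag(): string
    {
        return $this->options->consumerTag;
    }
}

$options = new ExampleOptions();
$options->consumerTag = 'worker-1';

$source = new ExampleSource($options);

// Mutating the original after construction has no effect on the source.
$options->consumerTag = 'worker-2';

echo $source->consumerTag(), PHP_EOL; // worker-1
```

The practical consequence is that all options should be set before the source is constructed.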

Check out the examples folder for some simple implementations. They are not much different from the ones given in this readme. Feel free to play around with them.

Note: There are some concessions with regard to latency, because the stream adapter library uses polling under the hood. Please refer to toalett/react-stream-adapter for more information about this subject.


Q: How do I handle stream errors?
A: The RuntimeException that is passed to the on('error', ...) callback contains the exception that was actually thrown by the Source. Calling getPrevious() on the RuntimeException gives you the original exception.
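As a minimal sketch of unwrapping the error, the snippet below simulates the wrapped exception with plain PHP exceptions. describeStreamError() is a hypothetical helper, and the ErrorException merely stands in for whatever the Source actually throws.

```php
<?php

/**
 * Builds a log line for a stream error, preferring the original cause.
 * describeStreamError() is a hypothetical helper, not part of the library.
 */
function describeStreamError(RuntimeException $e): string
{
    // Fall back to the wrapper itself if no previous exception is attached.
    $cause = $e->getPrevious() ?? $e;

    return sprintf('%s: %s', get_class($cause), $cause->getMessage());
}

// Simulate what the 'error' callback receives: a RuntimeException wrapping
// the exception thrown by the Source (an ErrorException stands in here).
$original = new ErrorException('connection reset');
$wrapped = new RuntimeException('Source failed', 0, $original);

echo describeStreamError($wrapped), PHP_EOL; // ErrorException: connection reset
```

In an application this could be wired up as $stream->on('error', fn(RuntimeException $e) => error_log(describeStreamError($e)));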

Q: Where are the tests?
A: Tests might be added later. Feel free to create an issue if the lack of tests bothers you!