Use any data source as a stream with ReactPHP

1.0 2021-01-06 17:11 UTC

This package is auto-updated.

Last update: 2024-06-07 01:12:18 UTC


Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.

What is toalett/react-stream-adapter?

It is a library that allows any datasource to be used as a stream with ReactPHP. It is very small - there is one interface, one class and one trait. Its only dependency is react/stream.

The StreamAdapter takes an implementation of the Source interface and makes it approachable as a stream in applications using an event loop.


It is available on Packagist:

composer require toalett/react-stream-adapter


I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:

In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.

Stream (computing) on Wikipedia

This definition suits a message queue.

In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others might enjoy it too.

How do I use this?

There are only three components to worry about, and one of them is optional! The main components are the Source interface and the StreamAdapter class. The optional component is the EndlessTrait, which can be used in an endless source.

The steps you need to take to use this library are as follows:

  1. Define a class that is able to generate or provide some data. It must implement the Source interface.
  2. The select() method is called periodically. This is where you return your next piece of data. Make sure the select() method returns anything that is not null when data is available (anything goes) or null when there is none. You may add a typehint to your implementation of select() such as select(): ?string or select(): ?MyData for improved clarity.
  3. The interface also specifies the close(): void and eof(): bool methods. In an endless (infinite) stream, close() may be left empty and eof() should return false (EOF is never reached). The EndlessTrait provides these implementations.
  4. Use the StreamAdapter to attach your Source to the loop.
  5. Interact with the adapter as if it were any other ReadableInputStream.

Note: This library uses polling under the hood. The default polling interval is 0.5 seconds, though if checking for data is an intensive operation, you might want to increase the interval a bit to prevent slowdowns. This is a tradeoff between responsiveness and load. Custom intervals can be set by passing them as a third argument to the StreamAdapter constructor.

Note: The StreamAdapter reads data eagerly from the source - it won't stop reading until there is nothing left to read. This prevents congestion when high polling intervals are used but it might block execution for a while when there is a lot of data to be read or if your select() routine takes some time.

$loop = Factory::create();

$source = new MySource(); // implements Source
$stream = new StreamAdapter($source, $loop);
$stream->on('data', function(MyData $data) {
    /* do something with data */


Check out the examples folder for some simple implementations.


Q: Where is the code that deals with AMQP messages?
A: It will be released in a separate package, but it needs some work before it can be published.

Q: Where are the tests?
A: There is only one class, and it is mostly based on the ReadableResourceStream from react/stream. Tests might be added later, but as of now, I don't really see the value. Feel free to create an issue if this bothers you!