Rx connector for PHP


License Latest Stable Version Total Downloads Latest Unstable Version composer.lock

RxPhp is a great work that brings us Reactive programming : asynchronous programming for human being.
You can play with reactiveX on, find all the available operators on the official website or read an interesting introduction.

RxNet is an effort to bring it battery included.

Thanks to react/react, its marvelous reactor pattern and all work done with it, many are just simple wrappers.


With composer : domraider/rxnet

Or just clone the lib, run composer install and try the examples scripts.

Why one repository for all ? Because when you start using it you want every libraries to be RxFriendly :)


Asynchronous DNS resolver. Thanks to daverandom/libdns parser it was a breeze to code.

No extra extensions are needed

$dns = new Dns();
// Procedural way
echo Rx\awaitOnce($dns->resolve('', ''));

// All types of queries are allowed
  ->subscribe(new StdoutObserver());


HTTP client with all ReactiveX sweet

No extra extensions are needed

$http = new Http();
  // Timeout after 0.3s
  // will retry 2 times on error 
  // Transform response to something else
  ->map(function(Psr\Http\Message\ResponseInterface $response) {
  	$body = (string) $response->getBody();
	// Domcrawler to extract commits
    return $body;
  ->subscribe(new StdoutObserver());

// All the given options
$opts = [
  // No buffering, you will receive chunks has they arrived
  // Perfect for big files to parse or streaming json
  'stream' => true,
  // You can use body, json or form_params
  // * json will add the header and json_encode
  // * form_params will build query in body and add application/x-www-form-urlencoded header
  'body' => 'raw body for post',
  'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
  'form_param' => ['param_0'=>'value_0', 'param_1'=>'value_1'],
  // Set query string from here not the url
  'query'=> [
  // Use a proxy
  'proxy' => 'http://user:password@myproxy:8080',
  // Append extra headers
  'headers' => [
    'Authorization' => 'Basic '.base64_encode('user:password'),
    // Specify user-agent to use
    'User-Agent' => 'SuperParser/0.1',
  // see
  // Add whatever option you want on your https query
  'ssl' => [
    'verify_peer' => false
  // allow redirect
  'allow_redirects' => true,
  // or
  'allow_redirects' => [
    // max redirects to follow
    'max' => 10

$http->post('', $opts)
  ->subscribeCallback(function($chunk) {
    // let's give it to expat while it arrives


[ ] Psr7 request/response

[ ] multipart

[ ] gzip/deflate


Standalone HTTP server based on react/http implements nikic/fast-route as default router

No extra extensions are needed

$httpd = new HttpD();
$httpd->route('GET', '/', function(HttpdRequest $request, HttpdResponse $response) {
$httpd->route('POST', '/{var}', function(HttpdRequest $request, HttpdResponse $response) {
  $var = $request->getRouteParam('var');

Performances on a macbook pro are around 500 msg/s for one php process.

Remember that it does not need any fpm to run. And that default fpm configuration is with 10 childs.


[ ] Psr7 Request / Response

[ ] gzip / deflate

[ ] http2

[ ] multipart ?

[ ] ssl ? :D


Wrapper from jakubkulhan/bunny that works just fine

No extra extensions are needed


$rabbit = new RabbitMq('rabbit://guest:guest@', new Serialize());
// Wait for rabbit to be connected

// Will wait for messages
    ->subscribeCallback(function (RabbitMessage $message) use ($debug, $rabbit) {
        echo '.';
        $data = $message->getData();
        $name = $message->getName();
        $head = $message->getLabels();
        // Do what you want but do one of this to get next


$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('');

      // Declare all the binding
        $queue->bind('/routing/key', ''),
        $exchange->create($exchange::TYPE_DIRECT, [
    // Everything's done let's produce
    ->subscribeCallback(function () use ($exchange, $loop) {
        $done = 0;
		// Just a simple array
        \Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
            // Wait for one produce to be done before starting another
            ->flatMap(function ($data) use ($exchange) {
                // Rabbit will handle serialize and unserialize
                return $exchange->produce($data, '/routing/key');
            // Produce it many times
            // Let's get some stats
                function () use (&$done) { $done++;}, 
                function (\Exception $e) { echo "shit happens : ".$e->getMessage();}, 
                function () use (&$done, $loop) { echo number_format($done)." lines produced"; }


[ ] add all possible options has constant


Wrapper from clue/redis (great work !)

No extra extensions are needed

$redis = new Redis();

// Wait for redis to be ready
$redis = RxNet\awaitOnce($redis->connect(''));

  ->subscribe(new StdoutObserver());
// Every redis operators return an observable
// And they are all implemented


Message exchange through tcp (or ipc or inproc).

Needs Pecl ZMQ extension to work

Router/dealer is a both direction communication. Dealer will wait for the router, router will not, so dealer has to start first

$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://', 'Roger');
// Display output
$dealer->subscribeCallback(new StdoutObserver())	
// And start
$dealer->send(new PingCommand('ping'));
// Bind and wait
$router = $zmq->router('tcp://');
// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
  ->subscribeCallback(function($data) use($router) {
  	$router->send(new ReceivedEvent('pong'), 'Roger');

Different protocols

You can do push/pull, req/rep, read ØMQ - The Guide to see what network models fits you.

5k to 10k msg/s in router dealer. 30k msg/s in push pull.


[ ] pub/sub


InfluxDB client based on influxdata/influxdb-php It only supports UDP protocol for the moment (write only). Our primary goal was to have a non blocking client to send metrics.


Statsd client based on this gist and php-datadogstatsd for tagging support.

$statsd->gauge("database.connections", 42)
  ->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));


The mysql client uses mysqli.

$conn = new Rxnte\Mysql\Connection([
    'host' => 'localhost',
    'user' => 'myUser',
    'password' => 'myPasswd',
    'database' => 'myDb'

$conn->query('SELECT NOW()');

$conn->transaction(['SELECT NOW()']);



Classic procedural is always possible but take care of side effects

$observable = $http->get('')
// Loop continue to go forward during await
$response = Rxnet\awaitOnce($observable);
// but this echo will wait before running
echo "1";


Using rx/await you can transform you observable to a generator

$source = \Rx\Observable::interval(1000)
    ->take(5); //Limit items to 5

$generator = \Rx\await($source);

foreach ($generator as $item) {
    echo $item, PHP_EOL;
echo "DONE";

On demand

// Great to read gigabytes without memory leaks
$reader = new \Rxnet\OnDemand\OnDemandFileReader("./test.csv");
        function ($row) use ($reader) {
            echo "got row : {$row}\n";
            // read next octet
        function() {
            echo "------------------\n";
            echo "read completed\n";


$backPressure = new \Rxnet\Operator\OnBackPressureBuffer(
    5, // Buffer capacity 
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable) 
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow

    ->doOnNext(function($i) {
        echo "produce {$i} ";
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);


if consuming is slower than producing, next element will be written to file in givent folder.

On start, read buffer's path to search for existing and un-consumed events

$backPressure = new \Rxnet\Operator\OnBackPressureBufferFile(
    './', // Folder to write files
    new MsgPack(), // Serializer to use
    -1, // Buffer capacity, -1 for unlimited
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable) 
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow

    ->doOnNext(function($i) {
        echo "produce {$i} ";
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);