Async Stream for RxPHP

3.0.2 2020-05-05 14:08 UTC

This package is auto-updated.

Last update: 2021-01-05 15:37:45 UTC


Provides RxPHP Observables for PHP streams

This library is a wrapper around the ReactPHP stream library. It uses the Voryx event-loop which behaves like the Javascript event-loop. ie. You don't need to start it.


From File

    $source = new \Rx\React\FromFileObservable("example.csv");
        ->cut() //Cut the stream by PHP_EOL
        ->map('str_getcsv') //Convert csv row to an array
        ->map(function (array $row) {
            //Strip numbers from the first field
            $row[0] = preg_replace('/\d+/u', '', $row[0]);
            return $row;
            function ($data) {
                echo $data[0] . "\n";
            function ($e) {
                echo "error\n";
            function () {
                echo "done\n";

Read and Write to File

$source = new \Rx\React\FromFileObservable("source.txt");
$dest   = new \Rx\React\ToFileObserver("dest.txt");

    ->filter(function ($row) {
        return strpos($row, 'foo');
    ->map(function ($row) {
        return $row . 'bar';

Stream - echo example

$read  = new \Rx\React\StreamSubject(STDIN);

    ->takeWhile(function ($x) {
        return trim($x) != 15;
    ->subscribe(new \Rx\React\StreamSubject(STDOUT));