rx / stream
Async Stream for RxPHP
Installs: 2 277
Dependents: 1
Suggesters: 0
Security: 0
Stars: 3
Watchers: 3
Forks: 2
Open Issues: 0
Requires
- php: ^7.0
- react/stream: ^1.0 || ^0.7.1
- reactivex/rxphp: ^2.0
- rx/operator-extras: ^2.0
- voryx/event-loop: ^3.0 || ^2.0
Requires (Dev)
- phpunit/phpunit: ^5.7
README
Provides RxPHP Observables for PHP streams
This library is a wrapper around the ReactPHP stream library. It uses the Voryx event-loop which behaves like the Javascript event-loop. ie. You don't need to start it.
Usage
From File
$source = new \Rx\React\FromFileObservable("example.csv"); $source ->cut() //Cut the stream by PHP_EOL ->map('str_getcsv') //Convert csv row to an array ->map(function (array $row) { //Strip numbers from the first field $row[0] = preg_replace('/\d+/u', '', $row[0]); return $row; }) ->subscribe( function ($data) { echo $data[0] . "\n"; }, function ($e) { echo "error\n"; }, function () { echo "done\n"; } );
Read and Write to File
$source = new \Rx\React\FromFileObservable("source.txt"); $dest = new \Rx\React\ToFileObserver("dest.txt"); $source ->cut() ->filter(function ($row) { return strpos($row, 'foo'); }) ->map(function ($row) { return $row . 'bar'; }) ->subscribe($dest);
Stream - echo example
$read = new \Rx\React\StreamSubject(STDIN); $read ->takeWhile(function ($x) { return trim($x) != 15; }) ->subscribe(new \Rx\React\StreamSubject(STDOUT));