innmind/stream

Wrapper to PHP stream functions

4.2.0 2023-09-16 15:20 UTC

README

Build Status | Codecov | Type Coverage

Simple wrapper to work with resource streams.

Installation

composer require innmind/stream

Usage

File handling:

use Innmind\Stream\Streams;
use Innmind\Url\Path;

// Open the file at the given path through the readable streams factory.
$file = Streams::of()->readable()->open(Path::of('/some/path/to/a/file'));

// Print the file one line at a time until the stream reports its end.
while (!$file->end()) {
    echo $file->readLine()->match(
        static fn($line) => $line->toString(), // line read, echo its content
        static fn() => throw new \Exception('failed to read the stream'),
    );
}

// Close the stream explicitly; the result is matched so that a failure
// to close is surfaced instead of being silently ignored.
$file->close()->match(
    static fn() => null, // closed correctly
    static fn() => throw new \Exception('failed to close the stream'),
);

Socket handling:

use Innmind\Stream\{
    Stream\Bidirectional,
    Streams,
};
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Immutable\Either;

// Wrap the client socket resource in a bidirectional stream.
$socket = Bidirectional::of(\stream_socket_client('unix:///path/to/socket.sock'));
// Build the watcher once; it is invoked on every iteration of the loop below.
$select = Streams::of()
    ->watch()
    ->timeoutAfter(new ElapsedPeriod(60 * 1000)) // select with a 1 minute timeout
    ->forRead($socket);

// Loop forever: wait for the socket, read from it, and write the data back in upper case.
do {
    $socket = $select()
        // only carry on when the socket is among the streams ready to be read
        ->filter(static fn($ready) => $ready->toRead()->contains($socket))
        ->flatMap(static fn() => $socket->read())
        ->map(static fn($data) => $data->toUpper())
        // first match: write the upper-cased data, or fall back to the untouched socket
        ->match(
            static fn($data) => $socket->write($data),
            static fn() => Either::right($socket), // no data to send
        )
        // second match: keep the socket for the next iteration, or abort with
        // an exception whose message is the class name of the error
        ->match(
            static fn($socket) => $socket, // data sent back
            static fn($error) => throw new \Exception(\get_class($error)),
        );
} while (true);

This example listens for messages sent over the socket unix:///path/to/socket.sock and sends them back in upper case.