High level abstraction for streams and sockets

2.7.0 2024-03-09 16:55 UTC

This package is auto-updated.

Last update: 2024-11-09 18:17:02 UTC


README

Build Status codecov Type Coverage

High level abstraction on top of innmind/stream to work with streams in a more functional way.

Installation

composer require innmind/io

Usage

Note

examples below use innmind/operating-system

Reading from a stream by chunks

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$chunks = $io
    ->readable()
    ->wrap(
        $streams
            ->readable()
            ->acquire(\fopen('/some/file.ext', 'r')),
    )
    ->toEncoding(Str\Encoding::ascii)
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->chunks(8192) // max length of each chunk
    ->lazy()
    ->sequence();

The $chunks variable is a Innmind\Innmutable\Sequence containing Innmind\Immutable\Str values, where each value is of a maximum length of 8192 bytes. Before a value is yielded it will make sure data is available before reading from the stream. If no data is available within 1 second the Sequence will throw an exception saying it can't read from the stream, if you don't want it to throw replace timeoutAfter() by watch() so it will wait as long as it needs to.

Reading from a stream by lines

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$lines = $io
    ->readable()
    ->wrap(
        $streams
            ->readable()
            ->acquire(\fopen('/some/file.ext', 'r')),
    )
    ->toEncoding(Str\Encoding::ascii)
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->lines()
    ->lazy()
    ->sequence();

This is the same as reading by chunks (described above) except that the delimiter is the end of line character \n.

Reading from a socket with a periodic heartbeat

use Innmind\IO\{
    IO,
    Readable\Frame,
};
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Socket\{
    Address,
    Client,
};
use Innmind\Stream\Streams;
use Innmind\Immutable\{
    Str,
    Sequence,
};

$socket = Client\Unix::of(Address\Unix::of('/tmp/foo'))->match(
    static fn($socket) => $socket,
    static fn() => throw new \RuntimeException;
);
$os = Factory::build();
$io = IO::of($os->sockets()->watch(...));
$frame = $io
    ->sockets()
    ->clients()
    ->wrap($socket)
    ->toEncoding(Str\Encoding::ascii)
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->heartbeatWith(static fn() => Sequence::of(Str::of('heartbeat')))
    ->frames(Frame\Line::new())
    ->one()
    ->match(
        static fn($line) => $line,
        static fn() => throw new \RuntimeException,
    );

This example will wait to read a single from the socket /tmp/foo.sock and it will send a heartbeat message every second until the expected line is received.

Reading from a stream

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Socket\Address\Unix;
use Innmind\Immutable\{
    Str,
    Fold,
    Either,
};

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$io
    ->readable()
    ->wrap(
        $os
            ->sockets()
            ->connectTo(Unix::of('/some/socket')),
    )
    ->toEncoding('ASCII')
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->chunks(8192) // max length of each chunk
    ->fold(
        Fold::with([]),
        static function(array $chunks, Str $chunk) {
            $chunks[] = $chunk->toString();

            if ($chunk->contains('quit')) {
                return Fold::result($chunks);
            }

            if ($chunk->contains('throw')) {
                return Fold::fail('some error');
            }

            return Fold::with($chunks);
        },
    )
    ->match(
        static fn(Either $result) => $result->match(
            static fn(array $chunks) => doStuff($chunks),
            static fn(string $error) => throw new \Exception($error), // $error === 'some error'
        ),
        static fn() => throw new \RuntimeException('Failed to read from the stream or it timed out'),
    );

This example will:

  • open the local socket /some/socket
  • watch the socket to be ready for 1 second before it times out each time it tries to read from it
  • read chunks of a maximum length of 8192
  • use the encoding ASCII
  • call the function passed to ->fold() each time a chunk is read
  • it will continue reading from the stream until one of the chunks contains quit or throw
  • return a Maybe<Either<string, list<string>>>
    • contains nothing when it failed to read from the stream or it timed out
    • string is the value passed to Fold::fail()
    • list<string> is the value passed to Fold::result()

You can think of this fold operation as a reduce where you can control when to stop iterating by return either Fold::fail() or Fold::result().