hiblaphp / stream
Requires
- evenement/evenement: ^3.0
- hiblaphp/event-loop: dev-main
- hiblaphp/promise: dev-main
Requires (Dev)
- laravel/pint: ^1.10
- pestphp/pest: ^4.0
- phpstan/extension-installer: ^1.4
- phpstan/phpstan: ^2.1
- phpstan/phpstan-strict-rules: ^2.0
This package is auto-updated.
Last update: 2026-03-20 08:28:51 UTC
README
Non-blocking, event-driven streams for PHP with promise-based I/O and automatic backpressure.
hiblaphp/stream provides readable, writable, and duplex stream abstractions built
on top of the Hibla event loop. Streams register I/O watchers with the event loop and
let data flow cooperatively — a read or write never blocks the thread. Backpressure is
tracked automatically: when a writable buffer fills, write() returns false and the
drain event signals when it is safe to continue. Pipe chains wire all of this
together without any manual coordination.
Contents
Getting started
Stream types
Promise-based API
I/O and platform
Reference
- Stream Lifecycle and Events
- No-Op Behaviour
- Resource Cleanup and Destructors
- Events Reference
- API Reference
Meta
Installation
composer require hiblaphp/stream
Requirements:
- PHP 8.3+
hiblaphp/event-loophiblaphp/promiseevenement/evenement
Introduction
PHP's built-in I/O is synchronous. fread() blocks until data arrives. fwrite()
blocks until the kernel accepts the data. For a single file or socket this is fine —
but the moment you are handling multiple streams concurrently, blocking on any one of
them stalls everything else. The event loop cannot fire timers, cannot resume Fibers,
cannot process other I/O until the blocking call returns.
The solution is to hand stream I/O to the event loop entirely. Instead of calling
fread() blindly and waiting, you register a read watcher and supply a callback. The
event loop monitors all active streams at once — using stream_select() or ext-uv
depending on the active driver — and wakes only when data is actually available, then
calls fread() immediately, knowing it will return without blocking.
Hibla streams are that abstraction. A ReadableResourceStream registers a read
watcher with the event loop and emits data events when the underlying resource is
ready. A WritableResourceStream buffers outgoing data and registers a write watcher
to drain the buffer asynchronously without blocking. When you use the promise-based
API — readAsync(), writeAsync(), pipeAsync() — the stream suspends the current
Fiber at the await() point and resumes it exactly when the I/O completes, so the
rest of your code reads top to bottom like ordinary synchronous PHP while the event
loop handles everything underneath.
This library is the foundation that higher-level Hibla components build on.
hiblaphp/socket exposes TCP and UDP connections as duplex streams. hiblaphp/dns
reads and writes over DNS sockets using the same stream primitives. hiblaphp/parallel
uses the promise-based stream API and Fibers to orchestrate IPC between parent and
child processes. If you are using any of these packages you are already using
hiblaphp/stream — understanding it directly gives you full visibility into how data
moves through the entire Hibla stack.
Stream Types
| Class | Direction | Use case |
|---|---|---|
ReadableResourceStream |
Read | Event-driven reads from any readable PHP resource |
WritableResourceStream |
Write | Buffered non-blocking writes to any writable PHP resource |
DuplexResourceStream |
Read + Write | Single resource opened in read/write mode (e.g. r+) |
CompositeStream |
Read + Write | Two independent streams combined into a single duplex interface |
ThroughStream |
Read + Write | In-line transformer — data written in is emitted out, optionally transformed |
PromiseReadableStream |
Read | ReadableResourceStream extended with readAsync(), readLineAsync(), readAllAsync(), pipeAsync() |
PromiseWritableStream |
Write | WritableResourceStream extended with writeAsync(), writeLineAsync(), endAsync() |
Stream Events
Every stream class emits a defined set of events at precise lifecycle points. Understanding what each event means before working with individual stream types makes the rest of this document easier to follow.
Readable stream events
| Event | Arguments | When it fires |
|---|---|---|
data |
string $chunk |
A chunk of data is available from the resource |
end |
— | The stream has reached EOF — no more data events will follow |
close |
— | The underlying resource has been closed and freed |
error |
\Throwable $e |
A read error occurred. The stream closes immediately after |
pause |
— | The stream has been paused — the read watcher is removed |
resume |
— | The stream has resumed — the read watcher is re-registered |
Writable stream events
| Event | Arguments | When it fires |
|---|---|---|
drain |
— | The write buffer has dropped below the soft limit — safe to write again |
finish |
— | end() was called and all buffered data has been fully flushed |
close |
— | The underlying resource has been closed and freed |
error |
\Throwable $e |
A write error occurred, or write() was called on a closed stream |
Key ordering guarantees
- On a readable stream,
endalways fires beforecloseon a clean EOF. A read error skipsendand goes directly toclose. - On a writable stream,
finishalways fires beforecloseafter a cleanend(). Callingclose()directly skipsfinishand discards any buffered data. - After
closefires, all listeners are removed — any listener attached afterclosewill never fire. erroris always followed byclose. The stream closes itself after emittingerror. You do not need to callclose()inside an error handler.
Always attach an
errorlistener on any stream you open. An unhandlederrorevent on anEventEmitterpropagates and may terminate your process.
For the full event sequence diagrams showing exactly when each event fires relative to stream operations, see Stream Lifecycle and Events in the reference section.
Creating Streams
Each stream class accepts a PHP resource directly in its constructor. Resources must
be valid and opened in the appropriate mode — the constructor validates this and throws
a StreamException if not. Non-blocking mode is set automatically.
Note: The examples throughout this README use
fopen()to create resources for brevity.fopen()is a blocking call — it blocks the event loop for the duration of the file open operation. In production code running inside the event loop you should obtain resources through non-blocking means and pass them to the stream constructors.
use Hibla\Stream\ReadableResourceStream; use Hibla\Stream\WritableResourceStream; use Hibla\Stream\DuplexResourceStream; use Hibla\Stream\PromiseReadableStream; use Hibla\Stream\PromiseWritableStream; use Hibla\Stream\CompositeStream; use Hibla\Stream\ThroughStream; $readable = new ReadableResourceStream(fopen('/path/to/input.log', 'rb')); $writable = new WritableResourceStream(fopen('/path/to/output.log', 'wb')); $duplex = new DuplexResourceStream(fopen('/path/to/data.bin', 'r+b')); $composite = new CompositeStream($readable, $writable); $through = new ThroughStream(fn(string $data) => strtoupper($data)); // Promise-based variants $readable = new PromiseReadableStream(fopen('/path/to/input.log', 'rb')); $writable = new PromiseWritableStream(fopen('/path/to/output.log', 'wb')); // Named constructor alternative for promise streams $readable = PromiseReadableStream::fromResource(fopen('/path/to/input.log', 'rb')); $writable = PromiseWritableStream::fromResource(fopen('/path/to/output.log', 'wb'));
A Stream static factory is also available as a convenience shortcut for common
cases like opening files and wrapping standard I/O handles:
use Hibla\Stream\Stream; $readable = Stream::readableFile('/path/to/input.log'); $writable = Stream::writableFile('/path/to/output.log'); $stdin = Stream::stdin(); $stdout = Stream::stdout();
Readable Streams
ReadableResourceStream starts paused by default — no data events fire, no bytes are
read from the resource, and no event loop watcher is registered until you explicitly
call resume() or attach a data listener. The pause-by-default design gives you
time to attach all your event listeners, wire up error handlers, and set up pipe
destinations before any data starts flowing.
use Hibla\Stream\ReadableResourceStream; $stream = new ReadableResourceStream(fopen('/var/log/app.log', 'rb')); // Attach all listeners first — no data flows yet $stream->on('data', function (string $chunk) { echo $chunk; }); $stream->on('end', function () { echo "Stream fully consumed\n"; }); $stream->on('error', function (\Throwable $e) { echo "Error: " . $e->getMessage() . "\n"; }); // Start the flow only after all listeners are in place $stream->resume();
If no data seems to arrive, the most likely cause is a missing
resume()call. The stream is paused and waiting for you to start it. Alternatively, switch to the promise-based API which handlesresume()internally.
Pausing and resuming
Calling pause() stops data events from firing and removes the event loop watcher,
so the stream truly idles between reads rather than spinning. Both pause() and
resume() are idempotent — calling them on an already-paused or already-flowing
stream is a no-op.
$stream->on('data', function (string $chunk) use ($stream) { $stream->pause(); processChunk($chunk); $stream->resume(); });
Seeking and telling
ReadableResourceStream exposes seek() and tell() for seekable resources such as
files. seek() repositions the internal pointer, clears the read-ahead buffer, and
resets the EOF flag — meaning data will flow again from the new position even if the
stream had previously reached the end.
$stream = new ReadableResourceStream(fopen('/tmp/data.bin', 'rb')); $stream->on('data', function (string $chunk) use ($stream) { echo "Read: " . strlen($chunk) . " bytes\n"; $stream->pause(); }); $stream->resume(); // Rewind to the beginning and read again $stream->seek(0); $stream->resume();
seek() returns false on non-seekable resources — pipes, sockets, and STDIN —
without throwing. Always check the return value if seekability matters:
// WRONG — seek() on a pipe silently returns false $stream = new ReadableResourceStream(STDIN); $stream->seek(0); // false — silently ignored // CORRECT if ($stream->seek(0) === false) { // resource is non-seekable — handle accordingly }
seek() throws a StreamException in two cases: when the stream is closed, and when
the underlying resource is no longer valid. tell() follows the same contract.
seek() return value
───────────────────────────────────────────────────
true seek succeeded, buffer cleared, EOF reset
false resource is non-seekable (pipe, socket, STDIN)
StreamException stream is closed or resource is invalid
$position = $stream->tell(); $stream->seek(512, SEEK_SET); // seek to byte 512 $stream->seek(0, SEEK_END); // seek to end of file $stream->seek(-128, SEEK_CUR); // seek relative to current position
Note:
seek()discards any internally buffered data. After a seek the buffer is stale and continuing from it would produce incorrect data, so it is always cleared.
Writable Streams
WritableResourceStream accepts data via write(), buffers it internally, and
registers a write watcher with the event loop to drain the buffer non-blocking. Unlike
readable streams, no resume() is needed — write() can be called immediately after
construction. end() signals that no more data will be written. The buffer drains,
then finish fires, then the stream closes.
end() and close() are both idempotent — calling them multiple times will not throw
or emit duplicate events. This makes it safe to call them defensively without checking
isWritable() first.
Calling write() on a closed stream emits an error event and returns false rather
than throwing an exception. Always attach an error listener if there is any chance
write() could be called after the stream closes.
use Hibla\Stream\WritableResourceStream; $stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb')); $stream->on('finish', fn() => echo "All data written\n"); $stream->on('error', fn(\Throwable $e) => echo "Write error: " . $e->getMessage() . "\n"); $stream->write("First line\n"); $stream->write("Second line\n"); $stream->end("Final line\n");
Backpressure
When a writable stream's internal buffer exceeds its soft limit, write() returns
false. This is the backpressure signal — stop writing and wait for the drain event
before continuing.
Consumer WritableResourceStream Kernel buffer
│ │ │
│ write("chunk 1") │ │
├──────────────────────────► │ buffer: 10 KB (< 64 KB) │
│ ◄── true (keep going) │ │
│ │ │
│ write("chunk 2") │ │
├──────────────────────────► │ buffer: 70 KB (>= 64 KB) │
│ ◄── FALSE ◄───────────────│ STOP WRITING │
│ │ │
│ [wait for drain] │ ─────────────────────────────►│ fwrite()
│ │ buffer: 15 KB (< 64 KB) │
│ ◄──── emit('drain') ◄─────│ RESUME │
use Hibla\Stream\WritableResourceStream; $socket = stream_socket_client('tcp://example.com:9000'); $writable = new WritableResourceStream($socket, softLimit: 65536); function pump(string $data, WritableResourceStream $writable): void { $canContinue = $writable->write($data); if ($canContinue === false) { $writable->once('drain', function () use ($writable) { pump(getNextChunk(), $writable); }); } }
pipe() and pipeAsync() handle all of this automatically. Manual backpressure
management is only needed when calling write() directly.
Piping Streams
pipe() wires a readable stream to a writable stream and handles backpressure
automatically. When the writable buffer fills, the readable is paused. When the
writable drains, the readable resumes. The destination stream is returned for
chaining.
use Hibla\Stream\ReadableResourceStream; use Hibla\Stream\WritableResourceStream; $source = new ReadableResourceStream(fopen('/tmp/input.bin', 'rb')); $destination = new WritableResourceStream(fopen('/tmp/output.bin', 'wb')); $destination->on('finish', fn() => echo "Transfer complete\n"); // pipe() calls resume() internally — no need to call it yourself $source->pipe($destination);
Pipe chains are composable — each pipe() call returns the destination:
use Hibla\Stream\ThroughStream; $source ->pipe(new ThroughStream(fn($d) => gzencode($d))) ->pipe(new WritableResourceStream(fopen('/tmp/compressed.gz', 'wb')));
Pass ['end' => false] to keep the destination open after the source ends — useful
when piping multiple sources to the same destination sequentially:
// Source A's 'end' will NOT call dest->end() $sourceA->pipe($dest, ['end' => false]); $sourceA->on('end', function () use ($sourceB, $dest) { $sourceB->pipe($dest); // this one WILL close dest when finished });
Calling pipe() on a non-readable source or a non-writable destination is a no-op —
no error is thrown, making it safe to call defensively in cleanup paths.
Through Streams
ThroughStream is a duplex stream that sits in the middle of a pipe chain. Data
written to it is emitted as data events on its readable side, optionally transformed
by a callable. Without a transformer it acts as a transparent passthrough.
use Hibla\Stream\ThroughStream; // Transform: compress mid-pipe $source ->pipe(new ThroughStream(fn(string $data) => gzencode($data))) ->pipe($destination); // Spy: inspect data mid-pipe without modifying it $spy = new ThroughStream(function (string $data) { fwrite(STDERR, sprintf("[spy] %d bytes\n", strlen($data))); return $data; // must return data to pass it through }); $source->pipe($spy)->pipe($destination);
write() on a closed ThroughStream emits an error event and returns false.
end() and close() on an already-closed stream are no-ops. If the transformer
throws, the error event fires and the stream closes.
Duplex Streams
DuplexResourceStream wraps a single resource opened in read/write mode — such as a
TCP socket or a file opened with r+. It presents a unified duplex interface while
managing readable and writable sides on the same underlying resource internally.
Like all readable streams in Hibla, the readable side starts paused — attach your
data and error listeners first, then call resume(). The writable side is always
ready to accept write() calls immediately.
use Hibla\Stream\DuplexResourceStream; $socket = stream_socket_client('tcp://api.example.com:80'); $duplex = new DuplexResourceStream($socket); $duplex->on('data', function (string $response) use ($duplex) { echo $response; $duplex->close(); }); $duplex->on('error', fn(\Throwable $e) => echo "Error: " . $e->getMessage() . "\n"); // Write is always available immediately $duplex->write("GET / HTTP/1.0\r\nHost: api.example.com\r\n\r\n"); // Start receiving after listeners are in place $duplex->resume();
The resource must be opened in a read/write mode (containing + in its mode string).
Passing a read-only or write-only resource throws a StreamException.
Composite Streams
CompositeStream combines two independent, one-directional streams into a single
duplex interface. This is useful when your readable and writable sides are separate
resources — for example, a child process's stdout and stdin.
use Hibla\Stream\CompositeStream; use Hibla\Stream\ReadableResourceStream; use Hibla\Stream\WritableResourceStream; $process = proc_open('ffmpeg -i pipe:0 -f mp3 pipe:1', [ 0 => ['pipe', 'r'], 1 => ['pipe', 'w'], ], $pipes); $composite = new CompositeStream( new ReadableResourceStream($pipes[1]), // process stdout → our readable new WritableResourceStream($pipes[0]) // our writable → process stdin ); // Attach listeners before resuming $composite->on('data', fn(string $chunk) => saveChunk($chunk)); $composite->on('end', fn() => proc_close($process)); $composite->resume(); $inputStream->pipe($composite);
Events from each inner stream are forwarded onto the composite automatically. The
composite close event fires only when both inner streams have closed. Calling
close() on an already-closed composite is a no-op.
Promise-Based API
The event-driven API — on('data', ...), pause(), resume(), drain — is the
right tool when you need maximum throughput and full control over flow. A TCP server
handling hundreds of simultaneous connections, a proxy streaming bytes between two
sockets, or a pipeline processing a continuous feed are all cases where the
event-driven model pays off.
But not every use case needs that level of control. Reading a log file line by line,
processing a CSV upload, writing a sequence of records to a file — these are tasks
where setting up data listeners, managing the end event, and wiring drain
callbacks is pure boilerplate. PromiseReadableStream and PromiseWritableStream
eliminate that boilerplate. They extend their base classes with promise-returning
methods that let you express sequential I/O as straight-line code.
The promise-based API manages pausing and resuming internally. readAsync() resumes
the stream to fetch the next chunk and pauses it again once the chunk is delivered —
data only flows when you ask for it. When writing, writeAsync() waits for the
drain event automatically if the buffer is full — you never need to check write()'s
return value or attach a drain listener yourself.
Reading data
use Hibla\Stream\PromiseReadableStream; use function Hibla\await; $stream = new PromiseReadableStream(fopen('/tmp/data.txt', 'rb')); // Read the next chunk (up to $length bytes) $chunk = await($stream->readAsync(1024)); // Read a full line including the trailing newline character $line = await($stream->readLineAsync()); // Read the entire stream into a single string $contents = await($stream->readAllAsync());
readAsync(), readLineAsync(), and pipeAsync() signal end-of-stream by resolving
with null. Always check with a strict !== null guard — a truthiness check breaks
on valid data:
| Value | Truthiness check | !== null check |
|---|---|---|
"0" — a line containing zero |
stops early | continues |
"" — an empty string |
stops early | continues |
"\n" — a blank line |
stops early | continues |
null — true EOF |
stops | stops |
// CORRECT — stops only on null (EOF) while (($line = await($stream->readLineAsync())) !== null) { processLine(rtrim($line)); } // WRONG — stops on any falsy chunk, including valid data like "0" or "\n" while ($line = await($stream->readLineAsync())) { processLine($line); }
readAllAsync() is the exception — it resolves with a plain string (never null)
because it accumulates everything and returns the complete contents in one go:
$contents = await($stream->readAllAsync());
Both readAllAsync() and readLineAsync() accept a $maxLength parameter to
prevent unbounded memory usage:
$contents = await($stream->readAllAsync(maxLength: 524288)); // 512 KiB limit $line = await($stream->readLineAsync(maxLength: 4096)); // 4 KiB per line
Calling readAsync() on a stream that has already reached EOF resolves immediately
with null — no event loop tick needed.
Reading exact byte counts
readAsync($length) resolves with up to $length bytes — not exactly $length
bytes. The $length argument is passed to fread() as the maximum read size, but
fread() returns whatever the OS has buffered at the moment the read watcher fires,
which may be less than requested.
This is correct for general streaming, but wrong for binary protocol parsing where
message boundaries are defined by fixed field sizes. For those cases, loop over
readAsync() until you have accumulated the required count:
/** * Read exactly $length bytes from a stream. * Returns null if EOF is reached before $length bytes are available. * * @return string|null */ function readExact(PromiseReadableStream $stream, int $length): ?string { $buffer = ''; $remaining = $length; while ($remaining > 0) { $chunk = await($stream->readAsync($remaining)); if ($chunk === null) { return null; // EOF before enough bytes arrived } $buffer .= $chunk; $remaining -= strlen($chunk); } return $buffer; }
With that helper, binary protocol parsing is correct regardless of how the OS delivers data:
// Read a length-prefixed binary message: // [ 4-byte uint32 length ][ N bytes payload ] $header = readExact($stream, 4); if ($header === null) { return; // clean EOF — no more messages } $payloadLength = unpack('N', $header)[1]; $payload = readExact($stream, $payloadLength); if ($payload === null) { throw new \RuntimeException("Truncated message: stream ended early"); }
Use readLineAsync() for text protocols, readExact() for binary protocols, and raw
readAsync() only when your processing logic is genuinely chunk-size-agnostic —
streaming file copies, proxying raw bytes, or feeding a streaming parser that handles
partial input itself.
Writing data
use Hibla\Stream\PromiseWritableStream; use function Hibla\await; $stream = new PromiseWritableStream(fopen('/tmp/out.txt', 'wb')); // Resolves with the number of bytes buffered // Backpressure is handled internally — no drain listener needed $bytes = await($stream->writeAsync("Hello, world\n")); // Write a line (appends "\n" automatically) await($stream->writeLineAsync("Another line")); // End the stream and wait for all data to flush // Resolves only after 'finish' fires — all data is durably written await($stream->endAsync());
writeAsync('') resolves immediately with 0 — no write is attempted.
endAsync() called on a stream that is already ending or already closed resolves
immediately — it is safe to call defensively.
Piping with pipeAsync()
pipeAsync() pipes a PromiseReadableStream to any writable stream and resolves with
the total number of bytes transferred once the source ends. Backpressure between
source and destination is handled automatically.
use Hibla\Stream\PromiseReadableStream; use Hibla\Stream\WritableResourceStream; use function Hibla\await; $source = new PromiseReadableStream(fopen('/tmp/large.bin', 'rb')); $dest = new WritableResourceStream(fopen('/tmp/copy.bin', 'wb')); $totalBytes = await($source->pipeAsync($dest)); echo "Transferred: $totalBytes bytes\n";
Cancellation
All promise-based methods return a standard PromiseInterface — cancel any
in-flight operation by calling cancel() on the returned promise. Cancelling detaches
all internal event listeners, pauses the stream, and cleans up pending state. No
further callbacks fire after cancellation.
use Hibla\EventLoop\Loop; use function Hibla\await; $readable = new PromiseReadableStream(fopen('/tmp/large.log', 'rb')); $readPromise = $readable->readLineAsync(); $timerId = Loop::addTimer(2.0, function () use ($readPromise) { $readPromise->cancel(); }); try { $line = await($readPromise); Loop::cancelTimer($timerId); } catch (\Hibla\Promise\Exceptions\CancelledException $e) { echo "Read cancelled — no data within 2 seconds\n"; }
Cancelling pipeAsync() stops the transfer immediately, pauses the source, and
detaches the destination listener without closing either stream:
$transferPromise = $source->pipeAsync($dest); Loop::addTimer(5.0, fn() => $transferPromise->cancel()); try { $totalBytes = await($transferPromise); } catch (\Hibla\Promise\Exceptions\CancelledException $e) { echo "Transfer cancelled\n"; // $source and $dest are still open — you decide what to do with them }
For structured cancellation across multiple operations, use CancellationTokenSource from hiblaphp/cancellation:
use Hibla\Cancellation\CancellationTokenSource; use function Hibla\await; $cts = new CancellationTokenSource(30.0); // 30 second hard limit try { while (($line = await($stream->readLineAsync(), $cts->token)) !== null) { processLine(rtrim($line)); } } catch (\Hibla\Promise\Exceptions\CancelledException $e) { echo "Stream read timed out after 30 seconds\n"; }
Standard I/O
use Hibla\Stream\PromiseReadableStream; use Hibla\Stream\WritableResourceStream; use Hibla\Stream\CompositeStream; use Hibla\Stream\ReadableResourceStream; use function Hibla\await; // Read from STDIN line by line $stdin = new PromiseReadableStream(STDIN); while (($line = await($stdin->readLineAsync())) !== null) { processLine(rtrim($line)); } // Write to STDOUT respecting backpressure $stdout = new WritableResourceStream(STDOUT); $stdout->write("Hello from async PHP\n"); // Write errors to STDERR $stderr = new WritableResourceStream(STDERR); $stderr->write("Something went wrong\n"); // Combined interactive console — STDIN readable, STDOUT writable $stdio = new CompositeStream( new ReadableResourceStream(STDIN), new WritableResourceStream(STDOUT) ); $stdio->on('data', fn(string $input) => $stdio->write("Echo: $input")); $stdio->resume();
Platform Notes
Windows non-blocking limitations
Non-blocking mode is set automatically on stream construction, but the stream types that support it differ between Unix and Windows.
On Unix and macOS, non-blocking mode is applied to sockets, pipes, STDIO handles,
plain files, and in-memory streams (php://memory, php://temp).
On Windows, non-blocking mode is only applied to socket and pipe resources. Plain
files, STDIO handles, and in-memory streams are left in blocking mode because PHP's
stream_set_blocking() has no effect on non-socket handles on Windows. This means
that on Windows, reading from a file or writing to STDOUT through a Hibla stream will
block the event loop for the duration of the operation — exactly as a raw fread() or
fwrite() call would.
If you are building an application that must run on Windows and needs truly
non-blocking file or STDIO I/O, offload those operations to a worker process via
hiblaphp/parallel rather than using stream watchers directly. Socket-based streams
— TCP, UDP, Unix sockets — behave identically on all platforms.
Stream Lifecycle and Events
This section documents the exact sequence of events emitted by each stream type. It is reference material — understanding lifecycle order matters when you are implementing custom flow control, building protocol parsers, or debugging unexpected behaviour.
Readable stream lifecycle
A ReadableResourceStream always starts paused. No watcher is registered, no data
flows, and no events fire until resume() is called or a data listener is attached.
new ReadableResourceStream($resource)
│
▼
┌─────────┐
│ PAUSED │ ◄─── pause() called (or initial state)
└────┬────┘
│ resume()
▼
┌─────────┐
│ FLOWING │ ◄─── read watcher registered with event loop
└────┬────┘
│ data arrives
├──────────────── emit('data', $chunk) ← repeats each read
│
│ pause() called
├──────────────── emit('pause')
│ read watcher removed
│
│ EOF reached
├──────────────── emit('end')
│ emit('close') ← always follows 'end'
│ resource closed
│
│ read error
└──────────────── emit('error', $e)
emit('close') ← always follows 'error'
resource closed
end and close are always emitted in that order on EOF. A read error skips end
and goes directly to close. After close, all listeners are removed — any listener
attached after close will never fire.
$stream = new ReadableResourceStream(fopen('/var/log/app.log', 'rb')); $stream->on('data', fn(string $chunk) => echo $chunk); $stream->on('end', fn() => echo "Done reading\n"); $stream->on('close', fn() => echo "Stream closed\n"); $stream->on('error', fn(\Throwable $e) => echo "Error: " . $e->getMessage() . "\n"); $stream->resume();
Writable stream lifecycle
A WritableResourceStream is ready immediately — no resume() needed.
new WritableResourceStream($resource)
│
▼
┌──────────┐
│ WRITABLE │
└────┬─────┘
│ write($data) — data buffered, write watcher registered
│
│ buffer < softLimit ──► write() returns true
│ buffer >= softLimit ──► write() returns false
│
│ write watcher drains the buffer
├──────────────── emit('drain')
│
│ end() called
├──────────────── emit('finish')
│ emit('close')
│
│ write() on closed stream
└──────────────── emit('error', $e)
$stream = new WritableResourceStream(fopen('/tmp/output.log', 'wb')); $stream->on('drain', fn() => echo "Drained, can write again\n"); $stream->on('finish', fn() => echo "All data written\n"); $stream->on('close', fn() => echo "Stream closed\n"); $stream->on('error', fn(\Throwable $e) => echo "Write error: " . $e->getMessage() . "\n"); $stream->write("Hello\n"); $stream->end("Goodbye\n");
Pipe event flow
pipe() coordinates data, drain, end, and close events between source and
destination automatically:
ReadableResourceStream WritableResourceStream
│ │
emit('data', $chunk) ──────────────────► write($chunk)
│
│ [buffer not full]
│ write() returns true ◄─────
│ keep flowing
│
emit('data', $chunk) ──────────────────► write($chunk)
│
│ [buffer now full]
│ write() returns false ◄────
pause() ◄──────────────────────────────── STOP SOURCE
│
│ [event loop drains the buffer]
│ emit('drain')
│ ◄── resume() ────────────────────┘
│ [flowing again]
│
emit('end') ───────────────────────────► end()
│ emit('finish')
│ emit('close')
ThroughStream event flow
ThroughStream is both a writable (receives write() calls) and a readable (emits
data events). Unlike resource-backed streams, it has no I/O watcher — the event
loop is not involved.
Upstream writes ThroughStream Downstream reads
│ │ │
│ write($chunk) │ │
├───────────────────────►│ │
│ [transform($chunk)] │
│ emit('data', $result) ────────►│
│ │ │
│ end($final) │ │
├───────────────────────►│ │
│ emit('data', $transformed) │
│ emit('end') │
│ emit('finish') │
│ emit('close') │
│ │ │
│ transformer throws │ │
│ emit('error', $e) │
│ emit('close') │
CompositeStream and DuplexResourceStream events
Both are wrappers around underlying streams. Their events are forwarded from the inner streams, not re-emitted independently.
The close event on a CompositeStream fires only when both inner streams have
closed. On a DuplexResourceStream, close fires as soon as either side closes
because both share the same underlying resource.
Error event behaviour
Both readable and writable streams follow the same rule: an error event is always
followed by close. The stream closes itself after emitting error. You do not need
to call close() inside an error handler.
// CORRECT — just handle the error; close fires on its own $stream->on('error', function (\Throwable $e) { echo "Error: " . $e->getMessage() . "\n"; }); $stream->on('close', function () { cleanupResources(); });
No-Op Behaviour
All stream types are designed to be called defensively without needing to check state first. Redundant calls are silently ignored — no exceptions are thrown, no duplicate events fire.
| Call | Condition | Behaviour |
|---|---|---|
pause() |
Stream already paused or closed | No-op |
resume() |
Stream already flowing or closed | No-op |
close() |
Stream already closed | No-op |
end() |
Stream already ending or closed | No-op |
write('') |
Any writable stream | No-op — returns true, no buffer interaction |
writeAsync('') |
Any PromiseWritableStream |
Resolves immediately with 0 |
endAsync() |
Stream already ending or closed | Resolves immediately |
readAsync() |
Stream already at EOF | Resolves immediately with null |
pipe() |
Source not readable | No-op — returns destination unchanged |
pipe() |
Destination not writable | Pauses source, returns destination unchanged |
removeReadWatcher() |
Watcher already removed | No-op — returns false |
removeWriteWatcher() |
Watcher already removed | No-op — returns false |
The one exception is write() on a closed stream — this emits an error event and
returns false rather than silently succeeding, because writing to a closed stream is
almost always a logic error that should surface rather than be swallowed.
Resource Cleanup and Destructors
All stream classes implement __destruct. If the stream has not been explicitly
closed by the time the object is garbage collected, the destructor calls close()
automatically to free the underlying resource. Stream resources are never silently
leaked.
However, the destructor calls close() directly — it does not call end() first.
For writable streams this has an important consequence: any data still buffered at
destruction time is discarded and the finish event never fires. If you rely on
finish to confirm that all data has been flushed, always call end() or endAsync()
explicitly before letting the stream go out of scope.
// Wrong — buffer may be discarded if $stream goes out of scope $stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb')); $stream->write("Important data\n"); // $stream goes out of scope — destructor calls close(), buffer is discarded // Correct — drain the buffer before releasing the stream $stream = new WritableResourceStream(fopen('/tmp/output.txt', 'wb')); $stream->on('finish', fn() => echo "All data flushed\n"); $stream->write("Important data\n"); $stream->end();
With the promise-based API this risk is eliminated as long as you await every write
and end call to completion. By the time endAsync() resolves, all data is durably
written and the destructor has nothing left to discard:
$stream = new PromiseWritableStream(fopen('/tmp/output.txt', 'wb')); await($stream->writeAsync("Important data\n")); await($stream->endAsync()); // all data flushed before this resolves
The destructor is a safety net for resource handles, not a substitute for explicit lifecycle management.
Events Reference
Readable stream events
| Event | Arguments | Description |
|---|---|---|
data |
string $chunk |
Fires when a chunk of data is available |
end |
— | Fires when the stream reaches EOF — no more data events will follow |
close |
— | Fires when the underlying resource is closed |
error |
\Throwable $e |
Fires on a read error. The stream closes after emitting error |
pause |
— | Fires when the stream is paused |
resume |
— | Fires when the stream resumes |
Writable stream events
| Event | Arguments | Description |
|---|---|---|
drain |
— | Fires when the write buffer drops below the soft limit — safe to write again |
finish |
— | Fires after end() is called and all buffered data has been flushed |
close |
— | Fires when the underlying resource is closed |
error |
\Throwable $e |
Fires on a write error. The stream closes after emitting error |
API Reference
Stream factory
| Method | Returns | Description |
|---|---|---|
Stream::readable($resource, $chunkSize) |
ReadableResourceStream |
Wrap a readable resource |
Stream::writable($resource, $softLimit) |
WritableResourceStream |
Wrap a writable resource |
Stream::duplex($resource, $readChunkSize, $writeSoftLimit) |
DuplexResourceStream |
Wrap a read/write resource |
Stream::composite($readable, $writable) |
CompositeStream |
Combine two streams into one duplex |
Stream::through(?callable $transformer) |
ThroughStream |
Create a transform stream |
Stream::readableFile($path, $chunkSize) |
ReadableResourceStream |
Open a file for reading |
Stream::writableFile($path, $append, $softLimit) |
WritableResourceStream |
Open a file for writing |
Stream::duplexFile($path, $readChunkSize, $writeSoftLimit) |
DuplexResourceStream |
Open a file for read/write |
Stream::stdin($chunkSize) |
ReadableResourceStream |
STDIN as a readable stream |
Stream::stdout($softLimit) |
WritableResourceStream |
STDOUT as a writable stream |
Stream::stderr($softLimit) |
WritableResourceStream |
STDERR as a writable stream |
Stream::stdio($readChunkSize, $writeSoftLimit) |
CompositeStream |
STDIN + STDOUT as a single duplex |
ReadableStreamInterface
| Method | Returns | Description |
|---|---|---|
pipe($destination, $options) |
WritableStreamInterface |
Pipe to a writable stream with automatic backpressure |
isReadable() |
bool |
True if the stream is open and readable |
pause() |
void |
Stop emitting data events. No-op if already paused or closed |
resume() |
void |
Resume emitting data events. No-op if already flowing or closed |
close() |
void |
Close the stream and free the resource. No-op if already closed |
ReadableResourceStream
Extends ReadableStreamInterface with:
| Method | Returns | Description |
|---|---|---|
isEof() |
bool |
True if the stream has reached the end of the resource |
isPaused() |
bool |
True if the stream is currently paused |
seek($offset, $whence) |
bool |
Reposition the stream pointer. Clears internal buffer and resets EOF. Returns false on non-seekable resources. Throws StreamException if the stream is closed |
tell() |
int|false |
Return the current byte position. Returns false if undetermined. Throws StreamException if the stream is closed |
WritableStreamInterface
| Method | Returns | Description |
|---|---|---|
write($data) |
bool |
Write data. Returns false if the buffer is full (backpressure). Emits error if called on a closed stream |
end($data) |
void |
Signal end-of-stream, optionally writing a final chunk. No-op if already ending or closed |
isWritable() |
bool |
True if the stream is open and writable |
close() |
void |
Close the stream, discarding any buffered data. No-op if already closed |
WritableResourceStream
Extends WritableStreamInterface with:
| Method | Returns | Description |
|---|---|---|
isEnding() |
bool |
True if end() has been called and the stream is draining its buffer before closing |
PromiseReadableStreamInterface
| Method | Returns | Description |
|---|---|---|
readAsync($length) |
PromiseInterface<string|null> |
Read a chunk. Resolves with null at EOF. Supports cancellation |
readLineAsync($maxLength) |
PromiseInterface<string|null> |
Read until \n or $maxLength. Resolves with null at EOF. Supports cancellation |
readAllAsync($maxLength) |
PromiseInterface<string> |
Read entire stream into a string. Supports cancellation |
pipeAsync($destination, $options) |
PromiseInterface<int> |
Pipe to a writable stream. Resolves with total bytes transferred. Supports cancellation |
PromiseWritableStreamInterface
| Method | Returns | Description |
|---|---|---|
writeAsync($data) |
PromiseInterface<int> |
Write data. Waits for drain if buffer is full. Resolves with 0 if $data is empty. Supports cancellation |
writeLineAsync($data) |
PromiseInterface<int> |
Write data with an appended \n. Supports cancellation |
endAsync($data) |
PromiseInterface<void> |
End the stream. Resolves when all buffered data is flushed. Resolves immediately if already ending or closed. Supports cancellation |
Development
git clone https://github.com/hiblaphp/stream.git
cd stream
composer install
./vendor/bin/pest
./vendor/bin/phpstan analyse
Credits
- API Design: Inspired by the ReactPHP Streams interface. If you are familiar with ReactPHP's stream API, Hibla's will feel immediately familiar — with the addition of native promise-based methods and Fiber-aware I/O.
- Event Emitter: Built on evenement/evenement.
- Event Loop Integration: Powered by hiblaphp/event-loop.
- Promise Integration: Built on hiblaphp/promise.
License
MIT License. See LICENSE for more information.