riki137 / stream-ipc
Inter-Process Communication (IPC) over streams, pipes, and stdio with built-in request-response correlation, message framing, and serialization.
Requires
- php: ^8.2
Requires (Dev)
- amphp/byte-stream: ^2.1
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^12.1
- symfony/process: ^7.2
This package is auto-updated.
Last update: 2025-06-24 14:35:35 UTC
README
PHP Stream IPC is a lightweight, zero-dependency PHP library designed for robust IPC (inter-process communication) through streams, pipes, sockets, and standard I/O. Whether you're managing background jobs, orchestrating parallel tasks, or simply need efficient communication between PHP processes, PHP Stream IPC makes it straightforward, reliable, and fast.
Forget complicated setups or bloated frameworksβthis library is pure PHP, requiring no external dependencies, and seamlessly integrates with native PHP streams, Symfony's popular Process
component or AMPHP's ByteStream component (or your own adapter). It handles everything from framing messages to correlating requests and responses, enabling your applications to effortlessly communicate in real time.
π₯ Why choose PHP Stream IPC?
- Zero Dependencies: Lightweight, pure PHPβinstalls fast and clean.
- Reliable Messaging: Automatic message framing ensures data integrity.
- Performance-Focused: Built for speed and efficiency. You can send hundreds of messages per second.
- Built-in Request-Response Handling: Easily correlate requests with their responses, simplifying async communication.
- Flexible Serialization: Fast Native PHP serialization by default, with JSON support ready out of the box.
- Easy Integration with Symfony/AMPHP: Fits perfectly into your existing workflow.
- Real-time Notifications and Updates: Effortlessly handle real-time progress updates and event-driven messaging.
- Error and Timeout Management: Robust exception handling, graceful stream closure management, and built-in timeout control keep your processes resilient.
- Extendable by Design: Simple interfaces and clearly defined contracts mean you can easily adapt or extend functionality for your specific needs.
Whether you're building scalable PHP services, handling parallel background processing, or connecting multiple PHP scripts reliably, PHP Stream IPC gives you the control and simplicity you've been looking for.
π¦ Quick Installation
Install with Composer in seconds:
composer require riki137/stream-ipc
β‘ 30-Second Tour
The fastest way to grok the API is to copy-paste the two files below,
run php parent.php
, and watch βPong!β come back from a child process.
parent.php β ask, await, done
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; $peer = new NativeIpcPeer(); // β create peer $cmd = proc_open('php child.php', // β‘ launch child [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes); $session = $peer->createStreamSession(...$pipes); // β’ wrap its pipes $reply = $session->request(new LogMessage('Ping!')) // β£ send request ->await(); // β€ wait (should be extremely fast) echo "Child said: {$reply->message}\n"; // β₯ print response
child.php β listen & respond
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $peer = new NativeIpcPeer(); $session = $peer->createStdioSession(); // β wires STDIN/OUT $session->onRequest(fn(Message $m) // β‘ one-liner handler => new LogMessage("Pong!")); // β’ respond $peer->tick(); // β£ process once
Thatβs the entire request/response stack β no frameworks, no bootstrapping, just PHP streams and a sprinkle of Stream IPC sugar.
π Everyday Patterns
Need progress bars, fire-and-forget notifications, or multiple workers? These snippets are battle-tested shortcuts.
Fire off notifications (no response expected)
$session->notify(new LogMessage('Build started...'));
Progress reporting while working on a request
$session->onRequest(function (Message $req, IpcSession $session) { for ($i = 1; $i <= 3; $i++) { $session->notify(new LogMessage("Step $i/3 done")); sleep(1); } return new LogMessage('All steps complete β '); });
Spawning N parallel workers
$workers = []; for ($i = 1; $i <= 4; $i++) { $proc = proc_open("php worker.php $i", [['pipe','r'],['pipe','w'],['pipe','w']], $p); $workers[$i] = $peer->createStreamSession(...$p); $workers[$i]->onMessage(fn(Message $m) => printf("[W#%d] %s\n", $i, $m->message)); }
π Advanced Patterns
When the basics feel too tame, dip into these expandable sections for in-depth scenarios. Theyβre long, so they stay folded until summoned.
β‘ **Symfony Process transport** β zero-copy I/O, built-in timeout
use Symfony\Component\Process\Process; use StreamIpc\SymfonyIpcPeer; use StreamIpc\Message\LogMessage; $process = new Process([PHP_BINARY, 'child.php']); $peer = new SymfonyIpcPeer(); $session = $peer->createSymfonyProcessSession($process); $response = $session->request(new LogMessage('Hello π'), 5.0)->await(); echo $response->message;
π **AMPHP ByteStream transport** β async, event-loop friendly
use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; use StreamIpc\AmphpIpcPeer; use StreamIpc\Message\LogMessage; [$r1,$w1] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); [$r2,$w2] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); $peer = new AmphpIpcPeer(); $session = $peer->createByteStreamSession( new WritableResourceStream($w1), [new ReadableResourceStream($r2)] ); $session->notify(new LogMessage('Async says hi!')); $peer->tick(); // Ampβs EventLoop will drive this in real life
πΉ **Custom serializers &\nbsp;ID generators**
use StreamIpc\NativeIpcPeer; use StreamIpc\Serialization\JsonMessageSerializer; use StreamIpc\Envelope\Id\RequestIdGenerator; // JSON on the wire $peer = new NativeIpcPeer(new JsonMessageSerializer()); // 128-bit random IDs class UuidGen implements RequestIdGenerator { public function generate(): string { return bin2hex(random_bytes(16)); } } $peer = new NativeIpcPeer(null, new UuidGen());
π Deep-Dive Cookbook
Each pattern below is a self-contained, runnable demo. Put the files in the same directory, run the βdriverβ script (
php client.php
,php manager.php
, β¦) and watch the messages fly.
π Request β Response with live progress updates
server.php β the worker that streams progress then replies
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $peer = new NativeIpcPeer(); $session = $peer->createStdioSession(); /** Answer every request with 3 progress pings + a final success */ $session->onRequest(function (Message $req, $session): Message { for ($i = 1; $i <= 3; $i++) { $session->notify(new LogMessage("Progress $i / 3")); sleep(1); } return new LogMessage('β Finished all work'); }); $peer->tick(); // block until parent closes streams
client.php β the caller that shows progress in real-time
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $proc = proc_open('php server.php', [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes); $peer = new NativeIpcPeer(); $session = $peer->createStreamSession(...$pipes); /** Show every notification immediately */ $session->onMessage(function (Message $m) { if ($m instanceof LogMessage) { echo "[update] {$m->message}\n"; } }); echo "β sending job β¦\n"; $final = $session->request(new LogMessage('Start!'), 10)->await(); echo "β DONE: {$final->message}\n"; proc_close($proc);
β Long-running background worker that streams status
backgroundWorker.php
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; $peer = new NativeIpcPeer(); $session = $peer->createStdioSession(); for ($i = 1; $i <= 5; $i++) { sleep(1); $session->notify(new LogMessage("Step $i/5 complete")); } $session->notify(new LogMessage('π Task finished', 'success'));
monitor.php
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $proc = proc_open('php backgroundWorker.php', [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes); $peer = new NativeIpcPeer(); $session = $peer->createStreamSession(...$pipes); $session->onMessage(function (Message $m) { if ($m instanceof LogMessage) { printf("[%s] %s\n", strtoupper($m->level), $m->message); } }); while (proc_get_status($proc)['running']) { $peer->tick(0.1); // non-blocking poll } proc_close($proc);
π·ββοΈ Multi-tenant task manager (parallel workers)
manager.php β spins up 3 workers and assigns jobs
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $peer = new NativeIpcPeer(); $sessions = []; $processes = []; /* Launch three child workers */ for ($id = 1; $id <= 3; $id++) { $p = proc_open("php worker.php $id", [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes); $sessions[$id] = $peer->createStreamSession(...$pipes); $processes[$id] = $p; $sessions[$id]->onMessage(fn(Message $m) => printf("Β· Worker %d says: %s\n", $id, $m->message)); } /* Fire one job at each worker */ foreach ($sessions as $id => $s) { $s->request(new LogMessage("Job for W$id")); } /* Pump until everybody is done */ while (array_filter($processes, fn($p) => proc_get_status($p)['running'])) { $peer->tick(0.05); } array_walk($processes, 'proc_close');
worker.php β does its thing, streams updates, replies
<?php use StreamIpc\NativeIpcPeer; use StreamIpc\Message\LogMessage; use StreamIpc\Message\Message; $wid = $argv[1] ?? '?'; $peer = new NativeIpcPeer(); $session = $peer->createStdioSession(); $session->onRequest(function (Message $m, $s) use ($wid): Message { $s->notify(new LogMessage("[$wid] starting")); sleep(1); $s->notify(new LogMessage("[$wid] halfway")); sleep(1); return new LogMessage("[$wid] done"); }); $peer->tick();
π§© Custom DTOs (domain-specific messages)
src/TaskMessage.php β your own typed message
<?php namespace App\Messages; use StreamIpc\Message\Message; final readonly class TaskMessage implements Message { public function __construct( public string $action, public array $params = [], ) {} }
usage.php β sending custom messages
<?php use StreamIpc\NativeIpcPeer; use App\Messages\TaskMessage; $peer = new NativeIpcPeer(); $session = $peer->createStdioSession(); /* Fire-and-forget notification */ $session->notify(new TaskMessage('reindex', ['db' => 'catalog'])); /* Or ask for a result */ $reply = $session->request(new TaskMessage('checksum', ['path' => '/dump.sql'])) ->await(); var_dump($reply);
π‘ Why choose PHP Stream IPC?
- Zero Dependencies β Installs in seconds; works on shared hosting.
- Reliable Framing β 4-byte magic + 32-bit length = no corrupted payloads.
- Correlation Built-in β Automatic promise matching (
request()->await()
). - Pluggable Serialization β Native
serialize()
, JSON, or roll your own. - Graceful Timeouts & Errors β Exceptions bubble exactly where you need them.
- Symphony & AMPHP adapters β Mix blocking and async worlds effortlessly.
- Runs Everywhere β Works on Linux, macOS, Windows, inside Docker, CI, etc.
π£ Roadmap & Contributing
- Stability and Polishment
- More tests
PRs are welcome! Fork β branch β commit β pull-request.
Guidelines live in CONTRIBUTING.md
β tests & PHPStan must pass.
π SEO Keywords
IPC PHP, PHP inter-process communication, PHP streams, IPC pipes, Symfony Process IPC, asynchronous PHP, PHP messaging, PHP IPC example, parallel processing PHP
Β© riki137 β’ Licensed under MIT