riki137/stream-ipc

Inter-Process Communication (IPC) over streams, pipes, and stdio with built-in request-response correlation, message framing, and serialization.

1.0.0-beta6 2025-06-24 14:35 UTC


PHP Stream IPC is a lightweight, zero-dependency PHP library designed for robust IPC (inter-process communication) through streams, pipes, sockets, and standard I/O. Whether you're managing background jobs, orchestrating parallel tasks, or simply need efficient communication between PHP processes, PHP Stream IPC makes it straightforward, reliable, and fast.

Forget complicated setups or bloated frameworks: this library is pure PHP, requires no external dependencies, and integrates with native PHP streams, Symfony's Process component, or AMPHP's ByteStream component (or your own adapter). It handles everything from framing messages to correlating requests and responses, so your applications can communicate in real time.

🔥 Why choose PHP Stream IPC?

  • Zero Dependencies: Lightweight, pure PHP; installs fast and clean.
  • Reliable Messaging: Automatic message framing ensures data integrity.
  • Performance-Focused: Built for speed and efficiency. You can send hundreds of messages per second.
  • Built-in Request-Response Handling: Easily correlate requests with their responses, simplifying async communication.
  • Flexible Serialization: Fast Native PHP serialization by default, with JSON support ready out of the box.
  • Easy Integration with Symfony/AMPHP: Fits perfectly into your existing workflow.
  • Real-time Notifications and Updates: Effortlessly handle real-time progress updates and event-driven messaging.
  • Error and Timeout Management: Robust exception handling, graceful stream closure management, and built-in timeout control keep your processes resilient.
  • Extendable by Design: Simple interfaces and clearly defined contracts mean you can easily adapt or extend functionality for your specific needs.

Whether you're building scalable PHP services, handling parallel background processing, or connecting multiple PHP scripts reliably, PHP Stream IPC gives you the control and simplicity you've been looking for.
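To make the request-response correlation bullet concrete, here is a minimal, library-independent sketch of the general idea: each outgoing request gets an ID, and an incoming reply is routed to whichever caller is waiting on that ID. The class name `PendingRequests` and the `id`/`replyTo`/`payload` envelope keys are hypothetical and are not taken from Stream IPC's internals.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration only: none of these names come from the library.
 * Tracks pending requests by ID and routes incoming envelopes to their callers.
 */
final class PendingRequests
{
    /** @var array<string, callable> */
    private array $pending = [];

    /** Register a reply callback and return the envelope that would go on the wire. */
    public function send(string $payload, callable $onReply): array
    {
        $id = bin2hex(random_bytes(8));
        $this->pending[$id] = $onReply;

        return ['id' => $id, 'payload' => $payload];
    }

    /** Route an incoming envelope; returns false when it matches no pending request. */
    public function receive(array $envelope): bool
    {
        $id = $envelope['replyTo'] ?? null;
        if ($id === null || !isset($this->pending[$id])) {
            return false; // a notification, or an unknown/late reply
        }

        $callback = $this->pending[$id];
        unset($this->pending[$id]);
        $callback($envelope['payload']);

        return true;
    }
}

$requests = new PendingRequests();
$answers  = [];

$pingA = $requests->send('Ping A', function (string $reply) use (&$answers) { $answers['A'] = $reply; });
$pingB = $requests->send('Ping B', function (string $reply) use (&$answers) { $answers['B'] = $reply; });

// Replies may arrive out of order; the ID decides which caller gets which reply.
$requests->receive(['replyTo' => $pingB['id'], 'payload' => 'Pong B']);
$requests->receive(['replyTo' => $pingA['id'], 'payload' => 'Pong A']);

// An envelope without a reply ID is not matched to any request.
$handled = $requests->receive(['payload' => 'progress 50%']);
```

Under these assumptions, messages that carry no reply ID fall through to whatever handles notifications, which is one plausible way to separate progress updates from final responses.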

📦 Quick Installation

Install with Composer in seconds:

composer require riki137/stream-ipc

⚡ 30-Second Tour

The fastest way to grok the API is to copy-paste the two files below, run php parent.php, and watch “Pong!” come back from a child process.

parent.php – ask, await, done
<?php
require __DIR__ . '/vendor/autoload.php';             // assumes vendor/ sits next to this file

use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;

$peer    = new NativeIpcPeer();                       // ① create peer
$cmd     = proc_open('php child.php',                 // ② launch child
             [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes);

$session = $peer->createStreamSession(...$pipes);     // ③ wrap its pipes
$reply   = $session->request(new LogMessage('Ping!')) // ④ send request
                   ->await();                         // ⑤ wait (should be extremely fast)

echo "Child said: {$reply->message}\n";               // ⑥ print response

child.php – listen & respond
<?php
require __DIR__ . '/vendor/autoload.php';             // assumes vendor/ sits next to this file

use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$peer    = new NativeIpcPeer();
$session = $peer->createStdioSession();               // ① wires STDIN/OUT

$session->onRequest(fn(Message $m)                    // ② one-liner handler
    => new LogMessage("Pong!"));                      // ③ respond

$peer->tick();                                        // ④ process once

That's the entire request/response stack – no frameworks, no bootstrapping, just PHP streams and a sprinkle of Stream IPC sugar.

🛠 Everyday Patterns

Need progress bars, fire-and-forget notifications, or multiple workers? These snippets are battle-tested shortcuts.

Fire off notifications (no response expected)
$session->notify(new LogMessage('Build started...'));

Progress reporting while working on a request
$session->onRequest(function (Message $req, IpcSession $session) {
    for ($i = 1; $i <= 3; $i++) {
        $session->notify(new LogMessage("Step $i/3 done"));
        sleep(1);
    }
    return new LogMessage('All steps complete ✅');
});

Spawning N parallel workers
$workers = [];
for ($i = 1; $i <= 4; $i++) {
    $proc          = proc_open("php worker.php $i", [['pipe','r'],['pipe','w'],['pipe','w']], $p);
    $workers[$i]   = $peer->createStreamSession(...$p);
    $workers[$i]->onMessage(fn(Message $m) => printf("[W#%d] %s\n", $i, $m->message));
}
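
For readers curious what juggling several worker pipes looks like without any library help, the sketch below multiplexes the stdout of three child processes with PHP's built-in stream_select(). It is an illustration in plain PHP, not a description of how Stream IPC's peer is implemented; the inline child scripts and variable names are made up for the demo. Note that stream_select() on proc_open() pipes is not supported on Windows.

```php
<?php
declare(strict_types=1);

// Illustrative only: plain PHP, no Stream IPC classes involved.
$processes = [];
$readers   = [];

for ($i = 1; $i <= 3; $i++) {
    // Each hypothetical child sleeps briefly, prints one line, and exits.
    $code = sprintf('usleep(%d); echo "worker %d done\n";', $i * 50000, $i);
    $processes[$i] = proc_open([PHP_BINARY, '-r', $code], [1 => ['pipe', 'w']], $pipes);
    $readers[$i]   = $pipes[1];
}

$lines = [];
while ($readers !== []) {
    $read   = $readers;   // stream_select() overwrites this with the ready streams
    $write  = null;
    $except = null;

    if (stream_select($read, $write, $except, 1) === false) {
        break; // select failed; give up in this sketch
    }

    foreach ($read as $stream) {
        $id   = array_search($stream, $readers, true);
        $line = fgets($stream);

        if ($line === false) {            // EOF: the child closed its stdout
            fclose($stream);
            proc_close($processes[$id]);
            unset($readers[$id]);
            continue;
        }

        $lines[] = rtrim($line);
    }
}

sort($lines);
print_r($lines);
```

The loop only reads from streams that are ready, so a slow worker never blocks output from a fast one.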

🚀 Advanced Patterns

When the basics aren't enough, the sections below cover more involved scenarios: alternative transports, custom serializers, and custom ID generators.

⚡ Symfony Process transport – zero-copy I/O, built-in timeout
use Symfony\Component\Process\Process;
use StreamIpc\SymfonyIpcPeer;
use StreamIpc\Message\LogMessage;

$process  = new Process([PHP_BINARY, 'child.php']);
$peer     = new SymfonyIpcPeer();
$session  = $peer->createSymfonyProcessSession($process);

$response = $session->request(new LogMessage('Hello 👋'), 5.0)->await();
echo $response->message;

🔌 AMPHP ByteStream transport – async, event-loop friendly
use Amp\ByteStream\ReadableResourceStream;
use Amp\ByteStream\WritableResourceStream;
use StreamIpc\AmphpIpcPeer;
use StreamIpc\Message\LogMessage;

[$r1,$w1] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
[$r2,$w2] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$peer     = new AmphpIpcPeer();
$session  = $peer->createByteStreamSession(
    new WritableResourceStream($w1),
    [new ReadableResourceStream($r2)]
);

$session->notify(new LogMessage('Async says hi!'));
$peer->tick();      // Amp's EventLoop will drive this in real life

🕹 Custom serializers & ID generators
use StreamIpc\NativeIpcPeer;
use StreamIpc\Serialization\JsonMessageSerializer;
use StreamIpc\Envelope\Id\RequestIdGenerator;

// JSON on the wire
$peer = new NativeIpcPeer(new JsonMessageSerializer());

// 128-bit random IDs
class UuidGen implements RequestIdGenerator {
    public function generate(): string { return bin2hex(random_bytes(16)); }
}
$peer = new NativeIpcPeer(null, new UuidGen());
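
To illustrate the trade-off between the two wire formats mentioned above, the following standalone sketch compares PHP's native serialize() with JSON for a small message object. `DemoMessage` is a hypothetical class for this comparison only; how the library's JsonMessageSerializer maps JSON back to message classes is not shown in this README, so the manual rebuild step here is an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical message class used only for this comparison.
final class DemoMessage
{
    public function __construct(
        public string $action,
        public array $params = [],
    ) {}
}

$message = new DemoMessage('reindex', ['db' => 'catalog']);

// Native serialization: the class name travels with the data,
// so the receiver gets a DemoMessage instance back.
$native  = serialize($message);
$fromPhp = unserialize($native, ['allowed_classes' => [DemoMessage::class]]);

// JSON: only the public data travels; the receiver rebuilds the object itself.
$json     = json_encode($message, JSON_THROW_ON_ERROR);
$data     = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
$fromJson = new DemoMessage($data['action'], $data['params']);

echo $json, "\n";
```

Native serialization is convenient between PHP processes that share the same classes, while JSON is easier to inspect and to consume from non-PHP tools. Restricting `allowed_classes` is a sensible precaution whenever unserialize() reads data from another process.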

📚 Deep-Dive Cookbook

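Each pattern below is a self-contained, runnable demo. Put the files in the same directory, run the “driver” script (php client.php, php manager.php, …) and watch the messages fly. The scripts assume Composer's autoloader is already loaded, for example with require __DIR__ . '/vendor/autoload.php'; at the top of each file.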

📖 Request ⇄ Response with live progress updates

server.php – the worker that streams progress then replies

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$peer    = new NativeIpcPeer();
$session = $peer->createStdioSession();

/** Answer every request with 3 progress pings + a final success */
$session->onRequest(function (Message $req, $session): Message {
    for ($i = 1; $i <= 3; $i++) {
        $session->notify(new LogMessage("Progress $i / 3"));
        sleep(1);
    }
    return new LogMessage('✅  Finished all work');
});

$peer->tick();   // block until parent closes streams

client.php – the caller that shows progress in real-time

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$proc = proc_open('php server.php',
    [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes);

$peer    = new NativeIpcPeer();
$session = $peer->createStreamSession(...$pipes);

/** Show every notification immediately */
$session->onMessage(function (Message $m) {
    if ($m instanceof LogMessage) {
        echo "[update] {$m->message}\n";
    }
});

echo "→ sending job …\n";
$final = $session->request(new LogMessage('Start!'), 10)->await();
echo "→ DONE: {$final->message}\n";

proc_close($proc);

⛏ Long-running background worker that streams status

backgroundWorker.php

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;

$peer    = new NativeIpcPeer();
$session = $peer->createStdioSession();

for ($i = 1; $i <= 5; $i++) {
    sleep(1);
    $session->notify(new LogMessage("Step $i/5 complete"));
}

$session->notify(new LogMessage('🎉  Task finished', 'success'));

monitor.php

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$proc  = proc_open('php backgroundWorker.php',
    [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes);

$peer    = new NativeIpcPeer();
$session = $peer->createStreamSession(...$pipes);

$session->onMessage(function (Message $m) {
    if ($m instanceof LogMessage) {
        printf("[%s] %s\n", strtoupper($m->level), $m->message);
    }
});

while (proc_get_status($proc)['running']) {
    $peer->tick(0.1);      // non-blocking poll
}

proc_close($proc);

👷‍♀️ Multi-tenant task manager (parallel workers)

manager.php – spins up 3 workers and assigns jobs

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$peer      = new NativeIpcPeer();
$sessions  = [];
$processes = [];

/* Launch three child workers */
for ($id = 1; $id <= 3; $id++) {
    $p = proc_open("php worker.php $id",
        [['pipe','r'], ['pipe','w'], ['pipe','w']], $pipes);

    $sessions[$id]  = $peer->createStreamSession(...$pipes);
    $processes[$id] = $p;

    $sessions[$id]->onMessage(fn(Message $m)
        => printf("· Worker %d says: %s\n", $id, $m->message));
}

/* Fire one job at each worker */
foreach ($sessions as $id => $s) {
    $s->request(new LogMessage("Job for W$id"));
}

/* Pump until everybody is done */
while (array_filter($processes, fn($p) => proc_get_status($p)['running'])) {
    $peer->tick(0.05);
}

// array_walk() would pass (value, key) to proc_close(), which accepts one argument only
foreach ($processes as $p) {
    proc_close($p);
}

worker.php – does its thing, streams updates, replies

<?php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$wid     = $argv[1] ?? '?';
$peer    = new NativeIpcPeer();
$session = $peer->createStdioSession();

$session->onRequest(function (Message $m, $s) use ($wid): Message {
    $s->notify(new LogMessage("[$wid] starting"));
    sleep(1);
    $s->notify(new LogMessage("[$wid] halfway"));
    sleep(1);
    return new LogMessage("[$wid] done");
});

$peer->tick();

🧩 Custom DTOs (domain-specific messages)

src/TaskMessage.php – your own typed message

<?php
namespace App\Messages;

use StreamIpc\Message\Message;

final readonly class TaskMessage implements Message
{
    public function __construct(
        public string $action,
        public array  $params = [],
    ) {}
}

usage.php – sending custom messages

<?php
use StreamIpc\NativeIpcPeer;
use App\Messages\TaskMessage;

$peer    = new NativeIpcPeer();
$session = $peer->createStdioSession();

/* Fire-and-forget notification */
$session->notify(new TaskMessage('reindex', ['db' => 'catalog']));

/* Or ask for a result */
$reply = $session->request(new TaskMessage('checksum', ['path' => '/dump.sql']))
                ->await();
var_dump($reply);

💡 Why choose PHP Stream IPC?

  • Zero Dependencies – Installs in seconds; works on shared hosting.
  • Reliable Framing – 4-byte magic + 32-bit length = no corrupted payloads.
  • Correlation Built-in – Automatic promise matching (request()->await()).
  • Pluggable Serialization – Native serialize(), JSON, or roll your own.
  • Graceful Timeouts & Errors – Exceptions bubble exactly where you need them.
  • Symfony & AMPHP adapters – Mix blocking and async worlds effortlessly.
  • Runs Everywhere – Works on Linux, macOS, Windows, inside Docker, CI, etc.
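
The "magic + length" framing bullet can be pictured with a small standalone sketch. It prefixes every payload with a 4-byte marker and a 32-bit length, then reassembles complete frames from arbitrarily chunked input. The marker bytes, the big-endian byte order, and the function names are assumptions made for illustration; the library's actual wire format may differ.

```php
<?php
declare(strict_types=1);

// Assumed values for illustration; the library's real magic bytes and byte order may differ.
const FRAME_MAGIC = "\xF0\x9F\x93\xA1"; // hypothetical 4-byte marker

function encodeFrame(string $payload): string
{
    // marker + unsigned 32-bit big-endian length + payload
    return FRAME_MAGIC . pack('N', strlen($payload)) . $payload;
}

/**
 * Pull complete frames out of $buffer, leaving partial data for the next read.
 *
 * @return list<string>
 */
function decodeFrames(string &$buffer): array
{
    $frames = [];
    while (true) {
        $start = strpos($buffer, FRAME_MAGIC);
        if ($start === false) {
            // No marker yet: keep the last 3 bytes in case the marker is split across reads.
            $buffer = substr($buffer, -3);
            break;
        }
        if (strlen($buffer) < $start + 8) {
            $buffer = substr($buffer, $start); // header incomplete
            break;
        }
        $length = unpack('N', $buffer, $start + 4)[1];
        if (strlen($buffer) < $start + 8 + $length) {
            $buffer = substr($buffer, $start); // payload incomplete
            break;
        }
        $frames[] = substr($buffer, $start + 8, $length);
        $buffer   = substr($buffer, $start + 8 + $length);
    }

    return $frames;
}

// Simulate a noisy stream that delivers data in arbitrary 7-byte chunks.
$wire     = 'noise' . encodeFrame('hello') . encodeFrame('world');
$buffer   = '';
$received = [];
foreach (str_split($wire, 7) as $chunk) {
    $buffer  .= $chunk;
    $received = array_merge($received, decodeFrames($buffer));
}

print_r($received);
```

The marker lets a reader skip stray output (for example a child that echoes debug text), and the length field tells it exactly how many bytes to wait for before handing the payload to the serializer.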

🛣 Roadmap & Contributing

  • Stability and polish
  • More tests

PRs are welcome! Fork → branch → commit → pull-request. Guidelines live in CONTRIBUTING.md – tests & PHPStan must pass.

📈 SEO Keywords

IPC PHP, PHP inter-process communication, PHP streams, IPC pipes, Symfony Process IPC, asynchronous PHP, PHP messaging, PHP IPC example, parallel processing PHP

© riki137 • Licensed under MIT