riki137/stream-ipc

Inter-Process Communication (IPC) over streams, pipes, and stdio with built-in request-response correlation, message framing, and serialization.

dev-main 2025-05-30 08:05 UTC

This package is auto-updated.

Last update: 2025-05-30 08:05:21 UTC


README

Packagist Version Code Coverage GitHub Tests PHPStan Level 8 PHP Version License

PHP Stream IPC is a lightweight, zero-dependency PHP library designed for robust IPC (inter-process communication) through streams, pipes, sockets, and standard I/O. Whether you're managing background jobs, orchestrating parallel tasks, or simply need efficient communication between PHP processes, PHP Stream IPC makes it straightforward, reliable, and fast.

Forget complicated setups or bloated frameworksโ€”this library is pure PHP, requiring no external dependencies, and seamlessly integrates with native PHP streams, Symfony's popular Process component or AMPHP's ByteStream component (or your own adapter). It handles everything from framing messages to correlating requests and responses, enabling your applications to effortlessly communicate in real time.

๐Ÿ”ฅ Why choose PHP Stream IPC?

  • Zero Dependencies: Lightweight, pure PHPโ€”installs fast and clean.
  • Reliable Messaging: Automatic message framing ensures data integrity.
  • Performance-Focused: Built for speed and efficiency. You can send hundreds of messages per second.
  • Built-in Request-Response Handling: Easily correlate requests with their responses, simplifying async communication.
  • Flexible Serialization: Fast Native PHP serialization by default, with JSON support ready out of the box.
  • Easy Integration with Symfony/AMPHP: Fits perfectly into your existing workflow.
  • Real-time Notifications and Updates: Effortlessly handle real-time progress updates and event-driven messaging.
  • Error and Timeout Management: Robust exception handling, graceful stream closure management, and built-in timeout control keep your processes resilient.
  • Extendable by Design: Simple interfaces and clearly defined contracts mean you can easily adapt or extend functionality for your specific needs.

Whether you're building scalable PHP services, handling parallel background processing, or connecting multiple PHP scripts reliably, PHP Stream IPC gives you the control and simplicity you've been looking for.

๐Ÿ“ฆ Quick Installation

Install with Composer in seconds:

composer require riki137/stream-ipc

๐Ÿ“ฆ Quick Installation

Install via Composer:

composer require riki137/stream-ipc

โšก Quick Usage Example

Parent-Child IPC Example:

Parent (parent.php):

use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;

$process = proc_open('php child.php', [
    ['pipe', 'r'], ['pipe', 'w'], ['pipe', 'w']
], $pipes);

$peer = new NativeIpcPeer();
$session = $peer->createStreamSession($pipes[0], $pipes[1], $pipes[2]);

$response = $session->request(new LogMessage('Ping!'))->await();
echo "Child responded: {$response->message}\n";

proc_close($process);

Child (child.php):

use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

$session->onRequest(fn(Message $msg) => new LogMessage("Pong!"));
$peer->tick();

๐Ÿ“– Common Use Cases

  • Background Tasks: Run asynchronous workers with real-time communication.
  • Multi-Process PHP Applications: Efficiently manage parallel PHP scripts.
  • Real-time Progress Tracking: Provide updates on task progress via IPC.
  • Server-Client PHP Scripts: Use PHP scripts as IPC-driven microservices.

๐Ÿ“š Understanding the Message Flow

  1. Direct Notifications: Send messages from one process to another with notify()
  2. Request-Response: Send a request with request() and get a correlated response
  3. Progress Updates: A process can send notifications while processing a request
  4. Event Handling: Register callbacks for messages and requests with onMessage() and onRequest()

๐Ÿ”„ Message Handling

Register event-driven handlers easily:

// Notification Handler
$session->onMessage(function (Message $msg) {
    echo "Received: {$msg->message}\n";
});

// Request Handler with Response
$session->onRequest(function (Message $msg): ?Message {
    return new LogMessage("Processed request: {$msg->message}");
});

โณ Timeout and Exception Management

Handle request timeouts gracefully:

use StreamIpc\Transport\TimeoutException;

try {
    $session->request(new LogMessage("Quick task"), 3.0)->await();
} catch (TimeoutException $e) {
    echo "Task timed out: {$e->getMessage()}\n";
}

๐ŸŽ› Advanced Configuration

Custom Serialization (JSON):

use StreamIpc\Serialization\JsonMessageSerializer;
$peer = new NativeIpcPeer(new JsonMessageSerializer());

Custom Request ID Generation (UUID):

use StreamIpc\Envelope\Id\RequestIdGenerator;

class UuidRequestIdGenerator implements RequestIdGenerator {
    public function generate(): string {
        return bin2hex(random_bytes(16));
    }
}

$peer = new NativeIpcPeer(null, new UuidRequestIdGenerator());

๐Ÿ›  Development & Contribution

Contributions are welcome! To contribute:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/my-feature)
  3. Commit changes (git commit -m "Add my feature")
  4. Push your branch (git push origin feature/my-feature)
  5. Open a Pull Request on GitHub

Current areas open for contributions:

  • Enhancing AMPHP transport stability and tests
  • Improving error handling documentation and examples

๐Ÿ“„ License

PHP Stream IPC is open-source software licensed under the MIT License. See LICENSE for more details.

๐Ÿ“ˆ SEO Keywords

IPC PHP, PHP inter-process communication, PHP streams, PHP IPC library, IPC pipes, Symfony Process IPC, asynchronous PHP, PHP messaging, PHP IPC example, PHP parallel processing

๐Ÿ“Œ Tags

php, ipc, symfony-process, stream, asynchronous, inter-process communication, message passing, php-library, ipc-framework

For issues, feature requests, or general inquiries, please open an issue.

ยฉ riki137

๐Ÿงฉ Documentation

Using Symfony's Process Component

The library works seamlessly with Symfony's Process component. The createSymfonyProcessSession() helper automatically starts the process and wires it for message passing using a Symfony InputStream. Configure your Process instance (working directory, environment variables, timeouts, etc.) before handing it to the session:

use Symfony\Component\Process\Process;

$process = new Process([PHP_BINARY, 'child.php']);
$process->setTimeout(0); // disable Process timeouts if desired
$peer = new SymfonyIpcPeer();
$session = $peer->createSymfonyProcessSession($process);

$response = $session->request(new LogMessage('Hello from parent!'), 5.0)->await();
echo "Child responded: {$response->message}\n";

You may run multiple processes in parallel and drive them all by calling $peer->tick() (or tickFor()) inside your main loop.

This approach requires the symfony/process package:

composer require symfony/process
// child.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

// Handle requests from parent
$session->onRequest(function(Message $msg, $session): Message {
    // Process the message from parent
    echo "Received from parent: {$msg->message}\n";
    
    // Send response back to parent
    return new LogMessage("Hello from child!");
});

// Process messages until parent closes connection
$peer->tick();

Long-Running Background Process

Create a background process that regularly sends status updates:

// backgroundWorker.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;

 $peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

// Simulate background work
for ($i = 1; $i <= 5; $i++) {
    // Do some work...
    sleep(1);
    
    // Send status update to parent
    $session->notify(new LogMessage("Progress: {$i}/5 complete", "info"));
}

// Send final success message
$session->notify(new LogMessage("Task completed successfully", "success"));
// monitor.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$descriptors = [
    0 => ['pipe', 'r'],
    1 => ['pipe', 'w'],
    2 => ['pipe', 'w']
];
$process = proc_open('php backgroundWorker.php', $descriptors, $pipes);
[$stdin, $stdout, $stderr] = $pipes;

 $peer = new NativeIpcPeer();
$session = $peer->createStreamSession($stdin, $stdout, $stderr);

// Listen for status updates
$session->onMessage(function(Message $msg) {
    if ($msg instanceof LogMessage) {
        echo "[{$msg->level}] {$msg->message}\n";
    }
});

// Keep processing messages until process exits
while (proc_get_status($process)['running']) {
    $peer->tick(0.1);
}

proc_close($process);

Request-Response Pattern with Progress Updates

// server.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

 $peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

$session->onRequest(function(Message $msg, $session): Message {
    // Start processing request
    $session->notify(new LogMessage("Starting work", "info"));
    
    // Simulate work with progress updates
    for ($i = 1; $i <= 3; $i++) {
        sleep(1);
        $session->notify(new LogMessage("Progress: {$i}/3", "info"));
    }
    
    // Return final result
    return new LogMessage("Task complete!", "success");
});

$peer->tick();
// client.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

$descriptors = [0 => ['pipe', 'r'], 1 => ['pipe', 'w'], 2 => ['pipe', 'w']];
$process = proc_open('php server.php', $descriptors, $pipes);
[$stdin, $stdout, $stderr] = $pipes;

 $peer = new NativeIpcPeer();
$session = $peer->createStreamSession($stdin, $stdout, $stderr);

// Listen for progress notifications
$session->onMessage(function(Message $msg) {
    if ($msg instanceof LogMessage) {
        echo "Progress: {$msg->message}\n";
    }
});

// Send request and wait for final response
echo "Sending request...\n";
$response = $session->request(new LogMessage("Start processing"), 10.0);
echo "Final response: {$response->message}\n";

proc_close($process);

Multiple Parallel Workers

// manager.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

// Create IPC peer
 $peer = new NativeIpcPeer();
$sessions = [];
$workers = [];

// Launch multiple worker processes
for ($i = 1; $i <= 3; $i++) {
    $descriptors = [0 => ['pipe', 'r'], 1 => ['pipe', 'w'], 2 => ['pipe', 'w']];
    $process = proc_open("php worker.php {$i}", $descriptors, $pipes);
    [$stdin, $stdout, $stderr] = $pipes;
    
    $session = $peer->createStreamSession($stdin, $stdout, $stderr);
    
    // Store session and process
    $sessions[$i] = $session;
    $workers[$i] = $process;
    
    // Listen for messages from this worker
    $session->onMessage(function(Message $msg) use ($i) {
        if ($msg instanceof LogMessage) {
            echo "Worker {$i}: {$msg->message}\n";
        }
    });
}

// Assign tasks to workers
foreach ($sessions as $id => $session) {
    $session->request(new LogMessage("Process task {$id}"), 5.0);
}

// Clean up
foreach ($workers as $process) {
    proc_close($process);
}
// worker.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;
use StreamIpc\Message\Message;

 $peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

// Get worker ID from command line
$workerId = $argv[1] ?? 'unknown';

// Handle task requests
$session->onRequest(function(Message $msg, $session) use ($workerId): Message {
    // Send some progress notifications
    $session->notify(new LogMessage("Worker {$workerId} starting task"));
    sleep(1);
    $session->notify(new LogMessage("Worker {$workerId} halfway done"));
    sleep(1);
    
    // Return final result
    return new LogMessage("Worker {$workerId} completed task");
});

$peer->tick();

Custom Message Types

Define custom message types by implementing the Message interface:

// TaskMessage.php
namespace App\Messages;

use StreamIpc\Message\Message;

final readonly class TaskMessage implements Message
{
    public function __construct(
        public string $action,
        public array $parameters = []
    ) {
    }
}
// usage.php
use StreamIpc\NativeIpcPeer;
use App\Messages\TaskMessage;

 $peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

// Send a custom task message
$task = new TaskMessage('processFile', [
    'filename' => 'data.csv',
    'columns' => ['name', 'email', 'age']
]);

$session->notify($task);
// Or make a request with the custom message
$response = $session->request($task)->await();

Handling Timeouts

// client.php
use StreamIpc\NativeIpcPeer;
use StreamIpc\Message\LogMessage;

 $peer = new NativeIpcPeer();
$session = $peer->createStdioSession();

try {
    // Set a short timeout (2 seconds)
    $response = $session->request(new LogMessage("Fast request"), 2.0)->await();
    echo "Received response: {$response->message}\n";
} catch (\StreamIpc\Transport\TimeoutException $e) {
    echo "Request timed out: {$e->getMessage()}\n";
    // Handle timeout situation
}

๐Ÿ”„ Event-Driven Architecture

PHP Stream IPC uses an event-driven model where you can register handlers for different types of events:

// Register a handler for notifications
$session->onMessage(function(Message $msg, IpcSession $session) {
    if ($msg instanceof LogMessage) {
        echo "[{$msg->level}] {$msg->message}\n";
    }
});

// Register a handler for requests
$session->onRequest(function(Message $msg, IpcSession $session): ?Message {
    // Process request
    if ($msg instanceof LogMessage) {
        // Return a response
        return new LogMessage("Processed: {$msg->message}");
    }
    
    // Return null if this handler can't process the request
    return null;
});

๐Ÿ”‹ Advanced Configuration

Custom Serialization

use StreamIpc\NativeIpcPeer;
use StreamIpc\Serialization\JsonMessageSerializer;

// Create a peer with custom serializer
$peer = new NativeIpcPeer(
    new JsonMessageSerializer()
);

$session = $peer->createStdioSession();

Custom Request ID Generation

use StreamIpc\NativeIpcPeer;
use StreamIpc\Envelope\Id\RequestIdGenerator;

class UuidRequestIdGenerator implements RequestIdGenerator
{
    public function generate(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff), mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff)
        );
    }
}

// Create peer with custom ID generator
$peer = new NativeIpcPeer(
    null, // use default serializer
    new UuidRequestIdGenerator()
);