riki137 / stream-ipc
Inter-Process Communication (IPC) over streams, pipes, and stdio with built-in request-response correlation, message framing, and serialization.
Requires
- php: ^8.2
Requires (Dev)
- amphp/byte-stream: ^2.1
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^12.1
- symfony/process: ^7.2
This package is auto-updated.
Last update: 2025-05-30 08:05:21 UTC
README
PHP Stream IPC is a lightweight, zero-dependency PHP library designed for robust IPC (inter-process communication) through streams, pipes, sockets, and standard I/O. Whether you're managing background jobs, orchestrating parallel tasks, or simply need efficient communication between PHP processes, PHP Stream IPC makes it straightforward, reliable, and fast.
Forget complicated setups or bloated frameworks: this library is pure PHP, requires no external dependencies, and integrates seamlessly with native PHP streams, Symfony's popular Process component, or AMPHP's ByteStream component (or your own adapter). It handles everything from framing messages to correlating requests and responses, so your applications can communicate in real time.
Why choose PHP Stream IPC?
- Zero Dependencies: Lightweight, pure PHP that installs fast and clean.
- Reliable Messaging: Automatic message framing ensures data integrity.
- Performance-Focused: Built for speed and efficiency. You can send hundreds of messages per second.
- Built-in Request-Response Handling: Easily correlate requests with their responses, simplifying async communication.
- Flexible Serialization: Fast native PHP serialization by default, with JSON support ready out of the box.
- Easy Integration with Symfony/AMPHP: Fits perfectly into your existing workflow.
- Real-time Notifications and Updates: Effortlessly handle real-time progress updates and event-driven messaging.
- Error and Timeout Management: Robust exception handling, graceful stream closure management, and built-in timeout control keep your processes resilient.
- Extendable by Design: Simple interfaces and clearly defined contracts mean you can easily adapt or extend functionality for your specific needs.
Whether you're building scalable PHP services, handling parallel background processing, or connecting multiple PHP scripts reliably, PHP Stream IPC gives you the control and simplicity you've been looking for.
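To make "automatic message framing" from the list above more concrete, here is a minimal sketch of one common approach: length-prefixed frames. This is not the library's actual wire format, which this README does not describe; `encodeFrame()` and `decodeFrames()` are hypothetical helper names used only for illustration.

```php
<?php
declare(strict_types=1);

/** Wrap a payload in a frame: 4-byte big-endian length, then the payload bytes. */
function encodeFrame(string $payload): string
{
    return pack('N', strlen($payload)) . $payload;
}

/**
 * Pull every complete frame out of $buffer and return the payloads.
 * Bytes belonging to an incomplete trailing frame stay in $buffer.
 *
 * @return list<string>
 */
function decodeFrames(string &$buffer): array
{
    $payloads = [];
    while (strlen($buffer) >= 4) {
        $length = unpack('N', substr($buffer, 0, 4))[1];
        if (strlen($buffer) < 4 + $length) {
            break; // frame not fully received yet
        }
        $payloads[] = substr($buffer, 4, $length);
        $buffer = substr($buffer, 4 + $length);
    }
    return $payloads;
}

// Two messages arrive split across arbitrary read boundaries (7 bytes per read).
$wire = encodeFrame('Ping!') . encodeFrame('Pong!');
$buffer = '';
$received = [];
foreach (str_split($wire, 7) as $chunk) {
    $buffer .= $chunk;
    array_push($received, ...decodeFrames($buffer));
}
print_r($received);
```

Because a pipe read can return any slice of the byte stream, the receiver keeps a buffer and only hands over a message once its full length has arrived. That is the property framing is meant to guarantee.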
Quick Installation
Install with Composer in seconds:
composer require riki137/stream-ipc
Quick Usage Example
Parent-Child IPC Example:
Parent (parent.php):

    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;

    // Spawn the child with pipes for stdin, stdout, and stderr
    $process = proc_open('php child.php', [
        ['pipe', 'r'],
        ['pipe', 'w'],
        ['pipe', 'w'],
    ], $pipes);

    $peer = new NativeIpcPeer();
    $session = $peer->createStreamSession($pipes[0], $pipes[1], $pipes[2]);

    $response = $session->request(new LogMessage('Ping!'))->await();
    echo "Child responded: {$response->message}\n";

    proc_close($process);
Child (child.php):

    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    // Answer every request with a "Pong!"
    $session->onRequest(fn(Message $msg) => new LogMessage("Pong!"));

    $peer->tick();
Common Use Cases
- Background Tasks: Run asynchronous workers with real-time communication.
- Multi-Process PHP Applications: Efficiently manage parallel PHP scripts.
- Real-time Progress Tracking: Provide updates on task progress via IPC.
- Server-Client PHP Scripts: Use PHP scripts as IPC-driven microservices.
Understanding the Message Flow
- Direct Notifications: Send messages from one process to another with notify()
- Request-Response: Send a request with request() and get a correlated response
- Progress Updates: A process can send notifications while processing a request
- Event Handling: Register callbacks for messages and requests with onMessage() and onRequest()
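The idea behind a "correlated response" can be sketched in a few lines: every outgoing request carries an ID, and the sender keeps a table of callbacks keyed by that ID. The `PendingRequests` class below is a hypothetical illustration of the pattern, not the library's internal implementation.

```php
<?php
declare(strict_types=1);

/** Tracks in-flight requests so each response reaches the caller that sent it. */
final class PendingRequests
{
    /** @var array<string, callable> */
    private array $callbacks = [];
    private int $counter = 0;

    /** Register a callback and return the ID to send along with the request. */
    public function register(callable $onResponse): string
    {
        $id = 'req-' . ++$this->counter;
        $this->callbacks[$id] = $onResponse;
        return $id;
    }

    /** Deliver a response to the matching request; returns false for unknown IDs. */
    public function resolve(string $id, mixed $payload): bool
    {
        if (!isset($this->callbacks[$id])) {
            return false;
        }
        $callback = $this->callbacks[$id];
        unset($this->callbacks[$id]); // each request is answered at most once
        $callback($payload);
        return true;
    }
}

$pending = new PendingRequests();
$results = [];

$first = $pending->register(function (mixed $payload) use (&$results): void {
    $results['first'] = $payload;
});
$second = $pending->register(function (mixed $payload) use (&$results): void {
    $results['second'] = $payload;
});

// Responses may come back in a different order than the requests went out.
$pending->resolve($second, 'B');
$pending->resolve($first, 'A');
print_r($results);
```

Anything that arrives without a known request ID can then be treated as a plain notification, which is how progress updates and a final response can share one stream.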
Message Handling
Register event-driven handlers easily:

    // Notification Handler
    $session->onMessage(function (Message $msg) {
        echo "Received: {$msg->message}\n";
    });

    // Request Handler with Response
    $session->onRequest(function (Message $msg): ?Message {
        return new LogMessage("Processed request: {$msg->message}");
    });
Timeout and Exception Management
Handle request timeouts gracefully:

    use StreamIpc\Transport\TimeoutException;

    try {
        // Second argument is the timeout in seconds
        $session->request(new LogMessage("Quick task"), 3.0)->await();
    } catch (TimeoutException $e) {
        echo "Task timed out: {$e->getMessage()}\n";
    }
Advanced Configuration
Custom Serialization (JSON):

    use StreamIpc\Serialization\JsonMessageSerializer;

    $peer = new NativeIpcPeer(new JsonMessageSerializer());
Custom Request ID Generation (UUID):

    use StreamIpc\Envelope\Id\RequestIdGenerator;

    class UuidRequestIdGenerator implements RequestIdGenerator
    {
        public function generate(): string
        {
            // 128 random bits as 32 hex characters (UUID-sized, without the dashed UUID layout)
            return bin2hex(random_bytes(16));
        }
    }

    $peer = new NativeIpcPeer(null, new UuidRequestIdGenerator());
Development & Contribution
Contributions are welcome! To contribute:
- Fork the repository
- Create a feature branch (git checkout -b feature/my-feature)
- Commit changes (git commit -m "Add my feature")
- Push your branch (git push origin feature/my-feature)
- Open a Pull Request on GitHub
Current areas open for contributions:
- Enhancing AMPHP transport stability and tests
- Improving error handling documentation and examples
License
PHP Stream IPC is open-source software licensed under the MIT License. See LICENSE for more details.
Tags
php, ipc, symfony-process, stream, asynchronous, inter-process communication, message passing, php-library, ipc-framework
For issues, feature requests, or general inquiries, please open an issue.
© riki137
Documentation
Using Symfony's Process Component
The library works seamlessly with Symfony's Process component. The createSymfonyProcessSession() helper automatically starts the process and wires it for message passing using a Symfony InputStream. Configure your Process instance (working directory, environment variables, timeouts, etc.) before handing it to the session:

    use Symfony\Component\Process\Process;
    use StreamIpc\SymfonyIpcPeer; // namespace assumed to match NativeIpcPeer
    use StreamIpc\Message\LogMessage;

    $process = new Process([PHP_BINARY, 'child.php']);
    $process->setTimeout(0); // disable Process timeouts if desired

    $peer = new SymfonyIpcPeer();
    $session = $peer->createSymfonyProcessSession($process);

    $response = $session->request(new LogMessage('Hello from parent!'), 5.0)->await();
    echo "Child responded: {$response->message}\n";
You may run multiple processes in parallel and drive them all by calling $peer->tick() (or tickFor()) inside your main loop.
This approach requires the symfony/process package:
composer require symfony/process

    // child.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    // Handle requests from parent
    $session->onRequest(function(Message $msg, $session): Message {
        // Process the message from parent
        echo "Received from parent: {$msg->message}\n";

        // Send response back to parent
        return new LogMessage("Hello from child!");
    });

    // Process messages until parent closes connection
    $peer->tick();
Long-Running Background Process
Create a background process that regularly sends status updates:

    // backgroundWorker.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    // Simulate background work
    for ($i = 1; $i <= 5; $i++) {
        // Do some work...
        sleep(1);

        // Send status update to parent
        $session->notify(new LogMessage("Progress: {$i}/5 complete", "info"));
    }

    // Send final success message
    $session->notify(new LogMessage("Task completed successfully", "success"));

    // monitor.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $descriptors = [
        0 => ['pipe', 'r'],
        1 => ['pipe', 'w'],
        2 => ['pipe', 'w'],
    ];
    $process = proc_open('php backgroundWorker.php', $descriptors, $pipes);
    [$stdin, $stdout, $stderr] = $pipes;

    $peer = new NativeIpcPeer();
    $session = $peer->createStreamSession($stdin, $stdout, $stderr);

    // Listen for status updates
    $session->onMessage(function(Message $msg) {
        if ($msg instanceof LogMessage) {
            echo "[{$msg->level}] {$msg->message}\n";
        }
    });

    // Keep processing messages until process exits
    while (proc_get_status($process)['running']) {
        $peer->tick(0.1);
    }

    proc_close($process);
Request-Response Pattern with Progress Updates

    // server.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    $session->onRequest(function(Message $msg, $session): Message {
        // Start processing request
        $session->notify(new LogMessage("Starting work", "info"));

        // Simulate work with progress updates
        for ($i = 1; $i <= 3; $i++) {
            sleep(1);
            $session->notify(new LogMessage("Progress: {$i}/3", "info"));
        }

        // Return final result
        return new LogMessage("Task complete!", "success");
    });

    $peer->tick();

    // client.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $descriptors = [0 => ['pipe', 'r'], 1 => ['pipe', 'w'], 2 => ['pipe', 'w']];
    $process = proc_open('php server.php', $descriptors, $pipes);
    [$stdin, $stdout, $stderr] = $pipes;

    $peer = new NativeIpcPeer();
    $session = $peer->createStreamSession($stdin, $stdout, $stderr);

    // Listen for progress notifications
    $session->onMessage(function(Message $msg) {
        if ($msg instanceof LogMessage) {
            echo "Progress: {$msg->message}\n";
        }
    });

    // Send request and wait for final response (await() as in the other examples)
    echo "Sending request...\n";
    $response = $session->request(new LogMessage("Start processing"), 10.0)->await();
    echo "Final response: {$response->message}\n";

    proc_close($process);
Multiple Parallel Workers

    // manager.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    // Create IPC peer
    $peer = new NativeIpcPeer();
    $sessions = [];
    $workers = [];

    // Launch multiple worker processes
    for ($i = 1; $i <= 3; $i++) {
        $descriptors = [0 => ['pipe', 'r'], 1 => ['pipe', 'w'], 2 => ['pipe', 'w']];
        $process = proc_open("php worker.php {$i}", $descriptors, $pipes);
        [$stdin, $stdout, $stderr] = $pipes;

        $session = $peer->createStreamSession($stdin, $stdout, $stderr);

        // Store session and process
        $sessions[$i] = $session;
        $workers[$i] = $process;

        // Listen for messages from this worker
        $session->onMessage(function(Message $msg) use ($i) {
            if ($msg instanceof LogMessage) {
                echo "Worker {$i}: {$msg->message}\n";
            }
        });
    }

    // Assign tasks to workers; keep each pending request so it can be awaited
    $pending = [];
    foreach ($sessions as $id => $session) {
        $pending[$id] = $session->request(new LogMessage("Process task {$id}"), 5.0);
    }

    // Wait for every worker's final response before shutting down
    foreach ($pending as $id => $request) {
        $response = $request->await();
        echo "Worker {$id} result: {$response->message}\n";
    }

    // Clean up
    foreach ($workers as $process) {
        proc_close($process);
    }

    // worker.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;
    use StreamIpc\Message\Message;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    // Get worker ID from command line
    $workerId = $argv[1] ?? 'unknown';

    // Handle task requests
    $session->onRequest(function(Message $msg, $session) use ($workerId): Message {
        // Send some progress notifications
        $session->notify(new LogMessage("Worker {$workerId} starting task"));
        sleep(1);
        $session->notify(new LogMessage("Worker {$workerId} halfway done"));
        sleep(1);

        // Return final result
        return new LogMessage("Worker {$workerId} completed task");
    });

    $peer->tick();
Custom Message Types
Define custom message types by implementing the Message interface:

    // TaskMessage.php
    namespace App\Messages;

    use StreamIpc\Message\Message;

    final readonly class TaskMessage implements Message
    {
        public function __construct(
            public string $action,
            public array $parameters = []
        ) {
        }
    }

    // usage.php
    use StreamIpc\NativeIpcPeer;
    use App\Messages\TaskMessage;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    // Send a custom task message
    $task = new TaskMessage('processFile', [
        'filename' => 'data.csv',
        'columns' => ['name', 'email', 'age'],
    ]);
    $session->notify($task);

    // Or make a request with the custom message
    $response = $session->request($task)->await();
Handling Timeouts

    // client.php
    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Message\LogMessage;

    $peer = new NativeIpcPeer();
    $session = $peer->createStdioSession();

    try {
        // Set a short timeout (2 seconds)
        $response = $session->request(new LogMessage("Fast request"), 2.0)->await();
        echo "Received response: {$response->message}\n";
    } catch (\StreamIpc\Transport\TimeoutException $e) {
        echo "Request timed out: {$e->getMessage()}\n";
        // Handle timeout situation
    }
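For readers curious how a bounded wait on a stream can work underneath, here is a rough sketch built on PHP's stream_select(). It is an illustration of the general technique and not a description of the library's internals. `readWithTimeout()` is a hypothetical helper that returns null where the library would throw a TimeoutException. The demo uses a Unix socket pair, so it assumes a Unix-like system.

```php
<?php
declare(strict_types=1);

/**
 * Wait up to $timeout seconds for data on $stream.
 * Returns the bytes read, or null when nothing arrived in time.
 *
 * @param resource $stream
 */
function readWithTimeout($stream, float $timeout): ?string
{
    $read = [$stream];
    $write = null;
    $except = null;
    $seconds = (int) floor($timeout);
    $microseconds = (int) (($timeout - $seconds) * 1_000_000);

    // stream_select() returns 0 when the timeout expires without activity
    $ready = stream_select($read, $write, $except, $seconds, $microseconds);
    if ($ready === false || $ready === 0) {
        return null;
    }

    $data = fread($stream, 8192);
    return $data === false ? null : $data;
}

// Demo: a connected socket pair stands in for the parent and child ends.
[$parentEnd, $childEnd] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$timedOut = readWithTimeout($parentEnd, 0.05); // nothing written yet
fwrite($childEnd, 'Pong!');
$reply = readWithTimeout($parentEnd, 0.05);    // data is now waiting

var_dump($timedOut, $reply);
```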
Event-Driven Architecture
PHP Stream IPC uses an event-driven model where you can register handlers for different types of events:

    // Register a handler for notifications
    $session->onMessage(function(Message $msg, IpcSession $session) {
        if ($msg instanceof LogMessage) {
            echo "[{$msg->level}] {$msg->message}\n";
        }
    });

    // Register a handler for requests
    $session->onRequest(function(Message $msg, IpcSession $session): ?Message {
        // Process request
        if ($msg instanceof LogMessage) {
            // Return a response
            return new LogMessage("Processed: {$msg->message}");
        }

        // Return null if this handler can't process the request
        return null;
    });
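The "return null if this handler can't process the request" convention suggests a chain of handlers in which the first non-null result wins. The sketch below shows that pattern with plain strings. Whether the library tries handlers in registration order is an assumption here, and `dispatchRequest()` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Try each handler in order; the first non-null return value is the response.
 *
 * @param list<callable(string): ?string> $handlers
 */
function dispatchRequest(array $handlers, string $request): ?string
{
    foreach ($handlers as $handler) {
        $response = $handler($request);
        if ($response !== null) {
            return $response;
        }
    }
    return null; // no handler claimed the request
}

$handlers = [
    // Only answers requests that start with "ping"
    fn(string $req): ?string => str_starts_with($req, 'ping') ? 'pong' : null,
    // Only answers requests that start with "echo "
    fn(string $req): ?string => str_starts_with($req, 'echo ') ? substr($req, 5) : null,
];

$pingReply = dispatchRequest($handlers, 'ping');
$echoReply = dispatchRequest($handlers, 'echo hello');
$unhandled = dispatchRequest($handlers, 'unknown');

var_dump($pingReply, $echoReply, $unhandled);
```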
Advanced Configuration
Custom Serialization

    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Serialization\JsonMessageSerializer;

    // Create a peer with custom serializer
    $peer = new NativeIpcPeer(
        new JsonMessageSerializer()
    );
    $session = $peer->createStdioSession();
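As a rough picture of what JSON serialization of a message object involves, the sketch below round-trips a small value object through JSON with a type tag. It does not reproduce JsonMessageSerializer's actual output format, which is not documented here. `ChatMessage`, `toJson()`, and `fromJson()` are hypothetical names, and the allow-list is a precaution added for the example.

```php
<?php
declare(strict_types=1);

/** Hypothetical message class used only for this illustration. */
final class ChatMessage
{
    public function __construct(
        public readonly string $message,
        public readonly string $level = 'info',
    ) {
    }
}

/** Encode an object's public properties together with its class name. */
function toJson(object $message): string
{
    return json_encode(
        ['type' => $message::class, 'data' => get_object_vars($message)],
        JSON_THROW_ON_ERROR
    );
}

/**
 * Rebuild an object from JSON, refusing classes that are not allow-listed.
 *
 * @param list<class-string> $allowedClasses
 */
function fromJson(string $json, array $allowedClasses): object
{
    $decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $class = $decoded['type'] ?? '';
    if (!in_array($class, $allowedClasses, true)) {
        throw new InvalidArgumentException("Unexpected message type: {$class}");
    }
    // String keys are passed to the constructor as named arguments
    return new $class(...$decoded['data']);
}

$json = toJson(new ChatMessage('Ping!'));
$restored = fromJson($json, [ChatMessage::class]);

echo $json, "\n";
echo $restored->message, "\n";
```

JSON is readable and language-neutral, which helps when the other end of the pipe is not PHP. Native serialization avoids the explicit type mapping shown here.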
Custom Request ID Generation

    use StreamIpc\NativeIpcPeer;
    use StreamIpc\Envelope\Id\RequestIdGenerator;

    class UuidRequestIdGenerator implements RequestIdGenerator
    {
        public function generate(): string
        {
            return sprintf(
                '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
                mt_rand(0, 0xffff), mt_rand(0, 0xffff),
                mt_rand(0, 0xffff),
                mt_rand(0, 0x0fff) | 0x4000, // version 4
                mt_rand(0, 0x3fff) | 0x8000, // variant bits
                mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff)
            );
        }
    }

    // Create peer with custom ID generator
    $peer = new NativeIpcPeer(
        null, // use default serializer
        new UuidRequestIdGenerator()
    );
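mt_rand() is not a cryptographically secure source. If request IDs should be hard to guess, a version 4 UUID can be built from random_bytes() instead. The standalone `uuidV4()` helper below is a hypothetical sketch. Its return value could be used as the body of generate() in a generator like the one above.

```php
<?php
declare(strict_types=1);

/** Build a random (version 4) UUID string from 16 secure random bytes. */
function uuidV4(): string
{
    $bytes = random_bytes(16);

    // Set the version nibble to 4 and the variant bits to binary 10
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

    // 32 hex characters split into eight 4-character groups: 8-4-4-4-12 layout
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$id = uuidV4();
echo $id, "\n";
```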