smwks/superprocess

A fluent PHP library for supervised master-child process control using pcntl and pipes

github.com/smwks/superprocess

pkg:composer/smwks/superprocess

ralphschindler

v0.1.1 2026-02-23 23:19 UTC



+--------------------------------+
|                                |
|  >> SuperProcess               |
|                                |
|  [master] --+--> [worker]      |
|             +--> [worker]      |
|             +--> [worker]      |
|                                |
|  fork · supervise · scale      |
+--------------------------------+

A fluent PHP library for supervised master-child process control using pcntl and pipes. Run one or more copies of a command or PHP closure, keep them alive automatically, communicate with them via stdin/stdout and a structured IPC channel, and scale the pool up or down at runtime.

Two features define SuperProcess:

  • Closures fork. Pass closure() any PHP closure and it becomes an independent OS process — no serialisation boilerplate, no separate file. The closure runs in a pcntl_fork() copy of the master, with a socket wired back for structured IPC.

  • The heartbeat drives the pool. Register a periodic callback that fires on the master process. Inside it you have full control: query a database, read a queue depth, inspect a config value — then call scaleUp() or scaleDown() to match worker supply to real-time demand.

Requires PHP 8.4+, ext-pcntl, ext-posix — Linux and macOS only.

Installation

composer require smwks/superprocess

Quick start

Supervise a pool of command workers:

use SMWks\SuperProcess\Child;
use SMWks\SuperProcess\CreateReason;
use SMWks\SuperProcess\ExitReason;
use SMWks\SuperProcess\SuperProcess;

$sp = new SuperProcess;
$sp->command('php artisan inspire:loop')
   ->scaleLimits(min: 2, max: 5)
   ->onChildCreate(fn (Child $c, CreateReason $r) => printf("[master] spawned %d\n", $c->pid))
   ->onChildExit(fn (Child $c, ExitReason $r)    => printf("[master] %d exited\n",   $c->pid))
   ->onChildOutput(fn (Child $c, string $data)   => print $data)
   ->run(); // blocks until SIGTERM or SIGINT is received

Fork a closure as a worker process:

use SMWks\SuperProcess\Child;
use SMWks\SuperProcess\SuperProcess;

// The closure is forked by the master — no separate file needed.
$sp = new SuperProcess;
$sp->closure(function (mixed $socket): void {
        for ($i = 1; $i <= 5; $i++) {
            fwrite($socket, json_encode(['step' => $i]) . "\n");
            sleep(1);
        }
    })
    ->scaleLimits(min: 3, max: 3)
    ->onChildMessage(fn (Child $c, mixed $msg) => printf("[%d] step %d\n", $c->pid, $msg['step']))
    ->run();

Drive pool size from an external source via heartbeat:

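use Illuminate\Support\Facades\DB; // assumes a Laravel application, as the artisan command implies
use SMWks\SuperProcess\SuperProcess;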

// Every 5 seconds the master checks the queue depth and adjusts the pool.
$sp = new SuperProcess;
$sp->command('php artisan queue:work')
   ->scaleLimits(min: 1, max: 20)
   ->heartbeat(5, function (SuperProcess $sp): void {
       $depth = (int) DB::scalar('SELECT COUNT(*) FROM jobs WHERE queue = ?', ['default']);

       if ($depth > 100) { $sp->scaleUp();   } // no-op if already at max
       if ($depth < 10)  { $sp->scaleDown(); } // no-op if already at min
   })
   ->run();

Concepts

Command children

command(string $cmd) runs an external process for each child slot. The command is started with proc_open(), so it inherits PATH and environment variables. Each child gets four file descriptors:

fd           direction        purpose
0 (stdin)    master → child   sendChildInput()
1 (stdout)   child → master   onChildOutput()
2 (stderr)   child → master   onChildOutput()
3 (IPC)      child → master   onChildMessage() (JSON lines)

All pipes are non-blocking; reads happen inside the event loop via stream_select().
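To illustrate the descriptor layout above, the following sketch opens a child with the same four descriptors using proc_open() and drains the three output pipes with stream_select(). The helper name runWithIpc() and the inline child script are assumptions made for this example; it is not SuperProcess's internal code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of SuperProcess): run one command with the
 * four-descriptor layout from the table and collect what it writes.
 *
 * @param  list<string> $command
 * @return array<int, string> output keyed by descriptor number (1, 2, 3)
 */
function runWithIpc(array $command): array
{
    $spec = [
        0 => ['pipe', 'r'], // stdin: the child reads, the master writes
        1 => ['pipe', 'w'], // stdout
        2 => ['pipe', 'w'], // stderr
        3 => ['pipe', 'w'], // extra descriptor used as the IPC channel
    ];

    $process = proc_open($command, $spec, $pipes);
    if ($process === false) {
        throw new RuntimeException('proc_open() failed');
    }
    fclose($pipes[0]); // this sketch sends no input

    $open = [1 => $pipes[1], 2 => $pipes[2], 3 => $pipes[3]];
    $collected = [1 => '', 2 => '', 3 => ''];
    foreach ($open as $stream) {
        stream_set_blocking($stream, false);
    }

    // Wait for readable pipes until every one of them has reached EOF.
    while ($open !== []) {
        $read = array_values($open);
        $write = null;
        $except = null;
        if (stream_select($read, $write, $except, 1) === false) {
            break;
        }
        foreach ($read as $stream) {
            $fd = array_search($stream, $open, true);
            $chunk = fread($stream, 8192);
            if (is_string($chunk)) {
                $collected[$fd] .= $chunk;
            }
            if (feof($stream)) {
                fclose($stream);
                unset($open[$fd]);
            }
        }
    }
    proc_close($process);

    return $collected;
}

// The child writes one JSON line to fd 3 and plain text to stdout.
$child = 'fwrite(fopen("php://fd/3", "w"), json_encode(["type" => "started"]) . "\n"); echo "working";';
$result = runWithIpc([PHP_BINARY, '-r', $child]);

echo $result[3];       // the JSON line sent over the IPC descriptor
echo $result[1], "\n"; // the plain stdout text
```

Keeping the IPC channel on its own descriptor means structured messages never have to be separated from whatever the command prints to stdout or stderr.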

Closure children

closure(Closure $fn) forks the master with pcntl_fork() and runs the closure in the child. The closure receives a socket resource as its only argument. Each JSON line it writes to that socket is delivered to the master's onChildMessage() callback:

$sp->closure(function (mixed $socket): void {
    for ($i = 1; $i <= 5; $i++) {
        fwrite($socket, json_encode(['progress' => $i * 20]) . "\n");
        sleep(1);
    }
});
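For readers who want to see the underlying mechanism, here is a minimal sketch of the fork-plus-socket pattern using only pcntl_fork() and stream_socket_pair(). The helper forkWithSocket() is hypothetical and is not taken from the library; SuperProcess may wire its socket differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not SuperProcess's code): fork, run $fn in the child
 * with one end of a socket pair, and hand the other end back to the parent.
 *
 * @return array{0: int, 1: resource} child PID and the parent's socket end
 */
function forkWithSocket(Closure $fn): array
{
    $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    if ($pair === false) {
        throw new RuntimeException('stream_socket_pair() failed');
    }
    [$parentEnd, $childEnd] = $pair;

    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('pcntl_fork() failed');
    }

    if ($pid === 0) {
        // Child: keep only its own end, run the closure, then exit.
        fclose($parentEnd);
        $fn($childEnd);
        fclose($childEnd);
        exit(0);
    }

    // Parent: keep only its own end.
    fclose($childEnd);

    return [$pid, $parentEnd];
}

[$pid, $socket] = forkWithSocket(function (mixed $socket): void {
    fwrite($socket, json_encode(['progress' => 20]) . "\n");
});

$line = fgets($socket);                       // blocks until the child has written a line
$message = json_decode((string) $line, true); // decode the JSON line into an array
pcntl_waitpid($pid, $status);                 // reap the child so it does not linger as a zombie

printf("child %d reported progress %d\n", $pid, $message['progress']);
```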

The fork happens inside run(), not when closure() is called. The child starts as a copy of the master at that moment, so everything the closure can reach (variables bound with use, static properties, globals, loaded classes) is available inside the child with the values it had at fork time. Because the fork copies the master's entire memory image, expensive one-time work (bootstrapping a framework, parsing config, establishing a schema) is paid once in the master and shared copy-on-write across every worker.
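What a forked closure can see follows PHP's ordinary closure rules, which are worth keeping in mind: a by-value use binding is copied when the closure is created, while a by-reference binding reflects later changes. The short example below shows that distinction in plain PHP without forking; the variable names are illustrative only.

```php
<?php
declare(strict_types=1);

$config = ['queue' => 'default'];
$counter = 0;

// $config is bound by value (copied now); $counter is bound by reference.
$worker = function () use ($config, &$counter): string {
    return $config['queue'] . ':' . $counter;
};

$config['queue'] = 'changed-later'; // not seen by the closure
$counter = 3;                       // seen by the closure

$seen = $worker();
echo $seen, "\n"; // prints "default:3"
```

A forked child then receives a snapshot of whatever those bindings hold at fork time, and later changes in the master do not propagate to it.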

Child lifecycle

On startup run() spawns min children. When a child exits:

  1. onChildExit fires with an ExitReason.
  2. If the running count drops below min, a replacement is spawned with CreateReason::Replacement.

The master never exits the event loop on its own — send it SIGTERM or SIGINT (or call signal(posix_getpid(), ProcessSignal::Stop) from within a callback) to trigger a graceful shutdown.
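The replenishment rule described above amounts to a small calculation, sketched here as a pure function. SpawnReason and planSpawns() are hypothetical names that only mirror the documented CreateReason cases; they are not part of the library.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the documented CreateReason cases used here.
enum SpawnReason
{
    case Initial;
    case Replacement;
}

/**
 * Decide how many children to spawn so the pool reaches $min again.
 *
 * @return list<SpawnReason> one entry per child that should be spawned
 */
function planSpawns(int $running, int $min, bool $startingUp): array
{
    $missing = max(0, $min - $running);
    $reason = $startingUp ? SpawnReason::Initial : SpawnReason::Replacement;

    return array_fill(0, $missing, $reason);
}

$atStartup = planSpawns(running: 0, min: 2, startingUp: true);  // two Initial spawns
$afterExit = planSpawns(running: 1, min: 2, startingUp: false); // one Replacement spawn
$aboveMin  = planSpawns(running: 3, min: 2, startingUp: false); // nothing to do
```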

Heartbeat

heartbeat(int $intervalSeconds, Closure $fn) registers a callback that fires on the master process at a regular interval throughout the event loop. The callback receives the live SuperProcess instance, giving it full runtime control.
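One way an event loop can decide when such a callback is due is to track the next firing time and compare it with the clock on every iteration. The IntervalTimer class below is a hypothetical sketch of that idea, with the clock passed in so the behaviour is easy to follow; it does not describe how SuperProcess schedules its heartbeat internally.

```php
<?php
declare(strict_types=1);

// Hypothetical timer: reports when a fixed interval has elapsed.
final class IntervalTimer
{
    private float $nextAt;

    public function __construct(private readonly int $intervalSeconds, float $now)
    {
        $this->nextAt = $now + $intervalSeconds;
    }

    /** Returns true once per interval and schedules the next firing. */
    public function due(float $now): bool
    {
        if ($now < $this->nextAt) {
            return false;
        }
        $this->nextAt = $now + $this->intervalSeconds;

        return true;
    }
}

$timer = new IntervalTimer(5, 100.0);
$checks = [
    $timer->due(104.9), // false: interval not yet elapsed
    $timer->due(105.0), // true: five seconds have passed
    $timer->due(105.1), // false: next firing is at 110.0
    $timer->due(110.0), // true
];
```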

This is the primary mechanism for externally-driven pool management. Instead of sizing the pool statically at startup, query whatever source of truth you have — a queue depth, a database flag, a config value — and nudge the pool toward the right size:

$sp->command('php artisan queue:work')
   ->scaleLimits(min: 1, max: 20)
   ->heartbeat(5, function (SuperProcess $sp): void {
       $depth = (int) DB::scalar('SELECT COUNT(*) FROM jobs WHERE queue = ?', ['default']);

       if ($depth > 100) { $sp->scaleUp();   } // no-op if already at max
       if ($depth < 10)  { $sp->scaleDown(); } // no-op if already at min
   })
   ->run();

Because the heartbeat runs on the master, the single-threaded process that owns the event loop, the callback can call scaleUp() and scaleDown() without locks. Each call changes the pool by one child and is a no-op when the pool is already at the configured limit.
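The thresholds in the example form a dead band: above 100 the pool grows, below 10 it shrinks, and in between it stays put. The following sketch simulates that policy over a series of queue depths so the effect of one step per heartbeat and of the min/max limits is visible. All function names here are hypothetical, and the pool size is tracked locally rather than read from SuperProcess.

```php
<?php
declare(strict_types=1);

// +1, -1 or 0 depending on where the queue depth falls relative to the band.
function scaleStep(int $depth, int $high = 100, int $low = 10): int
{
    if ($depth > $high) {
        return 1;
    }
    if ($depth < $low) {
        return -1;
    }

    return 0;
}

// Keep the pool size inside the configured limits (the "no-op at the limit" rule).
function clampPool(int $size, int $min, int $max): int
{
    return max($min, min($max, $size));
}

/**
 * @param  list<int> $depths queue depth observed at each heartbeat
 * @return list<int> pool size after each heartbeat
 */
function simulatePool(array $depths, int $start, int $min, int $max): array
{
    $size = $start;
    $history = [];
    foreach ($depths as $depth) {
        $size = clampPool($size + scaleStep($depth), $min, $max);
        $history[] = $size;
    }

    return $history;
}

$history = simulatePool([150, 150, 50, 5, 5, 5], start: 1, min: 1, max: 3);
echo implode(',', $history), "\n"; // prints "2,3,3,2,1,1"
```

Because each heartbeat moves the pool by at most one child, the interval also bounds how quickly the pool can react to a sudden backlog.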

Graceful shutdown

On SIGTERM or SIGINT (Ctrl+C) the master:

  1. Runs the onShutdown callback, if registered, while all children are still alive.
  2. Sends SIGTERM to every child.
  3. Waits up to 5 seconds for each to exit.
  4. Sends SIGKILL to any that remain.

The onShutdown callback is the right place to flush state, close connections, or send a final message to children before they are signalled:

$sp->onShutdown(function (SuperProcess $sp): void {
    echo "Shutting down — waiting for workers to finish current jobs\n";
})
->run();
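The terminate, wait, then kill sequence from the list above can be sketched for a single process with proc_terminate() and proc_get_status(). The helper stopGracefully() is hypothetical, and the example assumes a sleep binary is available on the PATH; SuperProcess's own implementation may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: ask a proc_open() process to stop, escalate if needed.
 *
 * @param  resource $process handle returned by proc_open()
 * @return bool true if SIGKILL had to be sent, false if SIGTERM was enough
 */
function stopGracefully($process, float $graceSeconds = 5.0): bool
{
    proc_terminate($process, 15); // SIGTERM: polite request to exit

    $deadline = microtime(true) + $graceSeconds;
    while (microtime(true) < $deadline) {
        if (!proc_get_status($process)['running']) {
            return false; // exited within the grace period
        }
        usleep(50_000); // poll every 50 ms
    }

    proc_terminate($process, 9); // SIGKILL: cannot be caught or ignored

    return true;
}

// A long-running child that exits as soon as it receives SIGTERM.
$process = proc_open(['sleep', '30'], [], $pipes);
$forced = stopGracefully($process, 2.0);
proc_close($process);

echo $forced ? "had to force-kill\n" : "stopped gracefully\n";
```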

API reference

Configuration

// Set the command to run in each child (mutually exclusive with closure())
->command(string $command): static

// Set a PHP closure to run in each child (mutually exclusive with command())
->closure(Closure $fn): static   // fn(resource $socket): void

// Set the min/max number of running children (default: 1, 1)
->scaleLimits(int $min, int $max): static

// Register a periodic master heartbeat
->heartbeat(int $intervalSeconds, Closure $fn): static  // fn(SuperProcess $self): void

Callbacks

// Called when a child is spawned
->onChildCreate(Closure $fn): static   // fn(Child $child, CreateReason $reason): void

// Called when a child exits
->onChildExit(Closure $fn): static     // fn(Child $child, ExitReason $reason): void

// Called when SIGUSR1 or SIGUSR2 is received by the master
->onChildSignal(Closure $fn): static   // fn(Child $child, int $signal): void

// Called for each JSON message received on the child's IPC channel
->onChildMessage(Closure $fn): static  // fn(Child $child, mixed $message): void

// Called with raw stdout/stderr data from a command child
->onChildOutput(Closure $fn): static   // fn(Child $child, string $data): void

// Called once on shutdown (SIGTERM or SIGINT), before children are signalled
->onShutdown(Closure $fn): static      // fn(SuperProcess $self): void

Runtime control

// Write to a running child's stdin
->sendChildInput(int $pid, string $data): void

// Send a signal to a PID (pass a ProcessSignal case)
->signal(string|int $pid, ProcessSignal $signal): void

// Spawn one more child (if below max)
->scaleUp(): static

// Terminate one child (if above min)
->scaleDown(): static

// Start the blocking event loop
->run(): void

Enums and constants

// Why a child was created
CreateReason::Initial       // first spawn on run()
CreateReason::Replacement   // auto-restarted after exit
CreateReason::ScaleUp       // spawned by scaleUp()

// Why a child exited
ExitReason::Normal          // exited via exit() / end of script
ExitReason::Signal          // terminated by a signal (SIGTERM etc.)
ExitReason::Killed          // force-killed with SIGKILL
ExitReason::Unknown         // status could not be determined

// Signal shortcuts (values map to POSIX signal numbers)
ProcessSignal::Stop         // SIGTERM — graceful stop
ProcessSignal::Kill         // SIGKILL — force kill
ProcessSignal::Reload       // SIGHUP  — reload
ProcessSignal::Usr1         // SIGUSR1
ProcessSignal::Usr2         // SIGUSR2

Child properties

$child->pid            // int  — process ID
$child->createReason   // CreateReason
$child->running        // bool — false once the process has exited
$child->exitCode       // int  — exit code (populated after exit)
$child->exitReason     // ExitReason (populated after exit)

Sending structured messages from a command child

Write newline-delimited JSON to file descriptor 3. The master delivers each parsed line to onChildMessage.

// child-worker.php
$ipc = fopen('php://fd/3', 'w');

fwrite($ipc, json_encode(['type' => 'started', 'pid' => getmypid()]) . "\n");

// ... do work ...

fwrite($ipc, json_encode(['type' => 'done', 'items_processed' => 1234]) . "\n");
fclose($ipc);

// supervisor
$sp->command('php child-worker.php')
   ->onChildMessage(function (Child $child, mixed $msg): void {
       echo "[{$child->pid}] {$msg['type']}\n";
   });
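On the receiving side, pipe reads do not necessarily line up with message boundaries, so a reader of newline-delimited JSON typically buffers data until a full line is available. The JsonLineBuffer class below is a hypothetical sketch of that framing step; skipping malformed lines is a choice made for this example, not documented library behaviour.

```php
<?php
declare(strict_types=1);

// Hypothetical framing helper: turns arbitrary chunks into decoded JSON lines.
final class JsonLineBuffer
{
    private string $buffer = '';

    /**
     * Append a chunk and return every complete message it finishes.
     *
     * @return list<mixed>
     */
    public function push(string $chunk): array
    {
        $this->buffer .= $chunk;
        $messages = [];

        while (($pos = strpos($this->buffer, "\n")) !== false) {
            $line = substr($this->buffer, 0, $pos);
            $this->buffer = substr($this->buffer, $pos + 1);
            if ($line === '') {
                continue; // ignore blank lines
            }
            $decoded = json_decode($line, true);
            if (json_last_error() === JSON_ERROR_NONE) {
                $messages[] = $decoded; // malformed lines are skipped in this sketch
            }
        }

        return $messages;
    }
}

$buffer = new JsonLineBuffer();
$first  = $buffer->push('{"type":"sta');                      // incomplete line, nothing yet
$second = $buffer->push("rted\"}\n{\"type\":\"done\"}\n");    // completes two messages
```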

Signals

Signal received by master   Behaviour
SIGTERM                     Graceful shutdown: fires onShutdown, then drains children
SIGINT                      Graceful shutdown: same as SIGTERM (handles Ctrl+C)
SIGHUP                      Forwarded to all children
SIGCHLD                     Internal: triggers zombie reaping and pool replenishment
SIGUSR1                     Fires onChildSignal for every running child
SIGUSR2                     Fires onChildSignal for every running child

Development

composer lint          # fix code style with Pint
composer refactor      # apply Rector suggestions
composer test:lint     # check code style
composer test:types    # PHPStan (max level)
composer test:unit     # Pest unit tests
composer test          # run all checks

SuperProcess is open-sourced under the MIT license.