venndev/vosaka-fourotines

Structured async programming for PHP using Fibers, inspired by Kotlin Coroutines. Features AsyncIO (non-blocking streams), ForkProcess (low-overhead child processes), Flow/SharedFlow/StateFlow with backpressure, Channels, Mutex, and cooperative scheduling. Can integrate with VOsaka.

Maintainers

Package info

github.com/vosaka-php/vosaka-foroutines

pkg:composer/venndev/vosaka-fourotines

Statistics

Installs: 24

Dependents: 3

Suggesters: 0

Stars: 2

Open Issues: 0

1.4.6 2026-03-14 08:13 UTC

README

A PHP library for structured asynchronous programming using foroutines (fiber + coroutines), inspired by Kotlin coroutines. This is project with the contribution of a project from php-async

πŸ“š Documentation

New to VOsaka Foroutines? Check out our Structured Documentation (following the DiΓ‘taxis framework), which includes:

  • Tutorials: Step-by-step learning lessons.
  • How-to Guides: Task-oriented recipes for common problems.
  • Reference: Detailed technical descriptions of the API.
  • Explanation: Conceptual overviews and architectural deep-dives.

Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        main() entry point                       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚ RunBlocking   β”‚   β”‚   Launch      β”‚   β”‚     Async        β”‚    β”‚
β”‚  β”‚ (drive loop)  β”‚   β”‚ (fire & wait) β”‚   β”‚ (await result)   β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚         β”‚                  β”‚                   β”‚                β”‚
β”‚         β–Ό                  β–Ό                   β–Ό                β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚               Cooperative Scheduler Loop                 β”‚    β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚    β”‚
β”‚  β”‚  β”‚ AsyncIO       β”‚  WorkerPool     β”‚  Launch Queue  β”‚   β”‚    β”‚
β”‚  β”‚  β”‚ pollOnce()    β”‚  run()          β”‚  runOnce()     β”‚   β”‚    β”‚
β”‚  β”‚  β”‚ stream_select β”‚  child procs    β”‚  fiber resume  β”‚   β”‚    β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚    β”‚
β”‚  β”‚                                                         β”‚    β”‚
β”‚  β”‚  FiberPool: reusable Fiber instances (default: 10)      β”‚    β”‚
β”‚  β”‚  Idle detection β†’ usleep(500Β΅s) to prevent CPU spin     β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    Dispatchers                            β”‚   β”‚
β”‚  β”‚  DEFAULT: fibers in current process (+ AsyncIO streams)  β”‚   β”‚
β”‚  β”‚  IO:      child process (ForkProcess or symfony/process)  β”‚   β”‚
β”‚  β”‚  MAIN:    EventLoop (deferred scheduling)                 β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    Channel (4 transports)                 β”‚   β”‚
β”‚  β”‚  IN-PROCESS:  fiber ←→ fiber (in-memory array buffer)    β”‚   β”‚
β”‚  β”‚  SOCKET POOL: Channel::create() β†’ ChannelBrokerPool      β”‚   β”‚
β”‚  β”‚  SOCKET IPC:  newSocketInterProcess() β†’ ChannelBroker     β”‚   β”‚
β”‚  β”‚  FILE IPC:    newInterProcess() β†’ temp file + Mutex       β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  Flow (cold) β”‚  β”‚ SharedFlow / β”‚  β”‚  WorkerPool          β”‚    β”‚
β”‚  β”‚  + buffer()  β”‚  β”‚ StateFlow    β”‚  β”‚  (task batching +    β”‚    β”‚
β”‚  β”‚  operator    β”‚  β”‚ (hot, back-  β”‚  β”‚   dynamic scaling +  β”‚    β”‚
β”‚  β”‚              β”‚  β”‚  pressure)   β”‚  β”‚   respawn backoff)   β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  Mutex       β”‚  β”‚  Select      β”‚  β”‚  Job lifecycle       β”‚    β”‚
β”‚  β”‚  (multi-proc β”‚  β”‚  (channel    β”‚  β”‚  (cancel, join,      β”‚    β”‚
β”‚  β”‚   file/sem)  β”‚  β”‚   multiplex) β”‚  β”‚   invokeOnComplete)  β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚  β”‚  Actor Model β”‚  β”‚  Supervisor Tree (OTP-style)          β”‚     β”‚
β”‚  β”‚  (mailbox +  β”‚  β”‚  ONE_FOR_ONE / ONE_FOR_ALL /          β”‚     β”‚
β”‚  β”‚   message)   β”‚  β”‚  REST_FOR_ONE + restart budget         β”‚     β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Features

Core β€” RunBlocking, Launch, Async, Async::awaitAll(), Delay, Repeat, WithTimeout, Job lifecycle

Dispatchers β€” DEFAULT (fibers + AsyncIO), IO (child process via WorkerPool), MAIN (event loop)

WorkerPool β€” Pre-spawned long-lived worker processes with task batching, dynamic pool sizing, and respawn backoff

FiberPool β€” Reusable Fiber instances for scheduler optimization (default: 10, dynamic sizing)

Channel β€” Four transports: in-process, socket pool (default), socket per-channel, file-based

AsyncIO β€” Non-blocking stream I/O via stream_select() (TCP, TLS, HTTP, files, DNS)

Flow β€” Cold Flow, SharedFlow, StateFlow with backpressure (SUSPEND, DROP_OLDEST, DROP_LATEST, ERROR)

Actor Model β€” Message-passing concurrency with Channel-based mailboxes and ActorSystem registry

Supervisor Tree β€” OTP-style supervision with ONE_FOR_ONE, ONE_FOR_ALL, REST_FOR_ONE strategies

Sync β€” Mutex (file, semaphore, APCu), Select for channel multiplexing

Rules

Rules

Requirements

  • PHP 8.2+
  • ext-shmop, ext-fileinfo, ext-zlib
Optional Extension Purpose
ext-pcntl Low-overhead IO dispatch via pcntl_fork() (~1-5ms vs ~50-200ms)
ext-sysvsem Semaphore-based Mutex
ext-apcu APCu-based Mutex

Installation

composer require venndev/vosaka-fourotines

Usage

All entry points must be wrapped in main() or use the #[AsyncMain] attribute:

use function vosaka\foroutines\main;

main(function () {
    // Your async code here
});

RunBlocking + Launch

use vosaka\foroutines\{RunBlocking, Launch, Delay, Thread};
use function vosaka\foroutines\main;

main(function () {
    RunBlocking::new(function () {
        Launch::new(function () {
            Delay::new(1000);
            var_dump('Task 1 done');
        });

        Launch::new(function () {
            Delay::new(500);
            var_dump('Task 2 done');
        });
    });
});

Async / Await

use vosaka\foroutines\{Async, Delay, Dispatchers};

// Create and await a single async task
$result = Async::new(function () {
    Delay::new(100);
    return 42;
})->await();

// Run in a separate worker process (IO dispatcher)
$io = Async::new(function () {
    return file_get_contents('data.txt');
}, Dispatchers::IO)->await();

Async::awaitAll β€” Concurrent Awaiting

awaitAll() drives multiple async tasks forward simultaneously, returning all results in order. This is significantly more efficient than awaiting sequentially.

use vosaka\foroutines\{Async, Delay};

$asyncA = Async::new(function () {
    Delay::new(500);
    return 42;
});

$asyncB = Async::new(function () {
    Delay::new(800);
    return 'hello';
});

$asyncC = Async::new(function () {
    Delay::new(300);
    return 100;
});

// All three run concurrently β€” total time β‰ˆ 800ms, not 1600ms
[$a, $b, $c] = Async::awaitAll($asyncA, $asyncB, $asyncC);

// Also works with spread operator
$results = Async::awaitAll(...$arrayOfAsyncs);

WithTimeout

use vosaka\foroutines\{WithTimeout, WithTimeoutOrNull, Delay};

// Throws RuntimeException if exceeded
$val = WithTimeout::new(2000, function () {
    Delay::new(1000);
    return 'ok';
});

// Returns null instead of throwing
$val = WithTimeoutOrNull::new(500, function () {
    Delay::new(3000);
    return 'too slow';
});

Job Lifecycle

use vosaka\foroutines\Launch;

$job = Launch::new(function () {
    Delay::new(5000);
    return 'done';
});

$job->invokeOnCompletion(function ($j) {
    var_dump('Job finished: ' . $j->getStatus()->name);
});

$job->cancelAfter(2.0);

Channel

Mode Factory Use Case
In-process Channel::new(capacity) Fibers in the same process
Socket pool (default) Channel::create(capacity) IPC via shared ChannelBrokerPool
Socket per-channel Channel::newSocketInterProcess(name, capacity) Legacy β€” 1 process per channel
File-based Channel::newInterProcess(name, capacity) IPC via temp file + mutex
use vosaka\foroutines\channel\Channel;
use vosaka\foroutines\{RunBlocking, Launch, Dispatchers, Thread};
use function vosaka\foroutines\main;

main(function () {
    $ch = Channel::create(5);   // pool-backed IPC channel

    RunBlocking::new(function () use ($ch) {
        Launch::new(function () use ($ch) {
            $ch->connect();     // reconnect in child process
            $ch->send('from child 1');
            $ch->send('from child 2');
        }, Dispatchers::IO);

        Launch::new(function () use ($ch) {
            var_dump($ch->receive()); // "from child 1"
            var_dump($ch->receive()); // "from child 2"
        });

        $ch->close();
    });
});

Non-blocking operations:

$ok  = $ch->trySend(42);     // false if buffer full
$val = $ch->tryReceive();    // null if buffer empty

Channels utility class:

use vosaka\foroutines\channel\Channels;

$merged  = Channels::merge($ch1, $ch2, $ch3);
$doubled = Channels::map($ch, fn($v) => $v * 2);
$evens   = Channels::filter($ch, fn($v) => $v % 2 === 0);
$first3  = Channels::take($ch, 3);
$zipped  = Channels::zip($ch1, $ch2);
$nums    = Channels::range(1, 100);
$ticks   = Channels::timer(500, maxTicks: 10);

Select

use vosaka\foroutines\channel\Channel;
use vosaka\foroutines\selects\Select;

$ch1 = Channel::new(1);
$ch2 = Channel::new(1);
$ch1->send('from ch1');

$result = (new Select())
    ->onReceive($ch1, fn($v) => "Got: $v")
    ->onReceive($ch2, fn($v) => "Got: $v")
    ->default('nothing ready')
    ->execute();

Flow

use vosaka\foroutines\flow\{Flow, SharedFlow, MutableStateFlow, BackpressureStrategy};

// Cold Flow
Flow::of(1, 2, 3, 4, 5)
    ->filter(fn($v) => $v % 2 === 0)
    ->map(fn($v) => $v * 10)
    ->collect(fn($v) => var_dump($v)); // 20, 40

// SharedFlow with backpressure
$flow = SharedFlow::new(
    replay: 3,
    extraBufferCapacity: 10,
    onBufferOverflow: BackpressureStrategy::DROP_OLDEST,
);

// StateFlow
$state = MutableStateFlow::new(0);
$state->collect(fn($v) => var_dump("State: $v"));
$state->emit(1);

// Cold Flow with buffer operator
Flow::fromArray(range(1, 1000))
    ->filter(fn($v) => $v % 2 === 0)
    ->buffer(capacity: 64, onOverflow: BackpressureStrategy::SUSPEND)
    ->collect(fn($v) => process($v));

AsyncIO β€” Non-blocking Stream I/O

All methods return Deferred β€” a lazy wrapper that executes on ->await():

use vosaka\foroutines\AsyncIO;

$body   = AsyncIO::httpGet('https://example.com')->await();
$data   = AsyncIO::fileGetContents('/path/to/file')->await();
$socket = AsyncIO::tcpConnect('example.com', 80)->await();
$ip     = AsyncIO::dnsResolve('example.com')->await();
Method Returns Description
tcpConnect(host, port)->await() resource Non-blocking TCP connection
tlsConnect(host, port)->await() resource Non-blocking TLS/SSL connection
streamRead(stream, maxBytes)->await() string Read up to N bytes
streamReadAll(stream)->await() string Read until EOF
streamWrite(stream, data)->await() int Write data
httpGet(url)->await() string HTTP GET
httpPost(url, body)->await() string HTTP POST
fileGetContents(path)->await() string Read entire file
filePutContents(path, data)->await() int Write file
dnsResolve(hostname)->await() string Resolve hostname to IP

Mutex

use vosaka\foroutines\sync\Mutex;

Mutex::protect('my-resource', function () {
    file_put_contents('shared.txt', 'safe write');
});

Dispatchers

Dispatcher Description
DEFAULT Runs in the current fiber context (+ AsyncIO for non-blocking streams)
IO Offloads to a worker process via WorkerPool
MAIN Schedules on the main event loop
use vosaka\foroutines\{RunBlocking, Launch, Dispatchers, Thread};

RunBlocking::new(function () {
    Launch::new(fn() => heavy_io_work(), Dispatchers::IO);
});

Thread::await()

While RunBlocking automatically drains all pending tasks before returning, Thread::await() allows you to manually block and drive the event loop until all work (Launch jobs, WorkerPool tasks, and AsyncIO) is finished.

When do you need it?

  • Inside RunBlocking: If you want to ensure all background tasks (like Launch jobs) are completed before proceeding to the next line of code within the same RunBlocking block.
  • Outside RunBlocking: When you are using AsyncMain or main() and have scheduled tasks that need to be completed before the script exits, but you aren't using a blocking runner.
RunBlocking::new(function () {
    Launch::new(fn() => print("A"));
    Thread::await(); // Blocks here until "A" is printed
    print("B");      // Always prints after "A"
});

WorkerPool

A pool of pre-spawned long-lived child processes. On Linux/macOS uses pcntl_fork() + Unix socket pairs; on Windows uses proc_open() + TCP loopback sockets.

use vosaka\foroutines\WorkerPool;

WorkerPool::setPoolSize(8);

$result = WorkerPool::addAsync(function () {
    return 'processed';
})->await();

Task Batching

When many small tasks are submitted, IPC round-trip overhead dominates. Task batching groups multiple tasks into a single message sent to each worker, dramatically reducing round-trips.

batchSize=1 (default):  Parent ──TASK:A──▢ Worker ──RESULT:A──▢ Parent  (1000 round-trips for 1000 tasks)
batchSize=5:            Parent ──BATCH:[A,B,C,D,E]──▢ Worker ──BATCH_RESULTS:[A,B,C,D,E]──▢ Parent  (200 round-trips)
use vosaka\foroutines\WorkerPool;

// Group up to 5 tasks per worker message
WorkerPool::setBatchSize(5);
Batch Size Behavior
1 (default) Original single-task protocol β€” lowest latency per task
5–10 Good balance for many small/fast tasks
20–50 Maximum throughput for trivial tasks

Batching is fully backward compatible β€” when batchSize=1, the pool uses the original TASK:/RESULT: protocol.

Dynamic Pool Sizing

The pool can automatically scale between a minimum and maximum number of workers based on workload pressure.

use vosaka\foroutines\WorkerPool;

WorkerPool::setPoolSize(4);    // initial workers at boot

WorkerPool::setDynamicScaling(
    enabled: true,
    minPoolSize: 2,            // always keep at least 2 workers alive
    maxPoolSize: 8,            // never exceed 8 workers
    idleTimeout: 10.0,         // shut down a worker after 10s idle
    scaleUpCooldown: 0.5,      // wait 0.5s between scale-ups
    scaleDownCooldown: 5.0,    // wait 5s between scale-downs
);

Scale-up: When all workers are busy and tasks are queued, a new worker is spawned (up to maxPoolSize).

Scale-down: When a worker has been idle longer than idleTimeout and the pool exceeds minPoolSize, it is shut down.

Workload spike:    2 workers β†’ 4 β†’ 6 β†’ 8 (max)
Workload drops:    8 workers β†’ 6 β†’ 4 β†’ 2 (min, after idle timeout)

When dynamic scaling is disabled (default), the pool behaves exactly as before β€” a fixed number of workers.

Worker Respawn Backoff

When a worker crashes repetitively, respawning uses exponential backoff (100ms β†’ 200ms β†’ … max 30s) to prevent CPU spin. After 10 consecutive failures, the worker slot is removed (circuit-breaker).

// Customizable
WorkerPoolState::$maxRespawnAttempts = 10;
WorkerPoolState::$respawnBaseDelayMs = 100;

FiberPool

Reusable Fiber instances to reduce allocation overhead. Integrated into Launch, Async, RunBlocking.

use vosaka\foroutines\FiberPool;

// Adjust global pool size
FiberPool::setDefaultSize(20);

// Direct usage (zero-alloc reuse after first run)
$pool = new FiberPool(maxSize: 10);
$result = $pool->run(fn() => heavyComputation());

Actor Model

use vosaka\foroutines\actor\{Actor, Message, ActorSystem};

class GreeterActor extends Actor {
    protected function receive(Message $msg): void {
        echo "Hello, {$msg->payload}!\n";
    }
}

main(function () {
    RunBlocking::new(function () {
        $system = ActorSystem::new()
            ->register(new GreeterActor('greeter'));

        $system->startAll();
        $system->send('greeter', Message::of('greet', 'World'));

        Delay::new(100);
        $system->stopAll();
    });
});

Supervisor Tree

OTP-style supervision with automatic restart on child failure.

use vosaka\foroutines\supervisor\{Supervisor, RestartStrategy};

main(function () {
    RunBlocking::new(function () {
        Supervisor::new(RestartStrategy::ONE_FOR_ONE)
            ->child(fn() => workerA(), 'worker-a')
            ->child(fn() => workerB(), 'worker-b', maxRestarts: 5)
            ->start();
    });
});
Strategy Behavior
ONE_FOR_ONE Restart only the crashed child
ONE_FOR_ALL Restart all children
REST_FOR_ONE Restart crashed child + all started after it

ForkProcess

On Linux/macOS, ForkProcess creates child processes by forking the current process instead of spawning a new interpreter:

Strategy Overhead Closure Serialization
ForkProcess (pcntl_fork) ~1-5ms Not needed (memory copied)
Process (symfony/process) ~50-200ms Required

Selection is automatic β€” Worker uses fork when available, falls back to symfony/process on Windows.

Platform Support

Feature Linux/macOS Windows
Fibers (core) βœ… βœ…
FiberPool βœ… βœ…
AsyncIO (stream_select) βœ… βœ…
Channel (all transports) βœ… βœ…
Actor Model βœ… βœ…
Supervisor Tree βœ… βœ…
WorkerPool (fork mode) βœ… ❌ (uses socket mode)
WorkerPool (socket mode) βœ… βœ…
ForkProcess (pcntl_fork) βœ… ❌ (fallback to symfony/process)
Mutex (file lock) βœ… βœ…
Mutex (semaphore) βœ… (ext-sysvsem) ❌
Mutex (APCu) βœ… (ext-apcu) βœ… (ext-apcu)

Comparison with JavaScript Async

Aspect Node.js VOsaka Foroutines
Runtime libuv event loop (C) PHP Fibers + stream_select
I/O model Non-blocking by default AsyncIO for streams; Dispatchers::IO for blocking APIs
Concurrency Single-threaded + worker threads Single process + child processes (fork/spawn)
Syntax async/await (language-level) Async::new()->await() / Async::awaitAll() (library-level)
Worker pool worker_threads WorkerPool with task batching + dynamic scaling
IPC channels MessagePort Channel::create() (shared TCP pool)
Flow control Node.js Streams BackpressureStrategy (SUSPEND/DROP/ERROR)

License

GNU Lesser General Public License v2.1