venndev / vosaka-fourotines
Structured async programming for PHP using Fibers, inspired by Kotlin Coroutines. Features AsyncIO (non-blocking streams), ForkProcess (low-overhead child processes), Flow/SharedFlow/StateFlow with backpressure, Channels, Mutex, and cooperative scheduling. Can integrate with VOsaka.
Requires
- php: >=8.1
- ext-fileinfo: *
- ext-shmop: *
- ext-zlib: *
- laravel/serializable-closure: ^2.0
- symfony/process: ^7.3
Suggests
- ext-apcu: Enables APCu-based Mutex locking (in-memory, very fast). Requires APCu extension.
- ext-event: Future: potential integration for epoll/kqueue-based event loop (not yet used).
- ext-pcntl: Enables ForkProcess for low-overhead IO dispatch via pcntl_fork() (~1-5ms vs ~50-200ms). Linux/macOS only.
- ext-posix: Used by ForkProcess for immediate child exit via posix_kill(). Linux/macOS only.
- ext-sysvsem: Enables semaphore-based Mutex locking (faster than file locks). Linux/macOS only.
- dev-main
- 1.4.6
- 1.4.5
- 1.4.4
- 1.4.3
- 1.4.2
- 1.4.1
- 1.4.0
- 1.3.9.1
- 1.3.9
- 1.3.8
- 1.3.7
- 1.3.6
- 1.3.5
- 1.3.4
- 1.3.3
- 1.3.2
- 1.3.1
- 1.3.0
- 1.2.9
- 1.2.8
- 1.2.7
- 1.2.6
- 1.2.5
- 1.2.4
- 1.2.3
- 1.2.2
- 1.2.1
- 1.2.0
- 1.1.9
- 1.1.8
- 1.1.7
- 1.1.6
- 1.1.5
- 1.1.4
- 1.1.3
- 1.1.2
- 1.1.1
- 1.1.0
- 1.0.9.1
- 1.0.9
- 1.0.8
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
This package is auto-updated.
Last update: 2026-03-14 08:17:38 UTC
README
A PHP library for structured asynchronous programming using foroutines (fiber + coroutines), inspired by Kotlin coroutines. This is project with the contribution of a project from php-async
π Documentation
New to VOsaka Foroutines? Check out our Structured Documentation (following the DiΓ‘taxis framework), which includes:
- Tutorials: Step-by-step learning lessons.
- How-to Guides: Task-oriented recipes for common problems.
- Reference: Detailed technical descriptions of the API.
- Explanation: Conceptual overviews and architectural deep-dives.
Architecture
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β main() entry point β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββββββ β
β β RunBlocking β β Launch β β Async β β
β β (drive loop) β β (fire & wait) β β (await result) β β
β ββββββββ¬ββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββββββ β
β β β β β
β βΌ βΌ βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Cooperative Scheduler Loop β β
β β βββββββββββββββββ¬ββββββββββββββββββ¬βββββββββββββββββ β β
β β β AsyncIO β WorkerPool β Launch Queue β β β
β β β pollOnce() β run() β runOnce() β β β
β β β stream_select β child procs β fiber resume β β β
β β βββββββββββββββββ΄ββββββββββββββββββ΄βββββββββββββββββ β β
β β β β
β β FiberPool: reusable Fiber instances (default: 10) β β
β β Idle detection β usleep(500Β΅s) to prevent CPU spin β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Dispatchers β β
β β DEFAULT: fibers in current process (+ AsyncIO streams) β β
β β IO: child process (ForkProcess or symfony/process) β β
β β MAIN: EventLoop (deferred scheduling) β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Channel (4 transports) β β
β β IN-PROCESS: fiber ββ fiber (in-memory array buffer) β β
β β SOCKET POOL: Channel::create() β ChannelBrokerPool β β
β β SOCKET IPC: newSocketInterProcess() β ChannelBroker β β
β β FILE IPC: newInterProcess() β temp file + Mutex β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββ βββββββββββββββ ββββββββββββββββββββββββ β
β β Flow (cold) β β SharedFlow / β β WorkerPool β β
β β + buffer() β β StateFlow β β (task batching + β β
β β operator β β (hot, back- β β dynamic scaling + β β
β β β β pressure) β β respawn backoff) β β
β βββββββββββββββ βββββββββββββββ ββββββββββββββββββββββββ β
β β
β βββββββββββββββ βββββββββββββββ ββββββββββββββββββββββββ β
β β Mutex β β Select β β Job lifecycle β β
β β (multi-proc β β (channel β β (cancel, join, β β
β β file/sem) β β multiplex) β β invokeOnComplete) β β
β βββββββββββββββ βββββββββββββββ ββββββββββββββββββββββββ β
β β
β βββββββββββββββ ββββββββββββββββββββββββββββββββββββββββ β
β β Actor Model β β Supervisor Tree (OTP-style) β β
β β (mailbox + β β ONE_FOR_ONE / ONE_FOR_ALL / β β
β β message) β β REST_FOR_ONE + restart budget β β
β βββββββββββββββ ββββββββββββββββββββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Features
Core β RunBlocking, Launch, Async, Async::awaitAll(), Delay, Repeat, WithTimeout, Job lifecycle
Dispatchers β DEFAULT (fibers + AsyncIO), IO (child process via WorkerPool), MAIN (event loop)
WorkerPool β Pre-spawned long-lived worker processes with task batching, dynamic pool sizing, and respawn backoff
FiberPool β Reusable Fiber instances for scheduler optimization (default: 10, dynamic sizing)
Channel β Four transports: in-process, socket pool (default), socket per-channel, file-based
AsyncIO β Non-blocking stream I/O via stream_select() (TCP, TLS, HTTP, files, DNS)
Flow β Cold Flow, SharedFlow, StateFlow with backpressure (SUSPEND, DROP_OLDEST, DROP_LATEST, ERROR)
Actor Model β Message-passing concurrency with Channel-based mailboxes and ActorSystem registry
Supervisor Tree β OTP-style supervision with ONE_FOR_ONE, ONE_FOR_ALL, REST_FOR_ONE strategies
Sync β Mutex (file, semaphore, APCu), Select for channel multiplexing
Rules
Requirements
- PHP 8.2+
- ext-shmop, ext-fileinfo, ext-zlib
| Optional Extension | Purpose |
|---|---|
| ext-pcntl | Low-overhead IO dispatch via pcntl_fork() (~1-5ms vs ~50-200ms) |
| ext-sysvsem | Semaphore-based Mutex |
| ext-apcu | APCu-based Mutex |
Installation
composer require venndev/vosaka-fourotines
Usage
All entry points must be wrapped in main() or use the #[AsyncMain] attribute:
use function vosaka\foroutines\main; main(function () { // Your async code here });
RunBlocking + Launch
use vosaka\foroutines\{RunBlocking, Launch, Delay, Thread}; use function vosaka\foroutines\main; main(function () { RunBlocking::new(function () { Launch::new(function () { Delay::new(1000); var_dump('Task 1 done'); }); Launch::new(function () { Delay::new(500); var_dump('Task 2 done'); }); }); });
Async / Await
use vosaka\foroutines\{Async, Delay, Dispatchers}; // Create and await a single async task $result = Async::new(function () { Delay::new(100); return 42; })->await(); // Run in a separate worker process (IO dispatcher) $io = Async::new(function () { return file_get_contents('data.txt'); }, Dispatchers::IO)->await();
Async::awaitAll β Concurrent Awaiting
awaitAll() drives multiple async tasks forward simultaneously, returning all results in order. This is significantly more efficient than awaiting sequentially.
use vosaka\foroutines\{Async, Delay}; $asyncA = Async::new(function () { Delay::new(500); return 42; }); $asyncB = Async::new(function () { Delay::new(800); return 'hello'; }); $asyncC = Async::new(function () { Delay::new(300); return 100; }); // All three run concurrently β total time β 800ms, not 1600ms [$a, $b, $c] = Async::awaitAll($asyncA, $asyncB, $asyncC); // Also works with spread operator $results = Async::awaitAll(...$arrayOfAsyncs);
WithTimeout
use vosaka\foroutines\{WithTimeout, WithTimeoutOrNull, Delay}; // Throws RuntimeException if exceeded $val = WithTimeout::new(2000, function () { Delay::new(1000); return 'ok'; }); // Returns null instead of throwing $val = WithTimeoutOrNull::new(500, function () { Delay::new(3000); return 'too slow'; });
Job Lifecycle
use vosaka\foroutines\Launch; $job = Launch::new(function () { Delay::new(5000); return 'done'; }); $job->invokeOnCompletion(function ($j) { var_dump('Job finished: ' . $j->getStatus()->name); }); $job->cancelAfter(2.0);
Channel
| Mode | Factory | Use Case |
|---|---|---|
| In-process | Channel::new(capacity) |
Fibers in the same process |
| Socket pool (default) | Channel::create(capacity) |
IPC via shared ChannelBrokerPool |
| Socket per-channel | Channel::newSocketInterProcess(name, capacity) |
Legacy β 1 process per channel |
| File-based | Channel::newInterProcess(name, capacity) |
IPC via temp file + mutex |
use vosaka\foroutines\channel\Channel; use vosaka\foroutines\{RunBlocking, Launch, Dispatchers, Thread}; use function vosaka\foroutines\main; main(function () { $ch = Channel::create(5); // pool-backed IPC channel RunBlocking::new(function () use ($ch) { Launch::new(function () use ($ch) { $ch->connect(); // reconnect in child process $ch->send('from child 1'); $ch->send('from child 2'); }, Dispatchers::IO); Launch::new(function () use ($ch) { var_dump($ch->receive()); // "from child 1" var_dump($ch->receive()); // "from child 2" }); $ch->close(); }); });
Non-blocking operations:
$ok = $ch->trySend(42); // false if buffer full $val = $ch->tryReceive(); // null if buffer empty
Channels utility class:
use vosaka\foroutines\channel\Channels; $merged = Channels::merge($ch1, $ch2, $ch3); $doubled = Channels::map($ch, fn($v) => $v * 2); $evens = Channels::filter($ch, fn($v) => $v % 2 === 0); $first3 = Channels::take($ch, 3); $zipped = Channels::zip($ch1, $ch2); $nums = Channels::range(1, 100); $ticks = Channels::timer(500, maxTicks: 10);
Select
use vosaka\foroutines\channel\Channel; use vosaka\foroutines\selects\Select; $ch1 = Channel::new(1); $ch2 = Channel::new(1); $ch1->send('from ch1'); $result = (new Select()) ->onReceive($ch1, fn($v) => "Got: $v") ->onReceive($ch2, fn($v) => "Got: $v") ->default('nothing ready') ->execute();
Flow
use vosaka\foroutines\flow\{Flow, SharedFlow, MutableStateFlow, BackpressureStrategy}; // Cold Flow Flow::of(1, 2, 3, 4, 5) ->filter(fn($v) => $v % 2 === 0) ->map(fn($v) => $v * 10) ->collect(fn($v) => var_dump($v)); // 20, 40 // SharedFlow with backpressure $flow = SharedFlow::new( replay: 3, extraBufferCapacity: 10, onBufferOverflow: BackpressureStrategy::DROP_OLDEST, ); // StateFlow $state = MutableStateFlow::new(0); $state->collect(fn($v) => var_dump("State: $v")); $state->emit(1); // Cold Flow with buffer operator Flow::fromArray(range(1, 1000)) ->filter(fn($v) => $v % 2 === 0) ->buffer(capacity: 64, onOverflow: BackpressureStrategy::SUSPEND) ->collect(fn($v) => process($v));
AsyncIO β Non-blocking Stream I/O
All methods return Deferred β a lazy wrapper that executes on ->await():
use vosaka\foroutines\AsyncIO; $body = AsyncIO::httpGet('https://example.com')->await(); $data = AsyncIO::fileGetContents('/path/to/file')->await(); $socket = AsyncIO::tcpConnect('example.com', 80)->await(); $ip = AsyncIO::dnsResolve('example.com')->await();
| Method | Returns | Description |
|---|---|---|
tcpConnect(host, port)->await() |
resource |
Non-blocking TCP connection |
tlsConnect(host, port)->await() |
resource |
Non-blocking TLS/SSL connection |
streamRead(stream, maxBytes)->await() |
string |
Read up to N bytes |
streamReadAll(stream)->await() |
string |
Read until EOF |
streamWrite(stream, data)->await() |
int |
Write data |
httpGet(url)->await() |
string |
HTTP GET |
httpPost(url, body)->await() |
string |
HTTP POST |
fileGetContents(path)->await() |
string |
Read entire file |
filePutContents(path, data)->await() |
int |
Write file |
dnsResolve(hostname)->await() |
string |
Resolve hostname to IP |
Mutex
use vosaka\foroutines\sync\Mutex; Mutex::protect('my-resource', function () { file_put_contents('shared.txt', 'safe write'); });
Dispatchers
| Dispatcher | Description |
|---|---|
DEFAULT |
Runs in the current fiber context (+ AsyncIO for non-blocking streams) |
IO |
Offloads to a worker process via WorkerPool |
MAIN |
Schedules on the main event loop |
use vosaka\foroutines\{RunBlocking, Launch, Dispatchers, Thread}; RunBlocking::new(function () { Launch::new(fn() => heavy_io_work(), Dispatchers::IO); });
Thread::await()
While RunBlocking automatically drains all pending tasks before returning, Thread::await() allows you to manually block and drive the event loop until all work (Launch jobs, WorkerPool tasks, and AsyncIO) is finished.
When do you need it?
- Inside
RunBlocking: If you want to ensure all background tasks (likeLaunchjobs) are completed before proceeding to the next line of code within the sameRunBlockingblock. - Outside
RunBlocking: When you are usingAsyncMainormain()and have scheduled tasks that need to be completed before the script exits, but you aren't using a blocking runner.
RunBlocking::new(function () { Launch::new(fn() => print("A")); Thread::await(); // Blocks here until "A" is printed print("B"); // Always prints after "A" });
WorkerPool
A pool of pre-spawned long-lived child processes. On Linux/macOS uses pcntl_fork() + Unix socket pairs; on Windows uses proc_open() + TCP loopback sockets.
use vosaka\foroutines\WorkerPool; WorkerPool::setPoolSize(8); $result = WorkerPool::addAsync(function () { return 'processed'; })->await();
Task Batching
When many small tasks are submitted, IPC round-trip overhead dominates. Task batching groups multiple tasks into a single message sent to each worker, dramatically reducing round-trips.
batchSize=1 (default): Parent ββTASK:AβββΆ Worker ββRESULT:AβββΆ Parent (1000 round-trips for 1000 tasks)
batchSize=5: Parent ββBATCH:[A,B,C,D,E]βββΆ Worker ββBATCH_RESULTS:[A,B,C,D,E]βββΆ Parent (200 round-trips)
use vosaka\foroutines\WorkerPool; // Group up to 5 tasks per worker message WorkerPool::setBatchSize(5);
| Batch Size | Behavior |
|---|---|
| 1 (default) | Original single-task protocol β lowest latency per task |
| 5β10 | Good balance for many small/fast tasks |
| 20β50 | Maximum throughput for trivial tasks |
Batching is fully backward compatible β when batchSize=1, the pool uses the original TASK:/RESULT: protocol.
Dynamic Pool Sizing
The pool can automatically scale between a minimum and maximum number of workers based on workload pressure.
use vosaka\foroutines\WorkerPool; WorkerPool::setPoolSize(4); // initial workers at boot WorkerPool::setDynamicScaling( enabled: true, minPoolSize: 2, // always keep at least 2 workers alive maxPoolSize: 8, // never exceed 8 workers idleTimeout: 10.0, // shut down a worker after 10s idle scaleUpCooldown: 0.5, // wait 0.5s between scale-ups scaleDownCooldown: 5.0, // wait 5s between scale-downs );
Scale-up: When all workers are busy and tasks are queued, a new worker is spawned (up to maxPoolSize).
Scale-down: When a worker has been idle longer than idleTimeout and the pool exceeds minPoolSize, it is shut down.
Workload spike: 2 workers β 4 β 6 β 8 (max)
Workload drops: 8 workers β 6 β 4 β 2 (min, after idle timeout)
When dynamic scaling is disabled (default), the pool behaves exactly as before β a fixed number of workers.
Worker Respawn Backoff
When a worker crashes repetitively, respawning uses exponential backoff (100ms β 200ms β β¦ max 30s) to prevent CPU spin. After 10 consecutive failures, the worker slot is removed (circuit-breaker).
// Customizable WorkerPoolState::$maxRespawnAttempts = 10; WorkerPoolState::$respawnBaseDelayMs = 100;
FiberPool
Reusable Fiber instances to reduce allocation overhead. Integrated into Launch, Async, RunBlocking.
use vosaka\foroutines\FiberPool; // Adjust global pool size FiberPool::setDefaultSize(20); // Direct usage (zero-alloc reuse after first run) $pool = new FiberPool(maxSize: 10); $result = $pool->run(fn() => heavyComputation());
Actor Model
use vosaka\foroutines\actor\{Actor, Message, ActorSystem}; class GreeterActor extends Actor { protected function receive(Message $msg): void { echo "Hello, {$msg->payload}!\n"; } } main(function () { RunBlocking::new(function () { $system = ActorSystem::new() ->register(new GreeterActor('greeter')); $system->startAll(); $system->send('greeter', Message::of('greet', 'World')); Delay::new(100); $system->stopAll(); }); });
Supervisor Tree
OTP-style supervision with automatic restart on child failure.
use vosaka\foroutines\supervisor\{Supervisor, RestartStrategy}; main(function () { RunBlocking::new(function () { Supervisor::new(RestartStrategy::ONE_FOR_ONE) ->child(fn() => workerA(), 'worker-a') ->child(fn() => workerB(), 'worker-b', maxRestarts: 5) ->start(); }); });
| Strategy | Behavior |
|---|---|
ONE_FOR_ONE |
Restart only the crashed child |
ONE_FOR_ALL |
Restart all children |
REST_FOR_ONE |
Restart crashed child + all started after it |
ForkProcess
On Linux/macOS, ForkProcess creates child processes by forking the current process instead of spawning a new interpreter:
| Strategy | Overhead | Closure Serialization |
|---|---|---|
ForkProcess (pcntl_fork) |
~1-5ms | Not needed (memory copied) |
Process (symfony/process) |
~50-200ms | Required |
Selection is automatic β Worker uses fork when available, falls back to symfony/process on Windows.
Platform Support
| Feature | Linux/macOS | Windows |
|---|---|---|
| Fibers (core) | β | β |
| FiberPool | β | β |
| AsyncIO (stream_select) | β | β |
| Channel (all transports) | β | β |
| Actor Model | β | β |
| Supervisor Tree | β | β |
| WorkerPool (fork mode) | β | β (uses socket mode) |
| WorkerPool (socket mode) | β | β |
| ForkProcess (pcntl_fork) | β | β (fallback to symfony/process) |
| Mutex (file lock) | β | β |
| Mutex (semaphore) | β (ext-sysvsem) | β |
| Mutex (APCu) | β (ext-apcu) | β (ext-apcu) |
Comparison with JavaScript Async
| Aspect | Node.js | VOsaka Foroutines |
|---|---|---|
| Runtime | libuv event loop (C) | PHP Fibers + stream_select |
| I/O model | Non-blocking by default | AsyncIO for streams; Dispatchers::IO for blocking APIs |
| Concurrency | Single-threaded + worker threads | Single process + child processes (fork/spawn) |
| Syntax | async/await (language-level) |
Async::new()->await() / Async::awaitAll() (library-level) |
| Worker pool | worker_threads |
WorkerPool with task batching + dynamic scaling |
| IPC channels | MessagePort |
Channel::create() (shared TCP pool) |
| Flow control | Node.js Streams | BackpressureStrategy (SUSPEND/DROP/ERROR) |
License
GNU Lesser General Public License v2.1
