kage3f / rux-concorrency
A lightweight, native PHP concurrency library using Fibers
pkg:composer/kage3f/rux-concorrency
Requires
- php: >=8.1
README
Rux Concurrency
A lightweight, zero-dependency native PHP concurrency library built on top of Fibers (PHP 8.1+). It provides a simple async/await implementation for cooperative multitasking.
Features
- 🚀 Cooperative Multitasking: Run multiple I/O-bound tasks concurrently.
- 🛠 Simple API: Familiar `async`, `await`, and `wait` functions.
- 📦 Zero Dependencies: Pure PHP, no extensions like `parallel` or `swoole` required.
- 🔌 Event Loop: Built-in scheduler with support for Timers and non-blocking I/O (Streams).
- 🌐 Robust HTTP Client: High-level `get()` and `get_json()` with built-in support for:
  - SSL/TLS: Secure connections out of the box.
  - Redirects: Automatically follows 301, 302, 307, and 308 redirects.
  - Compression: Automatic GZIP decompression.
  - Chunked Encoding: Handles chunked transfer encoding seamlessly.
  - Validation: Throws detailed exceptions for non-2xx status codes or JSON failures.
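To make the chunked-encoding item more concrete, here is a minimal sketch of how a chunked HTTP body can be decoded in pure PHP. The function name `decode_chunked` is hypothetical and this is not the library's code; it only illustrates the wire format (a hexadecimal size line, the data, a trailing CRLF, and a zero-size chunk at the end).

```php
<?php
// Sketch only: decode_chunked() is a hypothetical helper, not part of Rux.
function decode_chunked(string $body): string
{
    $decoded = '';
    $offset = 0;
    $length = strlen($body);

    while (true) {
        // Each chunk starts with a size line terminated by CRLF.
        $lineEnd = $offset <= $length ? strpos($body, "\r\n", $offset) : false;
        if ($lineEnd === false) {
            throw new RuntimeException('Malformed or truncated chunked body');
        }

        // The size is hexadecimal and may be followed by ";extension" parameters.
        $sizeLine = substr($body, $offset, $lineEnd - $offset);
        $size = (int) hexdec(trim(explode(';', $sizeLine, 2)[0]));

        if ($size === 0) {
            break; // last chunk; optional trailers are ignored in this sketch
        }

        $decoded .= substr($body, $lineEnd + 2, $size);
        $offset = $lineEnd + 2 + $size + 2; // skip the data and its trailing CRLF
    }

    return $decoded;
}

$raw = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n";
echo decode_chunked($raw), "\n"; // prints "Wikipedia"
```

A real client would decode incrementally as data arrives on the socket; decoding a complete string keeps the sketch short.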
Installation
Install the package via Composer:
composer require kage3f/rux-concorrency
API Documentation
This library exposes several helper functions in the Rux namespace.
Rux\async(callable $callback): Task
Starts a new asynchronous task.
- What it does: Creates a new `Task` instance, adds it to the Scheduler, and starts its execution immediately until it hits a suspension point.
- Returns: A `Task` instance.
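The "starts immediately until it hits a suspension point" behaviour mirrors how a native PHP Fiber works. The following sketch uses only the built-in `Fiber` class (no Rux code) to show the order in which the task body and the caller run; how `Task` wraps this internally is not shown here.

```php
<?php
// Native Fiber behaviour only; no Rux classes are used in this sketch.
$events = [];

$fiber = new Fiber(function () use (&$events): void {
    $events[] = 'task: started';   // runs as soon as start() is called
    Fiber::suspend();              // suspension point: control returns to the caller
    $events[] = 'task: resumed';
});

$fiber->start();                   // executes the body up to Fiber::suspend()
$events[] = 'caller: back in control';
$fiber->resume();                  // runs the rest of the body

echo implode("\n", $events), "\n";
```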
Rux\await(Task $task): mixed
Waits for a task to complete.
- What it does: Suspends the current fiber until the provided task is finished.
- Returns: The task's return value. Rethrows any exception thrown inside the task.
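One way to picture these semantics is the sketch below, written with native Fibers. Every name in it (`MiniTask`, `MiniLoop`, `mini_async`, `mini_await`, `mini_wait`) is hypothetical and chosen for illustration; it is an assumption about the general technique, not the library's implementation.

```php
<?php
// Sketch only: all names here are hypothetical, not Rux classes or functions.
final class MiniTask
{
    public bool $done = false;
    public mixed $result = null;
    public ?Throwable $error = null;
}

final class MiniLoop
{
    /** @var Fiber[] fibers waiting for their next turn */
    public static array $ready = [];
}

function mini_async(callable $callback): MiniTask
{
    $task = new MiniTask();
    $fiber = new Fiber(function () use ($callback, $task): void {
        try {
            $task->result = $callback();
        } catch (Throwable $e) {
            $task->error = $e;      // stored for whoever awaits the task
        }
        $task->done = true;
    });
    $fiber->start();                // run until the first suspension point
    if (!$fiber->isTerminated()) {
        MiniLoop::$ready[] = $fiber;
    }
    return $task;
}

function mini_await(MiniTask $task): mixed
{
    // Must be called from inside a fiber, otherwise Fiber::suspend() throws.
    while (!$task->done) {
        Fiber::suspend();           // let the loop run other fibers
    }
    if ($task->error !== null) {
        throw $task->error;         // rethrow inside the awaiting fiber
    }
    return $task->result;
}

function mini_wait(): void
{
    while (MiniLoop::$ready) {
        $fiber = array_shift(MiniLoop::$ready);
        $fiber->resume();
        if (!$fiber->isTerminated()) {
            MiniLoop::$ready[] = $fiber;
        }
    }
}

$seen = [];
mini_async(function () use (&$seen): void {
    $ok = mini_async(function () { Fiber::suspend(); return 'value'; });
    $bad = mini_async(function () { Fiber::suspend(); throw new RuntimeException('boom'); });

    $seen[] = mini_await($ok);
    try {
        mini_await($bad);
    } catch (RuntimeException $e) {
        $seen[] = 'caught ' . $e->getMessage();
    }
});
mini_wait();

echo implode(', ', $seen), "\n"; // prints "value, caught boom"
```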
Rux\wait(): void
Runs the task scheduler.
- What it does: Starts the main loop that processes all tasks. This function is blocking and returns when all tasks are done.
Rux\sleep(int $milliseconds): void
Pauses execution without blocking the process.
- What it does: Suspends the current fiber for a set period, allowing other tasks to run.
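A cooperative sleep can be sketched with native Fibers as follows: the fiber suspends and hands its wake-up time to a small loop, which resumes it once that time has passed. The names `cooperative_sleep` and `run_with_timers` are hypothetical, and the library's own timer handling may differ.

```php
<?php
// Sketch only: cooperative_sleep() and run_with_timers() are hypothetical names.
function cooperative_sleep(int $milliseconds): void
{
    $wakeAt = hrtime(true) + $milliseconds * 1_000_000; // hrtime(true) is in nanoseconds
    Fiber::suspend($wakeAt);                            // report the wake-up time to the loop
}

/** @param callable[] $tasks */
function run_with_timers(array $tasks): void
{
    $sleeping = []; // list of [wakeAt, Fiber] pairs

    foreach ($tasks as $task) {
        $fiber = new Fiber($task);
        $wakeAt = $fiber->start();
        if (!$fiber->isTerminated()) {
            $sleeping[] = [$wakeAt, $fiber];
        }
    }

    while ($sleeping) {
        // Wake the earliest timers first.
        usort($sleeping, fn (array $a, array $b) => $a[0] <=> $b[0]);
        $now = hrtime(true);

        foreach ($sleeping as $i => [$wakeAt, $fiber]) {
            if ($wakeAt > $now) {
                continue;           // not due yet
            }
            unset($sleeping[$i]);
            $next = $fiber->resume();
            if (!$fiber->isTerminated()) {
                $sleeping[] = [$next, $fiber];
            }
        }

        usleep(500);                // brief pause so the loop does not spin at full CPU
    }
}

$order = [];
run_with_timers([
    function () use (&$order): void { cooperative_sleep(40); $order[] = 'slow'; },
    function () use (&$order): void { cooperative_sleep(10); $order[] = 'fast'; },
]);

echo implode(', ', $order), "\n"; // prints "fast, slow"
```

Both tasks finish in roughly 40 ms in total rather than 50 ms, because the two waits overlap.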
Rux\get(string $url): string
Performs a non-blocking GET request.
- Features: Handles SSL, redirects, GZIP, and chunked encoding.
- Throws: `Exception` on connection failure or non-2xx status codes.
Rux\get_json(string $url): array
Performs a GET request and decodes the JSON response.
- Throws: `Exception` if the response is not valid JSON or the request fails.
Rux\parallel_each(array $items, callable $callback, int $concurrency = 10): void
Processes an array concurrently, one chunk per task.
- What it does: Splits the array into chunks and runs each chunk in a separate asynchronous task. Useful for bulk database inserts or processing large datasets.
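The chunk-per-task idea can be sketched with native Fibers. This assumes the array is divided into at most `$concurrency` chunks of near-equal size; the source does not state the exact splitting rule, and `each_chunk_concurrently` is a hypothetical name, not the library function.

```php
<?php
// Sketch only: each_chunk_concurrently() is hypothetical, and the chunk-size
// rule (ceil(count / concurrency)) is an assumption.
function each_chunk_concurrently(array $items, callable $callback, int $concurrency = 10): void
{
    if ($items === []) {
        return;
    }

    $size = (int) ceil(count($items) / max(1, $concurrency));
    $fibers = [];

    // One fiber per chunk; each runs until its first suspension point.
    foreach (array_chunk($items, $size) as $chunk) {
        $fiber = new Fiber($callback);
        $fiber->start($chunk);
        $fibers[] = $fiber;
    }

    // Round-robin over the fibers until all of them have finished.
    while ($fibers) {
        foreach ($fibers as $i => $fiber) {
            if ($fiber->isTerminated()) {
                unset($fibers[$i]);
                continue;
            }
            $fiber->resume();
        }
    }
}

$trace = [];
each_chunk_concurrently(range(1, 6), function (array $chunk) use (&$trace): void {
    foreach ($chunk as $item) {
        $trace[] = $item;
        Fiber::suspend();           // yield so the other chunks can make progress
    }
}, concurrency: 3);

echo implode(' ', $trace), "\n";    // prints "1 3 5 2 4 6"
```

The interleaved output shows that the three chunks advance in turns. Without the suspension inside the callback, each chunk would run to completion before the next one started.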
Rux\parallel_map(array $items, callable $callback, int $concurrency = 10): array
Maps an array concurrently and returns the results.
- What it does: Similar to `parallel_each`, but collects and returns the results from all tasks.
Usage Examples
Basic Concurrency
```php
use function Rux\async;
use function Rux\sleep;
use function Rux\wait;

async(function () {
    echo "Task 1: Starting...\n";
    sleep(1000); // Non-blocking sleep
    echo "Task 1: Finished!\n";
});

async(function () {
    echo "Task 2: Starting...\n";
    sleep(500);
    echo "Task 2: Finished!\n";
});

wait(); // Execute the scheduler
```
Parallel Processing (Database/Bulk)
```php
use function Rux\async;
use function Rux\parallel_each;
use function Rux\wait;

async(function () use ($largeDataset) {
    // Processes the dataset using 10 concurrent workers
    parallel_each($largeDataset, function ($chunk) {
        $db = getDbConnection();
        foreach ($chunk as $item) {
            $db->prepare("INSERT INTO table ...")->execute($item);
            \Rux\sleep(0); // Optional: yield to other tasks
        }
    }, concurrency: 10);
});

wait();
```
Async/Await Pattern
```php
use function Rux\async;
use function Rux\await;
use function Rux\wait;

async(function () {
    $task1 = async(fn() => "Data from API 1");
    $task2 = async(fn() => "Data from API 2");

    // Suspends this fiber until the tasks are finished
    $res1 = await($task1);
    $res2 = await($task2);

    echo "$res1, $res2\n";
});

wait();
```
High-Level HTTP Requests
You don't need to deal with streams manually. The library provides simple helpers:
```php
use function Rux\async;
use function Rux\get_json;
use function Rux\wait;

async(function () {
    try {
        $data = get_json('https://api.example.com/data');
        print_r($data);
    } catch (\Exception $e) {
        echo "Error: " . $e->getMessage();
    }
});

wait();
```
Real-World Examples
Check the examples/ directory for professional use cases:
- Dashboard Aggregator: Simulates fetching data from multiple microservices concurrently.
- Concurrent HTTP: Demonstrates fetching multiple external URLs using the high-level `get()` function.
How it works
Rux Concurrency uses PHP Fibers to pause and resume execution. When a task performs a non-blocking operation (like `Rux\sleep` or a non-blocking stream read), it suspends itself, allowing the Scheduler to run other pending tasks.
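The stream case can be illustrated with a small sketch using native Fibers and `stream_select()`. It is an assumption about the general technique, not the library's Scheduler: a consumer fiber suspends while it waits for data, a producer runs in the meantime, and the consumer is resumed once the stream is readable. The sketch uses a Unix socket pair, so it assumes a Unix-like system.

```php
<?php
// Sketch only: shows the suspend-until-readable idea, not Rux internals.
// Assumes a Unix-like system (STREAM_PF_UNIX socket pair).
[$reader, $writer] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
stream_set_blocking($reader, false);

$log = [];

$consumer = new Fiber(function () use ($reader, &$log): void {
    $log[] = 'consumer: waiting for data';
    Fiber::suspend($reader);        // "wake me when this stream is readable"
    $log[] = 'consumer: got ' . fread($reader, 1024);
});

$producer = new Fiber(function () use ($writer, &$log): void {
    $log[] = 'producer: writing';
    fwrite($writer, 'hello');
});

$waitingOn = $consumer->start();    // returns the stream the consumer suspended on
$producer->start();                 // runs while the consumer is parked

// The loop's job: find out which parked streams are ready, then resume their fibers.
$read = [$waitingOn];
$write = null;
$except = null;
if (stream_select($read, $write, $except, 1) > 0) {
    $consumer->resume();
}

fclose($reader);
fclose($writer);

echo implode("\n", $log), "\n";
```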
License
MIT License. Please see the LICENSE file for more information.