kage3f/rux-concorrency

A lightweight, native PHP concorrency library using Fibers

Installs: 17

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 0

Forks: 0

Open Issues: 0

pkg:composer/kage3f/rux-concorrency

v1.4.2 2025-12-23 00:04 UTC

This package is auto-updated.

Last update: 2025-12-23 00:05:30 UTC


README

Rux Logo

Rux Concurrency

Latest Stable Version License

A lightweight, zero-dependency native PHP concurrency library built on top of Fibers (PHP 8.1+). It provides a simple async/await implementation for cooperative multitasking.

Features

  • 🚀 Cooperative Multitasking: Run multiple I/O-bound tasks concurrently.
  • 🛠 Simple API: Familiar async, await, and wait functions.
  • 📦 Zero Dependencies: Pure PHP, no extensions like parallel or swoole required.
  • 🔌 Event Loop: Built-in scheduler with support for Timers and non-blocking I/O (Streams).
  • 🌐 Robust HTTP Client: High-level get() and get_json() with built-in support for:
    • SSL/TLS: Secure connections out of the box.
    • Redirects: Automatically follows 301, 302, 307, and 308 redirects.
    • Compression: Automatic GZIP decompression.
    • Chunked Encoding: Handles chunked transfer encoding seamlessly.
    • Validation: Throws detailed exceptions for non-2xx status codes or JSON failures.

Installation

Install the package via Composer:

composer require kage3f/rux-concorrency

API Documentation

This library exposes several helper functions in the Rux namespace.

Rux\async(callable $callback): Task

Starts a new asynchronous task.

  • What it does: Creates a new Task instance, adds it to the Scheduler, and starts its execution immediately until it hits a suspension point.
  • Returns: A Task instance.

Rux\await(Task $task): mixed

Waits for a task to complete.

  • What it does: Suspends the current fiber until the provided task is finished.
  • Returns: The task's return value. Rethrows any exception occurred inside the task.

Rux\wait(): void

Runs the task scheduler.

  • What it does: Starts the main loop that processes all tasks. This function is blocking and returns when all tasks are done.

Rux\sleep(int $milliseconds): void

Pauses execution without blocking the process.

  • What it does: Suspends the current fiber for a set period, allowing other tasks to run.

Rux\get(string $url): string

Performs a non-blocking GET request.

  • Features: Handles SSL, redirects, GZIP, and chunked encoding.
  • Throws: Exception on connection failure or non-2xx status codes.

Rux\get_json(string $url): array

Performs a GET request and decodes the JSON response.

  • Throws: Exception if the response is not a valid JSON or the request fails.

Rux\parallel_each(array $items, callable $callback, int $concurrency = 10): void

Processes an array in parallel.

  • What it does: Splits the array into chunks and runs each chunk in a separate asynchronous task. Useful for bulk database inserts or processing large datasets.

Rux\parallel_map(array $items, callable $callback, int $concurrency = 10): array

Maps an array in parallel and returns the results.

  • What it does: Similar to parallel_each, but collects and returns the results from all tasks.

Usage Examples

Basic Concurrency

use function Rux\async;
use function Rux\sleep;
use function Rux\wait;

async(function() {
    echo "Task 1: Starting...\n";
    sleep(1000); // Non-blocking sleep
    echo "Task 1: Finished!\n";
});

async(function() {
    echo "Task 2: Starting...\n";
    sleep(500);
    echo "Task 2: Finished!\n";
});

wait(); // Execute the scheduler

Parallel Processing (Database/Bulk)

use function Rux\async;
use function Rux\parallel_each;
use function Rux\wait;

async(function() use ($largeDataset) {
    // Processes the dataset using 10 concurrent workers
    parallel_each($largeDataset, function($chunk) {
        $db = getDbConnection();
        foreach ($chunk as $item) {
            $db->prepare("INSERT INTO table ...")->execute($item);
            \Rux\sleep(0); // Optional: yield to other tasks
        }
    }, concurrency: 10);
});

wait();

Async/Await Pattern

use function Rux\async;
use function Rux\await;
use function Rux\wait;

async(function() {
    $task1 = async(fn() => "Data from API 1");
    $task2 = async(fn() => "Data from API 2");

    // Suspends this fiber until the tasks are finished
    $res1 = await($task1);
    $res2 = await($task2);

    echo "$res1, $res2\n";
});

wait();

High-Level HTTP Requests

You don't need to deal with streams manually. The library provides simple helpers:

use function Rux\async;
use function Rux\get_json;
use function Rux\wait;

async(function() {
    try {
        $data = get_json('https://api.example.com/data');
        print_r($data);
    } catch (\Exception $e) {
        echo "Error: " . $e->getMessage();
    }
});

wait();

Real-World Examples

Check the examples/ directory for professional use cases:

  1. Dashboard Aggregator: Simulates fetching data from multiple microservices concurrently.
  2. Concurrent HTTP: Demonstrates fetching multiple external URLs using the high-level get() function.

How it works

RuxConcorrency uses PHP Fibers to pause and resume execution. When a task performs a non-blocking operation (like Rux\sleep or a non-blocking stream read), it suspends itself, allowing the Scheduler to run other pending tasks.

License

MIT License. Please see the LICENSE file for more information.