hds-solutions/parallel-sdk

SDK to implement parallel php extension

v2.1.4 2024-03-27 17:42 UTC

This package is auto-updated.

Last update: 2024-11-04 23:42:57 UTC


README

An implementation of krakjoe/parallel PHP extension.

Latest stable version License Total Downloads Monthly Downloads Required PHP version

PHP 8.0 PHP 8.1 PHP 8.2 PHP 8.3 PHP 8.4

This library is designed to work even if the parallel extension isn't available. In that case, the tasks will be executed un sequential order. That allow that your code can be deployed in any environment, and if parallel is enabled you will get the advantage of parallel processing.

Installation

Dependencies

You need these dependencies to execute tasks in parallel.

  • PHP >= 8.0 with ZTS enabled
  • parallel PECL extension (v1.2.5 or higher)

Parallel extension documentation can be found on https://php.net/parallel.

Through composer

composer require hds-solutions/parallel-sdk

Usage

You should set the bootstrap file for the parallel threads. Setting the composer's autoloader is enough.

// check if extension is loaded to allow deploying even in environments where parallel isn't installed
if (extension_loaded('parallel')) {
    // set the path to composer's autoloader
    parallel\bootstrap(__DIR__.'/vendor/autoload.php');
}

Behind the scenes, the parallel extension creates an empty Runtime (thread) where the tasks are executed. Every Runtime is a clean, empty, isolated environment without any preloaded classes, functions, or autoloaders from the parent thread/process. This isolation ensures that each runtime starts with a minimal footprint. See references #1 and #2 for more info.

Then you define a Worker that will process the tasks. There are two options:

  1. Using an anonymous function as a Worker.
  2. Creating a class that extends from ParallelWorker and implements the process() method.

Then you can schedule tasks to run in parallel using Scheduler::runTask() method.

Bootstrap a Laravel app

Since ZTS is only available on the cli, you should set the bootstrap file for parallel threads in the artisan file.

#!/usr/bin/env php
<?php

define('LARAVEL_START', microtime(true));

require __DIR__.'/vendor/autoload.php';

$app = require_once __DIR__.'/bootstrap/app.php';

+ // check if parallel extension is loaded
+ if (extension_loaded('parallel')) {
+     // and register the bootstrap file for the threads
+     parallel\bootstrap(__DIR__.'/parallel.php');
+ }

Then, in the bootstrap file for the parallel threads, you just need to get an instance of the app and bootstrap the Laravel kernel. This way you will have all Laravel service providers registered. bootstrap/parallel.php:

<?php declare(strict_types=1);

require __DIR__.'/../vendor/autoload.php';

$app = require_once __DIR__.'/app.php';

$kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);

// bootstrap the Kernel
$kernel->bootstrap();

Anonymous worker

Defining an anonymous function as a Worker to process the tasks.

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(static function(int $number): int {
    // here you do some work with the received data
    // this portion of code will run on a separated thread
    
    // example process
    $microseconds = random_int(100, 500);
    echo sprintf("AnonymousWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds);
    usleep($microseconds * 1000);
    // end example process
    
    // the data returned will be available later
    return $number;
});

Worker instance

Creating a class that extends from ParallelWorker class. This could be useful for complex processes and to maintain your code clean.

ExampleWorker.php:

use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    protected function process(int $number = 0): int {
        // example process
        $microseconds = random_int(100, 500);
        echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds);
        usleep($microseconds * 1000);
        // end example process

        return $number;
    }

}
use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(ExampleWorker::class);

You can also send parameters to the Worker's constructor.

use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    public function __construct(
        private array $multipliers,
    ) {}

}
use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);

Schedule tasks

After defining a Worker, you can schedule tasks that will run in parallel.

use HDSSolutions\Console\Parallel\Scheduler;

foreach (range(1, 100) as $task_data) {
    try {
        // tasks will start as soon as a thread is available
        Scheduler::runTask($task_data);

    } catch (Throwable) {
        // if no Worker was defined, a RuntimeException will be thrown
        // also, Workers have some limitations, see Reference #3 for more info
    }
}

Check Tasks state

Every task has a state. There is also helper functions to check current Task state:

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

do {
    $all_processed = true;
    foreach (Scheduler::getTasks() as $task) {
        switch (true) {
            case $task->isPending():
                $all_processed = false;
                break;
    
            case $task->isBeingProcessed():
                $all_processed = false;
                break;
    
            case $task->wasProcessed():
                $result = $task->getOutput();
                break;
        }
    }
} while ($all_processed == false);

Wait for tasks completion

Instead of checking every task state, you can wait for all tasks to be processed before continue your code execution.

use HDSSolutions\Console\Parallel\Scheduler;

// This will pause execution until all tasks are processed
Scheduler::awaitTasksCompletion();

You can also specify a time limit for waiting. The process will pause until all tasks are processed or until max time has been reached, whatever comes first.

use HDSSolutions\Console\Parallel\Scheduler;

// Pause until all tasks are processed or until 15 minutes pass
Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));

Get processed tasks result

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

foreach (Scheduler::getTasks() as $task) {
    // you have access to the Worker class that was used to process the task
    $worker = $task->getWorkerClass();
    // and the result of the task processed
    $result = $task->getOutput();
}

Remove pending tasks

You can stop processing queued tasks if your process needs to stop earlier.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will remove tasks from the pending queue
Scheduler::removePendingTasks();

// after cleaning the queue, you should wait for tasks that are currently being processed to finish
Scheduler::awaitTasksCompletion();

$results = [];
$unprocessed_tasks = [];
foreach (Scheduler::getTasks() as $task) {
    if ($task->wasProcessed()) {
        $results[] = $task->getOutput();
    } else {
        // tasks that were not processed, will remain in the Pending state
        $unprocessed_tasks[] = $task;
    }
}

Remove a pending/running task

You can remove a specific task from the processing queue if you need to.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

foreach (Scheduler::getTasks() as $task) {
    // if for some reason you want to remove a task, or just want to free memory when a task finishes
    if (someValidation($task) || $task->wasProcessed()) {
        // this will remove the task from the processing queue
        // IMPORTANT: if the task is already running, it will be stopped
        Scheduler::removeTask($task);
    }
}

Stop processing all tasks immediately

If you need to stop all right away, you can call the Scheduler::stop() method. This will stop processing all tasks immediately.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will stop processing tasks immediately
Scheduler::stop();

// in this state, Tasks should have 3 of the following states
foreach (Scheduler::getTasks() as $task) {
    switch (true) {
        case $task->isPending():
            // Task was never processed
            break;

        case $task->wasProcessed():
            // Task was processed by the Worker
            break;

        case $task->wasCancelled():
            // Task was cancelled while was being processed
            break;
    }
}

Specifying the No. of CPU Cores

You can control the maximum percentage or number of CPU cores to use by calling the following methods:

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::setMaxCpuCountUsage(2);        // Use at max two CPU cores
Scheduler::setMaxCpuPercentageUsage(0.5); // Use at max 50% of the total of CPU cores

ProgressBar

Requirements

  • symfony/console package
  • Enable a ProgressBar for the worker calling the withProgress() method.
use HDSSolutions\Console\Parallel\Scheduler;

$tasks = range(1, 10);

Scheduler::using(ExampleWorker::class)
    ->withProgress(steps: count($tasks));

Usage from Worker

Available methods are:

  • setMessage(string $message)
  • advance(int $steps)
  • setProgress(int $step)
  • display()
  • clear()
use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    protected function process(int $number = 0): int {
        // example process
        $microseconds = random_int(100, 500);
        $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds));
        usleep($microseconds * 1000);
        $this->advance();
        // end example process

        return $number;
    }

}

Example output

 28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms
 [===========================================>------------------------------------]  53%
 elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s
 memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB

References

  1. parallel\bootstrap()
  2. parallel\Runtime
  3. Parallel\Runtime::run() Task Characteristics

Security Vulnerabilities

If you encounter any security-related issues, please feel free to raise a ticket on the issue tracker.

Contributing

Contributions are welcome! If you find any issues or would like to add new features or improvements, please feel free to submit a pull request.

Contributors

Licence

This library is open-source software licensed under the MIT License. Please see the License File for more information.