hds-solutions / parallel-sdk
SDK to implement parallel php extension
Requires
- php: >=8.0
Requires (Dev)
- phpunit/phpunit: ^9.6
- roave/security-advisories: dev-latest
- symfony/console: ^6.0
Suggests
- ext-parallel: Allows to run multi-threaded processes
- symfony/console: Allows usage of a shared ProgressBar between the Workers
README
An implementation of krakjoe/parallel PHP extension.
This library is designed to work even if the parallel
extension isn't available. In that case, the tasks will be executed un sequential order.
That allow that your code can be deployed in any environment, and if parallel
is enabled you will get the advantage of parallel processing.
Installation
Dependencies
You need these dependencies to execute tasks in parallel.
- PHP >= 8.0 with ZTS enabled
- parallel PECL extension (v1.2.5 or higher)
Parallel extension documentation can be found on https://php.net/parallel.
Through composer
composer require hds-solutions/parallel-sdk
Usage
You should set the bootstrap file for the parallel threads. Setting the composer's autoloader is enough.
// check if extension is loaded to allow deploying even in environments where parallel isn't installed if (extension_loaded('parallel')) { // set the path to composer's autoloader parallel\bootstrap(__DIR__.'/vendor/autoload.php'); }
Behind the scenes, the parallel extension creates an empty Runtime (thread) where the tasks are executed. Every Runtime is a clean, empty, isolated environment without any preloaded classes, functions, or autoloaders from the parent thread/process. This isolation ensures that each runtime starts with a minimal footprint. See references #1 and #2 for more info.
Then you define a Worker
that will process the tasks. There are two options:
- Using an anonymous function as a
Worker
. - Creating a class that extends from
ParallelWorker
and implements theprocess()
method.
Then you can schedule tasks to run in parallel using Scheduler::runTask()
method.
Bootstrap a Laravel app
Since ZTS is only available on the cli, you should set the bootstrap file for parallel threads in the artisan
file.
#!/usr/bin/env php <?php define('LARAVEL_START', microtime(true)); require __DIR__.'/vendor/autoload.php'; $app = require_once __DIR__.'/bootstrap/app.php'; + // check if parallel extension is loaded + if (extension_loaded('parallel')) { + // and register the bootstrap file for the threads + parallel\bootstrap(__DIR__.'/parallel.php'); + }
Then, in the bootstrap file for the parallel threads, you just need to get an instance of the app and bootstrap the Laravel kernel. This way you will have all Laravel service providers registered.
bootstrap/parallel.php
:
<?php declare(strict_types=1); require __DIR__.'/../vendor/autoload.php'; $app = require_once __DIR__.'/app.php'; $kernel = $app->make(Illuminate\Contracts\Console\Kernel::class); // bootstrap the Kernel $kernel->bootstrap();
Anonymous worker
Defining an anonymous function as a Worker
to process the tasks.
use HDSSolutions\Console\Parallel\Scheduler; Scheduler::using(static function(int $number): int { // here you do some work with the received data // this portion of code will run on a separated thread // example process $microseconds = random_int(100, 500); echo sprintf("AnonymousWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); // end example process // the data returned will be available later return $number; });
Worker instance
Creating a class that extends from ParallelWorker
class. This could be useful for complex processes and to maintain your code clean.
ExampleWorker.php
:
use HDSSolutions\Console\Parallel\ParallelWorker; final class ExampleWorker extends ParallelWorker { protected function process(int $number = 0): int { // example process $microseconds = random_int(100, 500); echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); // end example process return $number; } }
use HDSSolutions\Console\Parallel\Scheduler; Scheduler::using(ExampleWorker::class);
You can also send parameters to the Worker's constructor.
use HDSSolutions\Console\Parallel\ParallelWorker; final class ExampleWorker extends ParallelWorker { public function __construct( private array $multipliers, ) {} }
use HDSSolutions\Console\Parallel\Scheduler; Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);
Schedule tasks
After defining a Worker, you can schedule tasks that will run in parallel.
use HDSSolutions\Console\Parallel\Scheduler; foreach (range(1, 100) as $task_data) { try { // tasks will start as soon as a thread is available Scheduler::runTask($task_data); } catch (Throwable) { // if no Worker was defined, a RuntimeException will be thrown // also, Workers have some limitations, see Reference #3 for more info } }
Check Tasks state
Every task has a state. There is also helper functions to check current Task state:
use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\Task; do { $all_processed = true; foreach (Scheduler::getTasks() as $task) { switch (true) { case $task->isPending(): $all_processed = false; break; case $task->isBeingProcessed(): $all_processed = false; break; case $task->wasProcessed(): $result = $task->getOutput(); break; } } } while ($all_processed == false);
Wait for tasks completion
Instead of checking every task state, you can wait for all tasks to be processed before continue your code execution.
use HDSSolutions\Console\Parallel\Scheduler; // This will pause execution until all tasks are processed Scheduler::awaitTasksCompletion();
You can also specify a time limit for waiting. The process will pause until all tasks are processed or until max time has been reached, whatever comes first.
use HDSSolutions\Console\Parallel\Scheduler; // Pause until all tasks are processed or until 15 minutes pass Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));
Get processed tasks result
use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\Task; foreach (Scheduler::getTasks() as $task) { // you have access to the Worker class that was used to process the task $worker = $task->getWorkerClass(); // and the result of the task processed $result = $task->getOutput(); }
Remove pending tasks
You can stop processing queued tasks if your process needs to stop earlier.
use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\Task; // this will remove tasks from the pending queue Scheduler::removePendingTasks(); // after cleaning the queue, you should wait for tasks that are currently being processed to finish Scheduler::awaitTasksCompletion(); $results = []; $unprocessed_tasks = []; foreach (Scheduler::getTasks() as $task) { if ($task->wasProcessed()) { $results[] = $task->getOutput(); } else { // tasks that were not processed, will remain in the Pending state $unprocessed_tasks[] = $task; } }
Remove a pending/running task
You can remove a specific task from the processing queue if you need to.
use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\Task; foreach (Scheduler::getTasks() as $task) { // if for some reason you want to remove a task, or just want to free memory when a task finishes if (someValidation($task) || $task->wasProcessed()) { // this will remove the task from the processing queue // IMPORTANT: if the task is already running, it will be stopped Scheduler::removeTask($task); } }
Stop processing all tasks immediately
If you need to stop all right away, you can call the Scheduler::stop()
method. This will stop processing all tasks immediately.
use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\Task; // this will stop processing tasks immediately Scheduler::stop(); // in this state, Tasks should have 3 of the following states foreach (Scheduler::getTasks() as $task) { switch (true) { case $task->isPending(): // Task was never processed break; case $task->wasProcessed(): // Task was processed by the Worker break; case $task->wasCancelled(): // Task was cancelled while was being processed break; } }
Specifying the No. of CPU Cores
You can control the maximum percentage or number of CPU cores to use by calling the following methods:
use HDSSolutions\Console\Parallel\Scheduler; Scheduler::setMaxCpuCountUsage(2); // Use at max two CPU cores Scheduler::setMaxCpuPercentageUsage(0.5); // Use at max 50% of the total of CPU cores
ProgressBar
Requirements
symfony/console
package- Enable a ProgressBar for the worker calling the
withProgress()
method.
use HDSSolutions\Console\Parallel\Scheduler; $tasks = range(1, 10); Scheduler::using(ExampleWorker::class) ->withProgress(steps: count($tasks));
Usage from Worker
Available methods are:
setMessage(string $message)
advance(int $steps)
setProgress(int $step)
display()
clear()
use HDSSolutions\Console\Parallel\ParallelWorker; final class ExampleWorker extends ParallelWorker { protected function process(int $number = 0): int { // example process $microseconds = random_int(100, 500); $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); usleep($microseconds * 1000); $this->advance(); // end example process return $number; } }
Example output
28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms [===========================================>------------------------------------] 53% elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB
References
Security Vulnerabilities
If you encounter any security-related issues, please feel free to raise a ticket on the issue tracker.
Contributing
Contributions are welcome! If you find any issues or would like to add new features or improvements, please feel free to submit a pull request.
Contributors
Licence
This library is open-source software licensed under the MIT License. Please see the License File for more information.