
1.1.2 2020-09-01 02:34 UTC

This package is auto-updated.

Last update: 2024-09-13 05:29:37 UTC


Parallel worker pool uses the PHP parallel extension to provide a simple interface for dealing with parallelization of tasks.


The WorkerPool requires an implementation of the WorkFactoryInterface which is responsible for creating the consumer and producer closures. A producer closure must return a Generator.

Composer installation

composer require hdvianna/parallel-workerpool

Runing with Docker

docker-compose up

Docker compose builds an environment with the needed extensions installed and create a bind mount to the current directory.


In this example 10 workers will sleep for n milliseconds, each time they consume the work generated by the WorkFactory.

use hdvianna\Concurrent\WorkFactoryInterface;
use hdvianna\Concurrent\WorkerPool;

(new WorkerPool(new class implements WorkFactoryInterface {
    public function createWorkGeneratorClosure(): \Closure
        return function () {
            for ($i = 0; $i < 100; $i++) {
                $work = new \stdClass();
                $work->time = mt_rand(300, 1000);
                $work->id = $i;
                yield $work;

    public function createWorkConsumerClosure(): \Closure
        return function($work) {
            printf("[$work->id]: Sleeping for %d milliseconds ...%s", $work->time, PHP_EOL);
            usleep($work->time * 1000);
            printf("[$work->id]: Woke up after %d milliseconds ...%s", $work->time, PHP_EOL);

}, 10))->run();

Synchronizing data

Data can be synchronized by using lock and unlock closures sent to the worker functions. The shared data are received from the $lock closure and sent to the $unlock closure. The last value sent can be get invoking the WorkerPool::lastValue()

use hdvianna\Concurrent\WorkFactoryInterface;
use hdvianna\Concurrent\WorkerPool;

$sharedData = 700;
$works = 1000;

$pool = new WorkerPool((new class ($sharedData, $works) implements WorkFactoryInterface {

     * @var int
    private $sharedData;

     * @var int
    private $works;

     *  constructor.
     * @param int $sharedData
     * @param int $works
    public function __construct($sharedData, $works)
        $this->works = $works;
        $this->sharedData = $sharedData;

    public function createWorkGeneratorClosure(): \Closure
        $workers = $this->works;
        return function () use ($workers) {
            for ($i = 0; $i < $workers; $i++) {
                $work = new \stdClass();
                $work->value = 1;
                yield $work;

    public function createWorkConsumerClosure(): \Closure
        $initialValue = $this->sharedData;
        //Use the $lock and $unlock closures to synchronize data 
        return function ($work, $lock, $unlock) use ($initialValue) {
            /*Synchronize the data. Will block and wait for data. 
            $lock will return the last value*/
            $shared = $lock();            
            if (!isset($shared)) {
                //Data was not initialized 
                $shared = $initialValue;
            $shared += $work->value;
            //Unlocks sending the new data.

}), 10);
//Get the last value sent to the unlock closure
$result = $pool->lastValue();
echo("\$result equals to \$works + \$sharedData?" . PHP_EOL);
echo("($result equals to $works + $sharedData?)" . PHP_EOL);
echo(assert($result === ($works + $sharedData)) ? "Yes!": "No =(").PHP_EOL;