bingo-soft/concurrent

Concurrent programming constructs for PHP

1.5.0 2024-05-08 10:53 UTC

This package is auto-updated.

Last update: 2024-12-22 20:18:52 UTC


README

Latest Stable Version Minimum PHP Version License: MIT Scrutinizer Code Quality

Concurrent

Concurrent programming constructs for PHP

Installation

Install library, using Composer:

composer require bingo-soft/concurrent

Example 1 (simple pool)

$pool = new DefaultPoolExecutor(3); //only three active processes in the pool
$task1 = new TestTask("task 1");
$task2 = new TestTask("task 2");
$task3 = new TestTask("task 3");
$task4 = new TestTask("task 4");
$task4 = new TestTask("task 5");

$pool->execute($task1);
$pool->execute($task2);
$pool->execute($task3);
$pool->execute($task4); //task for is waiting for an empty slot in the pool
$pool->execute($task5); //task for is waiting for an empty slot in the pool
$pool->shutdown(); //shutdown pool with all processes attached

Example 2 (CountDownLatch)

    //IPC is implemented via server socket on default port 1081? can be changed
    
    $initializer = new ComponentInitializer(4);
        
    // Simulate component initialization
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 1 second\n");
        sleep(1);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 2 seconds\n");
        sleep(2);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 3 seconds\n");
        sleep(3);
    });
    $initializer->initializeComponent(function () {
        fwrite(STDERR, getmypid() . ": Process is sleeping for 4 seconds\n");
        sleep(4);
    });      

    $start = hrtime(true);
    $initializer->awaitInitialization();
    $end = hrtime(true);

    //Less than 5 seconds total
    assert(floor(($end - $start) / 1000000) < 5000);

Example 3 (ForkJoinPool with recursive task)

    //SumTask.php with recursive task

    use Concurrent\ThreadInterface;
    use Concurrent\Task\RecursiveTask;

    class SumTask extends RecursiveTask
    {
        public $start;
        public $end;
        private const THRESHOLD = 10000; // Arbitrary threshold to split tasks
        public $timestamp;

        public function __construct(int $start, int $end)
        {
            parent::__construct();
            $this->start = $start;
            $this->end = $end;
            $this->timestamp = hrtime(true);
        }

        public function castResult($result)
        {
            return intval($result);
        }

        public function __serialize(): array
        {
            return [
                'xid' => $this->xid,
                'start' => $this->start,
                'end' => $this->end,
                'timestamp' => $this->timestamp,
                'result' => self::$result->get($this->xid, 'result')
            ];
        }

        public function __unserialize(array $data): void
        {
            $this->xid = $data['xid'];
            $this->start = $data['start'];
            $this->end = $data['end'];
            $this->timestamp = $data['timestamp'];
        }

        public function compute(ThreadInterface $worker, ...$args)
        {
            $length = $this->end - $this->start;
            if ($length <= self::THRESHOLD) { // Base case
                $sum = 0;
                for ($i = $this->start; $i <= $this->end; $i++) {
                    $sum += $i;
                }            
                return $sum;
            } else { // Recursive case
                $mid = $this->start + ($this->end - $this->start) / 2;
                $leftTask = new SumTask($this->start, $mid);
                $rightTask = new SumTask($mid + 1, $this->end);
                $leftTask->fork($worker); // Fork the first task  
                $firstHalf = $rightTask->compute($worker, ...$args);
                $secondHalf = $leftTask->join($worker, ...$args); // Join results
                    return $firstHalf + $secondHalf;
            }
        }
    }

    // ===== Usage

    //Initialize pool, can reset default port of inter process communication
    $pool = ForkJoinPool::commonPool(/*1081*/);

    //Invoke recursive task on pool
    $result = $pool->invoke(new SumTask(1, 300000));
    assert(45000150000, $result); 

Example 4 (CompletableFuture based on multiprocessing)

    //4.1 
    $inputValue = 4;

    //issue non-blocking calls executed on process pool
    $future = CompletableFuture::supplyAsync(function () use ($inputValue) {
        $res = $inputValue * $inputValue;
        return $res;
    }, $executor)->thenApplyAsync(function ($result) {
        usleep(100000);
        $res = $result * 2;
        return $res;
    }, $executor)->thenApplyAsync(function ($result) {
        $res = $result * 2;
        return $res;
    }, $executor);

    //blocking call to get resulting value
    assert($future->get() == 64);


    //4.2

    $inputValue = 4;

    $future = CompletableFuture::supplyAsync(function () use ($inputValue) {
        $res = $inputValue * $inputValue;
        return $res;
    })->thenApplyAsync(function ($result) {
        usleep(100000);
        $res = $result * 2;
        return $res;
    });
    
    //non-blocking call that runs after first two non-blocking calls - can be used for logging etc.
    $future->thenRunAsync(function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    assert($future->get() == 32);

    //4.3

    $future1 = CompletableFuture::supplyAsync(function () {
        return 2;
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return 3;
    });

    //combine results of two futures and make non-blocking computation
    $resultFuture = $future1->thenCombineAsync($future2, function ($result1, $result2) {
        return $result1 + $result2;
    });

    assert($resultFuture->join() == 5);

    //4.4

    $future1 = CompletableFuture::runAsync(function () {
        // Simulate a task
        usleep(100000);
    });
    $future2 = CompletableFuture::runAsync(function () {
        // Simulate a task
        usleep(200000);
    });

    //non-blocking task running after both futures complete
    $combinedFuture = $future1->runAfterBothAsync($future2, function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    $combinedFuture->join();

    //4.5

    $future1 = CompletableFuture::supplyAsync(function () {
        usleep(100000);
        return 2;
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return 1;
    });
 
    //non-blocking task running after either of two futures
    $resultFuture = $future1->applyToEitherAsync($future2, function ($result) {
        return $result * 2;
    });

    //4.6

    $future1 = CompletableFuture::runAsync(function () {
        // Simulate a task that takes longer
        usleep(200000);
    });
    $future2 = CompletableFuture::runAsync(function () {
        // Simulate a quicker start
        usleep(100000);
    });

    //non-blocking background task running after either of two futures
    $combinedFuture = $future1->runAfterEitherAsync($future2, function () {
        fwrite(STDERR, "Running a follow-up background task...\n");
    });

    //4.7

    //combine results of two non-blocking tasks
    $future = CompletableFuture::supplyAsync(function () {
        return "Hello";
    })->thenApplyAsync(function ($result) {
        return $result . " World";
    })->whenCompleteAsync(function ($result, $exception) {
        if ($exception == null) {
            assert(strpos($result, "World") !== false);
        }
    });

    //4.8

    $future1 = CompletableFuture::supplyAsync(function () {
        return "Hello";
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return "World";
    });

    //future that waits for all provided futures to complete
    $combinedFuture = CompletableFuture::allOf($future1, $future2);
    $combinedFuture->get();

    //4.9

     $future1 = CompletableFuture::supplyAsync(function () {
        //usleep(200000);
        return "Hello";
    });
    $future2 = CompletableFuture::supplyAsync(function () {
        return "World";
    });

    //future that waits any of the provided futures to complete
    $anyOfFuture = CompletableFuture::anyOf($future1, $future2);

Example 5 (ScheduledPoolExecutor, cycling jobs)

use Concurrent\Executor\ScheduledPoolExecutor;

//create pool with 4 workers (processes)
$executor = new ScheduledPoolExecutor(4);

$futures = [];
//15 parallel cycling jobs
for ($i = 1; $i < 15; $i += 1) {
    
    //period in milliseconds
    $period = rand(3, 15);
    $future = $executor->scheduleAtFixedRate(function () use ($period) {
        fwrite(STDERR, getmypid() . ": task executed, period = $period (ms), current time = " . hrtime(true) . " (ns)\n");
    }, 0, $period, TimeUnit::MILLISECONDS);
    $futures[] = $future;
}

//sleep for 10 seconds and cancel execution
sleep(10);
foreach ($futures as $future) {
    $future->cancel(false);
}

Example 6 (ScheduledPoolExecutor, delayed jobs)

use Concurrent\Executor\ScheduledPoolExecutor;

//create pool with 4 workers (processes)
$executor = new ScheduledPoolExecutor(4);

$future1 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 1 executed, delay = 100 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 100, TimeUnit::MILLISECONDS);

$future2 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 2 executed, delay = 100 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 100, TimeUnit::MILLISECONDS);

$future3 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 3 executed, delay = 200 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 200, TimeUnit::MILLISECONDS);

$future4 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 4 executed, delay = 200 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 200, TimeUnit::MILLISECONDS);

$future5 = $executor->schedule(function () {
    fwrite(STDERR, getmypid() . ": delayed task 5 executed, delay = 1000 (ms), current time = " . hrtime(true) . " (ns)\n");
}, 1000, TimeUnit::MILLISECONDS);

Running tests

./vendor/bin/phpunit ./tests

Running docker container with examples

cd example
docker-compose build
docker-compose up

To test core usage, you can edit TestTask - change value 2000 in run method to 8000 and then run container again. When container is running check how cores are used - use top command, then press f and turn on p (Last used Cpu). You will see, that while container is running, different processor cores are used.

Dependencies

The library depends on Swoole extension and on GMP extension - the last one for handling large numbers.