uppes/parallel

An asynchronous and parallel PHP process API.

1.0.11 2019-02-14 20:03 UTC

README

Build statusBuild StatuscodecovCodacy Badge

Parallel

An Asynchronous Parallel PHP process manager API.

This library will use PCNTL extension if available. Otherwise, will use default polling in our event loop implementation.

This works under Windows OS, which differs in it's fork of spatie/async . In that it allows running of different processes in parallel on Windows without any additional software.

This package is an complete rewrite of spatie/async. The old package following there implementation, but with windows support can be found here.

This package is part of an bigger library in development uppes/promiseplus, uppes/coroutine, and should be used when wanting non-blocking with an function normally blocking.

Installation

You can install the package via composer:

composer require uppes/parallel

Usage

include 'vendor/autoload.php';

use Async\Parallel\Parallel;

$parallel = new Parallel();

foreach ($things as $thing) {
        // the second argument is optional, it sets The maximum amount of time a process may take to finish in seconds.
    $parallel->add(function () use ($thing) {
        // Do a thing
    }, $optional)->then(function ($output) {
        // Handle success
    })->catch(function (\Throwable $exception) {
        // Handle exception
    });
}

$parallel->wait();

Event hooks

When creating asynchronous processes, you'll get an instance of ProcessInterface returned. You can add the following event hooks on a process.

$parallel
    ->add(function () {
        // the second argument is optional, it sets The maximum amount of time a process may take to finish in seconds. Defaults 300.
    }, int $timeout = 300)
    ->then(function ($output) {
        // On success, `$output` is returned by the process or callable you passed to the queue.
    })
    ->catch(function ($exception) {
        // When an exception is thrown from within a process, it's caught and passed here.
    })
;

Error handling

If an Exception or Error is thrown from within a child process, it can be caught per process by specifying a callback in the ->catch() method.

$parallel
    ->add(function () {
        // ...
    })
    ->catch(function ($exception) {
        // Handle the thrown exception for this child process.
    })
;

If there's no error handler added, the error will be thrown in the parent process when calling Parallel::await() or $parallel->wait().

If the child process would unexpectedly stop without throwing an Throwable, the output written to stderr will be wrapped and thrown as Async\Processor\ProcessorError in the parent process.

Working with tasks

Besides using closures, you can also work with a Task. A Task is useful in situations where you need more setup work in the child process. Because a child process is always bootstrapped from nothing, chances are you'll want to initialize eg. the dependency container before executing the task. The Task class makes this easier to do.

use Async\Parallel\Task;

class MyTask extends Task
{
    public function configure()
    {
        // Setup eg. dependency container, load config,...
    }

    public function run()
    {
        // Do the real work here.
    }
}

// Add the task
$parallel->add(new MyTask());

Simple tasks

If you want to encapsulate the logic of your task, but don't want to create a full blown Task object, you may also pass an invokable object to the Parallel.

class InvokableClass
{
    // ...

    public function __invoke()
    {
        // ...
    }
}

$parallel->add(new InvokableClass(/* ... */));

Parallel pool configuration

You're free to create as many parallel process pools as you want, each parallel pool has its own queue of processes it will handle.

A parallel pool is configurable by the developer:

use Async\Parallel\Parallel;

$parallel = (new Parallel())

// The maximum amount of processes which can run simultaneously.
    ->concurrency(20)

// Configure how long the loop should sleep before re-checking the process statuses in milliseconds.
    ->sleepTime(50000);

The Parallel class has a static method isPcntl you can call to check whether your platform is able to run asynchronous processes.

Behind the curtains

When using this package, you're probably wondering what's happening underneath the surface.

We're using the symfony/process component to create and manage child processes in PHP. By creating child processes on the fly, we're able to execute PHP scripts in parallel. This parallelism can improve performance significantly when dealing with multiple Synchronous I/O tasks, which don't really need to wait for each other. By giving these tasks a separate process to run on, the underlying operating system can take care of running them in parallel.

There's a caveat when dynamically spawning processes: you need to make sure that there won't be too many processes at once, or the application might crash. The Parallel class provided by this package takes care of handling as many processes as you want by scheduling and running them when it's possible.

That's the part that Parallel::async() or $parallel->add() does. Now let's look at what Parallel::await() or $parallel->wait() does.

When multiple processes are spawned, each can have a separate time to completion. One process might eg. have to wait for a HTTP call, while the other has to process large amounts of data. Sometimes you also have points in your code which have to wait until the result of a process is returned.

This is why we have to wait at a certain point in time: for all processes on a parallel pool to finish, so we can be sure it's safe to continue without accidentally killing the child processes which aren't done yet.

Waiting for all processes is done by using a while loop, which will wait until all processes are finished. Determining when a process is finished is done by using a listener on the SIGCHLD signal. This signal is emitted when a child process is finished by the OS kernel. As of PHP 7.1, there's much better support for listening and handling signals, making this approach more performant than eg. using process forks or sockets for communication. You can read more about it here.

On windows, an process status check is performed on each event loop tick.

When a process is finished, its success event is triggered, which you can hook into with the ->then() function. Likewise, when a process fails or times out, the loop will update that process' status and move on. When all processes are finished, the while loop will see that there's nothing more to wait for, and stop. This is the moment your parent process can continue to execute.

Comparison to other libraries

Repo spatie/async, has written a blog post containing more information about use cases for there package, as well as making comparisons to other asynchronous PHP libraries like ReactPHP and Amp: http://stitcher.io/blog/asynchronous-php.

Testing

composer test

License

The MIT License (MIT). Please see License File for more information.