kynx / swoole-processor
Execute tasks in a multi-process environment
Requires
- php: ~8.3.0 || ~8.4.0
- ext-sockets: *
- ext-swoole: *
Requires (Dev)
- laminas/laminas-coding-standard: ^3.0
- phpunit/phpunit: ^10.5.41
- psalm/plugin-phpunit: 0.19.2
- squizlabs/php_codesniffer: ^3.11
- swoole/ide-helper: ^5.1
- vimeo/psalm: ^6.0
README
Run CLI batch jobs in coroutines across multiple processes.
Based on Swoole, the processor is ideal for running a large number of IO-intensive operations.
Install
composer require kynx/swoole-processor
Use
Create a JobProvider class that returns Job objects containing the payload you want to process:
use Generator;
use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\JobProviderInterface;

class JobProvider implements JobProviderInterface
{
    public function getIterator(): Generator
    {
        foreach (range(0, 10) as $payload) {
            // NOTE: your payload must be serializable!!
            yield new Job($payload);
        }
    }
}
Create a Worker class to handle the jobs:
use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\WorkerInterface;

class Worker implements WorkerInterface
{
    public function init(int $workerId): void
    {
        // perform any initialisation needed
    }

    public function run(Job $job): Job
    {
        // do work on payload
        echo "Got payload: " . $job->getPayload() . "\n";

        // return job with result of process
        return $job->withResult("Processed: " . $job->getPayload());
    }
}
If required, create a CompletionHandler to do any post-processing:
use Kynx\Swoole\Processor\Job\CompletionHandlerInterface;
use Kynx\Swoole\Processor\Job\Job;

class CompletionHandler implements CompletionHandlerInterface
{
    public function complete(Job $job): void
    {
        // mark job as complete
        echo "Completed: " . $job->getResult() . "\n";
    }
}
Create a script to run the jobs in parallel:
use Kynx\Swoole\Processor\Config;
use Kynx\Swoole\Processor\Processor;

$processor = new Processor(
    new Config(),
    new JobProvider(),
    new Worker(),
    new CompletionHandler()
);

// this will block until all jobs are processed
$result = $processor->run();

return $result === true ? 0 : 1;
If you don't need to handle completion, omit the fourth parameter.
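For instance, a run script that skips post-processing only wires up the provider and worker:

use Kynx\Swoole\Processor\Config;
use Kynx\Swoole\Processor\Processor;

// omit the CompletionHandler if you do not need post-processing
$processor = new Processor(
    new Config(),
    new JobProvider(),
    new Worker()
);

return $processor->run() === true ? 0 : 1;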
How it works
The JobProvider runs in its own process, and the resulting jobs are fired at a Swoole server listening on a local socket. Both it and the CompletionHandler are blocking. If the JobProvider returns a large number of jobs or performs time-consuming operations, return a Generator so jobs can be started as soon as possible. The CompletionHandler should be fast.
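As a sketch of that advice, a provider that yields jobs lazily from a file lets the first jobs be dispatched before the whole input has been read; the class name and file path here are only illustrative:

use Generator;
use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\JobProviderInterface;

final class CsvJobProvider implements JobProviderInterface
{
    public function __construct(private readonly string $path)
    {
    }

    public function getIterator(): Generator
    {
        $handle = fopen($this->path, 'r');

        // yield one job per line instead of building the whole list up front
        while (($line = fgets($handle)) !== false) {
            yield new Job(trim($line));
        }

        fclose($handle);
    }
}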
The server spawns a number of processes to handle the jobs - by default one less than the number of CPU cores detected.
Each process runs your Worker inside a coroutine, ensuring IO operations do not block. Because of this, make sure all IO uses a Connection Pool, covered in more detail below.
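For example, a Worker that queries a database could create a pool in init() and borrow a connection per job. This sketch assumes Swoole's bundled Swoole\Database\PDOPool is available; the connection details, pool size and query are placeholders:

use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\WorkerInterface;
use Swoole\Database\PDOConfig;
use Swoole\Database\PDOPool;

final class DbWorker implements WorkerInterface
{
    private PDOPool $pool;

    public function init(int $workerId): void
    {
        // one pool per worker process; connections are shared between coroutines
        $this->pool = new PDOPool(
            (new PDOConfig())
                ->withHost('127.0.0.1')
                ->withDbName('app')
                ->withUsername('app')
                ->withPassword('secret'),
            8
        );
    }

    public function run(Job $job): Job
    {
        $pdo = $this->pool->get();

        try {
            $statement = $pdo->prepare('SELECT name FROM things WHERE id = ?');
            $statement->execute([$job->getPayload()]);

            return $job->withResult($statement->fetchColumn());
        } finally {
            // always return the connection so other coroutines can use it
            $this->pool->put($pdo);
        }
    }
}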
You can control the number of simultaneous jobs the processor will spawn by setting the concurrency parameter on the Config you pass to the constructor. It defaults to 10, with a maximum of workers x maxCoroutines.
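Assuming Config exposes concurrency as a constructor argument (an assumption here; check the actual Config class for its real signature), tuning it might look like:

use Kynx\Swoole\Processor\Config;
use Kynx\Swoole\Processor\Processor;

// ASSUMPTION: verify that Config actually accepts a named "concurrency" argument before relying on this
$config = new Config(concurrency: 50);

$processor = new Processor($config, new JobProvider(), new Worker());
$processor->run();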
Caveats
- Job payloads and results must be serializable and, by default, together should be less than 2M when serialized.
- A Worker should be stateless. If you need to share data between workers, use a Table (see the sketch after this list).
- Uncaught exceptions will crash the process, causing it to re-spawn. For this reason your Worker::run() is called inside a try ... catch block. However the exception is discarded: if you care about where your program is going wrong, catch all exceptions inside your Worker and handle them yourself (see the example after this list).
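As a sketch of the shared-state caveat, a Swoole\Table is created and sized before the worker processes fork, after which any process can update it atomically; the table size and column here are illustrative:

use Swoole\Table;

// create and size the table *before* the processor forks its worker processes
$table = new Table(1024);
$table->column('processed', Table::TYPE_INT);
$table->create();
$table->set('stats', ['processed' => 0]);

// later, inside any Worker, increments are atomic across processes
$table->incr('stats', 'processed');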
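And to avoid losing errors to the discarded exception, a defensive Worker::run() can catch everything itself; doRiskyWork() below is a hypothetical stand-in for your own processing:

use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\WorkerInterface;
use Throwable;

final class SafeWorker implements WorkerInterface
{
    public function init(int $workerId): void
    {
    }

    public function run(Job $job): Job
    {
        try {
            // doRiskyWork() is hypothetical - substitute your own processing
            return $job->withResult(doRiskyWork($job->getPayload()));
        } catch (Throwable $exception) {
            // log it ourselves: the processor would otherwise discard the exception
            error_log($exception->getMessage());

            return $job->withResult('failed: ' . $exception->getMessage());
        }
    }
}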