gyselroth / mongodb-php-task-scheduler
Asynchronous task scheduler for PHP based on MongoDB
Installs: 2 518
Dependents: 0
Suggesters: 0
Security: 0
Stars: 11
Watchers: 4
Forks: 4
Open Issues: 10
Requires
- php: >=7.1
- ext-mongodb: *
- ext-pcntl: *
- ext-posix: *
- ext-sysvmsg: *
- league/event: ^2.2
- mongodb/mongodb: ^1.4.0
- psr/container: *
- psr/log: 1.*
Requires (Dev)
- friendsofphp/php-cs-fixer: *
- helmich/mongomock: dev-master#6925c67
- phpstan/phpstan: ^0.8.5
- phpunit/phpunit: ^8.5
- dev-master
- v4.x-dev
- v4.0.17
- v4.0.16
- v4.0.16-beta4
- v4.0.16-beta3
- v4.0.16-beta2
- v4.0.16-beta1
- v4.0.15
- v4.0.15-beta1
- v4.0.14
- v4.0.13
- v4.0.12
- v4.0.11
- v4.0.10
- v4.0.9
- v4.0.8
- v4.0.7
- v4.0.6
- v4.0.5
- v4.0.4
- v4.0.3
- v4.0.2
- v4.0.1
- v4.0.0
- v4.0.0-beta18
- v4.0.0-beta17
- v4.0.0-beta16
- v4.0.0-beta15
- v4.0.0-beta14
- v4.0.0-beta13
- v4.0.0-beta12
- v4.0.0-beta11
- v4.0.0-beta10
- v4.0.0-beta9
- v4.0.0-beta8
- v4.0.0-beta7
- v4.0.0-beta6
- v4.0.0-beta5
- v4.0.0-beta4
- v4.0.0-beta3
- v4.0.0-beta2
- v4.0.0-beta1
- v4.0.0-alpha7
- v4.0.0-alpha6
- v4.0.0-alpha5
- v4.0.0-alpha4
- v4.0.0-alpha3
- v4.0.0-alpha2
- v4.0.0-alpha1
- v3.x-dev
- v3.3.0
- v3.3.0-beta.1
- v3.3.0-alpha.3
- v3.3.0-alpha.2
- v3.3.0-alpha.1
- v3.2.2
- v3.2.1
- v3.2.0
- v3.1.0
- v3.0.2
- v3.0.1
- v3.0.0
- v3.0.0-beta8
- v3.0.0-beta7
- v3.0.0-beta6
- v3.0.0-beta5
- v3.0.0-beta4
- v3.0.0-beta3
- v3.0.0-beta2
- v3.0.0-beta1
- v3.0.0-alpha1
- v2.0.5
- v2.0.4
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- 1.x-dev
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
- v0.0.4
- v0.0.3
- v0.0.2
- v0.0.1
- dev-dependabot/composer/symfony/process-5.4.46
- dev-dev
This package is not auto-updated.
Last update: 2024-11-15 09:56:32 UTC
README
Parallel task scheduler for PHP using MongoDB as distribution queue. Execute parallel tasks the easy way. This library has built-in support for clustered systems and multi core cpu. You can start up multiple worker nodes and they will load balance the available jobs with the principal first comes first serves. Each node will also spawn a (dynamically) configurable number of child processes to use all available resources. Moreover it is possible to schedule jobs at certain times, endless intervals as well as rescheduling if jobs fail. This brings a real world implementation for parallel process management to PHP. You are also able to sync child tasks and much more nice stuff.
Features
- Parallel tasks
- Cluster support
- Multi core support
- Load balancing
- Failover
- Scalable
- Sync tasks between each other
- Abort running tasks
- Timeout jobs
- Retry and intervals
- Schedule tasks at specific times
- Signal management
- Intercept events
- Progress support
- Auto detection of orphaned jobs
v4
This is the documentation for the current major version v4. You may check the upgrade guide if you want to upgrade from v3 or even an older version. The documentation for v3 is available here.
Table of Contents
- Features
- Why?
- How does it work (The short way please)?
- Requirements
- Download
- Changelog
- Contribute
- Terms
- Documentation
- Create job
- Initialize scheduler
- Spool job
- Execute jobs
- Manage jobs
- Handling of failed jobs
- Job progess
- Asynchronous programming
- Listen for Events
- Advanced job options
- Add job if not exists
- Advanced worker manager options
- Advanced queue node options
- Using a PSR-11 DIC
- Data Persistency
- Signal handling
- Real world examples
Why?
PHP isn't a multithreaded language and neither can it handle (most) tasks asynchronous. Sure there is pthreads and pcntl but those are only usable in cli mode (or should only be used there). Using this library you are able to write taks which can be executed in parallel by the same or any other system.
How does it work (The short way please)?
A job is scheduled via a task scheduler and gets written into a central message queue (MongoDB). All Queue nodes will get notified in (soft) realtime that a new job is available. The queue node will forward the job via a internal systemv message queue to the worker manager. The worker manager decides if a new worker needs to be spawned. At last, one worker will execute the task according the principle first come first serves. If no free slots are available the job will wait in the queue and get executed as soon as there is a free slot. A job may be rescheduled if it failed. There are lots of more features available, continue reading.
Requirements
- Posix system (Basically every linux)
- MongoDB server >= 3.6
- MongoDB replication set (May also be just a single MongoDB node)
- PHP >= 7.1
- PHP pcntl extension
- PHP posix extension
- PHP mongodb extension
- PHP sysvmsg extension
Note: This library will only work on *nix systems. There is no windows support and there will most likely never be.
Download
The package is available at packagist
To install the package via composer execute:
composer require gyselroth/php-task-scheduler
Changelog
A changelog is available here.
Contribute
We are glad that you would like to contribute to this project. Please follow the given terms.
Terms
You may encounter the follwing terms in this readme or elsewhere:
Install
If your app gets built using a docker container you must use at least the following build options:
FROM php:7.4 RUN docker-php-ext-install pcntl sysvmsg RUN pecl install mongodb && docker-php-ext-enable mongodb pcntl sysvmsg
Documentation
For a better understanding how this library works, we're going to implement a mail job. Of course you can implement any kind of job.
Create job
It is quite easy to create a task, you just need to implement TaskScheduler\JobInterface. In this example we're going to implement a job called MailJob which sends mail using zend-mail.
Note: You can use TaskScheduler\AbstractJob to implement the required default methods by TaskScheduler\JobInterface. The only thing then you need to implement is start() which does the actual job (sending mail).
class MailJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $transport = new Zend\Mail\Transport\Sendmail(); $mail = Message::fromString($this->data); $this->transport->send($mail); return true; } }
Initialize scheduler
You need an instance of a MongoDB\Database and a Psr\Log\LoggerInterface compatible logger to initialize the scheduler.
$mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
Spool job
Now let us create a mail and deploy it to the task scheduler which we have initialized right before:
$mail = new Message(); $mail->setSubject('Hello...'); $mail->setBody('World'); $mail->setFrom('root@localhost', 'root'); $scheduler->addJob(MailJob::class, $mail->toString());
This is the whole magic, our scheduler now got its first job, awesome!
Execute jobs
But now we need to execute those queued jobs.
That's where the queue nodes come into play.
Those nodes listen in (soft) realtime for new jobs and will load balance those jobs.
Create worker factory
You will need to create your own worker node factory in your app namespace which gets called to spawn new child processes. This factory gets called during a new fork is spawned. This means if it gets called, you are in a new process and you will need to bootstrap your application from scratch (Or just the things you need for a worker).
Note: Both a worker manager and a worker itself are spawned in own forks from the queue node process.
Queue node (TaskScheduler\Queue)
|
|-- Worker Manager (TaskScheduler\WorkerManager)
|
|-- Worker (TaskScheduler\Worker)
|-- Worker (TaskScheduler\Worker)
|-- Worker (TaskScheduler\Worker)
|-- ...
For both a worker manager and a worker a new fork means you will need to bootstrap the class from scratch.
Note: Theoretically you can reuse existing connections, objects and so on by setting those via the constructor of your worker factory since the factory gets initialized in main(). But this will likely lead to errors and strange app behaviours and is not supported.
For better understanding: if there is a configuration file where you have stored your configs like a MongoDB uri, in the factory you will need to parse this configuration again and create a new mongodb instance. Or you may be using a PSR-11 container, the container needs to be created from scratch in the factory (A new dependency tree). You may pass an instance of a dic (compatible to Psr\Container\ContainerInterface) as fifth argument to TaskScheduler\Worker (or advanced worker manager options as third argument for a TaskScheduler\WorkerManager (Advanced worker manager options). See more at Using a DIC).
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $logger = new \A\Psr4\Compatible\Logger(); return new TaskScheduler\WorkerManager($this, $logger); } }
Create queue node
Let us write a new queue node. The queue node must be started as a separate process! You should provide an easy way to start such queue nodes, there are multiple ways to achieve this. The easiest way is to just create a single php script which can be started via cli.
$mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $worker_factory = My\App\WorkerFactory(); #An instance of our previously created worker factory $queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);
And then start the magic:
$queue->process();
Note: TaskScheduler\Queue::process() is a blocking call.
Our mail gets sent as soon as a queue node is running and started some workers.
Usually you want those nodes running at all times! They act like invisible execution nodes behind your app.
Manage jobs
Get jobs
You may want to retrieve all scheduled jobs:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->getJobs();
By default you will receive all jobs with the status:
- WAITING
- PROCESSING
- POSTPONED
You may pass an optional query to query specific jobs.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->getJobs([ 'status' => TaskScheduler\JobInterface::STATUS_DONE, '$or' => [ ['class' => 'MyApp\\MyTask1'], ['class' => 'MyApp\\MyTask2'], ] ]); foreach($jobs as $job) { echo $job->getId()." done\n"; }
Cancel job
It is possible to cancel jobs waiting in the queue as well as kill jobs which are actually running.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->cancelJob(MongoDB\BSON\ObjectId $job_id);
If you cancel a job with the status PROCESSING, the job gets killed by force and my corrupt data. You have been warned. (This is similar as the job ends with status TIMEOUT). The only difference is that a timeout job gets rescheduled if it has retry > 0 or has a configured interval. A canceled job will not get rescheduled. You will need to create a new job manually for that.
Modify jobs
It is not possible to modify a scheduled job by design. You need to cancel the job and append a new one.
Note: This is likely to be changed with v4 which will feature persistence for jobs.
Flush queue
While it is not possible to modify/remove jobs it is possible to flush the entire queue.
Note: This is not meant to be called regularly. There may be a case where you need to flush all jobs because of an upgrade. Running queue nodes will detect this and will listen for newly spooled jobs.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->flush();
Handling of failed jobs
A job is acknowledged as failed if the job throws an exception of any kind. If we have a look at our mail job again, but this time it will throw an exception:
class MailJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $transport = new Zend\Mail\Transport\Sendmail(); $mail = Message::fromString($this->data); $this->transport->send($mail); throw new \Exception('i am an exception'); return true; } }
This will lead to a FAILED job as soon as this job gets executed.
Note: It does not matter if you return
true
orfalse
, only an uncaught exception will result to a FAILED job, however you should always returntrue
.
The scheduler has an integrated handling of failed jobs. You may specify to automatically reschedule a job if it failed. The following will reschedule the job up to 5 times (If it ended with status FAILED) with an interval of 30s.
$scheduler->addJob(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_RETRY => 5, TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 30, ]);
This will queue our mail to be executed in one hour from now and it will re-schedule the job up to three times if it fails with an interval of one minute.
Alive ping and Job progress
TaskScheduler has built-in support to update the progress of a job from your job implementation.
By default a job starts at 0
(%) and ends with progress 100
(%). Note that the progress is floating number.
You may increase the progress made within your job.
Important:
Note that by default the scheduler takes a job after 30s as orphaned and reschedules it.
You may change the 30s globally during the Scheduler initialization or keep calling ->updateProgress()
within your task implementation.
Calling updateProgress
with or without a progress acts like a keep alive ping for the scheduler and should be called in your task if it a long running task which contains a loop. If there is no loop you should still call this method in some form of intervals to keep your task alive.
Set a progress as percentage value is not required, if not set the task keeps beeing at 0% and set to 100% if finished.
Let us have a look how this works with a job which copies a file from a to b.
class CopyFileJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $source = $this->data['source']; $dest = $this->data['destination']; $size = filesize($source); $f = fopen($source, 'r'); $t = fopen($dest, 'w'); $read = 0; while($chunk = fread($f, 4096)) { $read += fwrite($t, $chunk); $this->updateProgress($read/$size*100); } } }
The current progress may be available using the process interface:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $p = $scheduler->getJob(MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69')); $p->getProgress();
Note There is a rate limit for the progress updates which is by default 500ms. You may change the rate limit by configuring the
TaskScheduler::OPTION_PROGRESS_RATE_LIMIT
to something else and to 0 if you do not want a rate limit at all.
Asynchronous programming
Have a look at this example:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->addJob(MyTask::class, 'foobar') ->wait();
This will force main() (Your process) to wait until the task MyTask::class
was executed. (Either with status DONE, FAILED, CANCELED, TIMEOUT).
Here is more complex example:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); $scheduler->waitFor($stack); //some other important stuff here
This will wait for all three jobs to be finished before continuing.
Important note:
If you are programming in http mode (incoming http requests) and your app needs to deploy tasks it is good practice not to wait!.
Best practice is to return a HTTP 202 code instead. If the client needs to know the result of those jobs you may return
the process id's and send a 2nd request which then waits and returns the status of those jobs or the client may get its results via a persistent connection or websockets.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = $scheduler->getJobs([ '_id' => ['$in' => $job_ids_from_http_request] ]); $scheduler->waitFor(iterator_to_array($stack)); //do stuff
You may also intercept the wait if any process results in an exception:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); try { $scheduler->waitFor($stack, Scheduler::OPTION_THROW_EXCEPTION); } catch(\Exception $e) { //error handling }
Listen for events
You may bind to the scheduler and listen for any changes and do stuff :)
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->listen(function(TaskScheduler\Process $process) { echo "status of ".$process->getId().' change to '.$process->getStatus(); });
It is also possible to filter such events, this example will only get notified for events occured in a specific job.
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->listen(function(TaskScheduler\Process $process) { echo "status of ".$process->getId().' change to '.$process->getStatus(); }, [ '_id' => new MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69') ]);
Note: listen() is a blocking call, you may exit the listener and continue with main() if you return a boolean
true
in the listener callback.
Bind events
Besides the simple listener method for the Scheduler you may bind event listeneres to your TaskScheduler\Queue
and/or TaskScheduler\Scheduler
.
For example:
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); $scheduler->on('waiting', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is waiting'; })->on('done', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is finished'; })->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is '.$p->getStats(); }); $scheduler->waitFor($stack);
Note: You need to to bind your listeneres before calling
Scheduler::waitFor()
since that is a synchronous blocking call.
You may bind listeneres to the same events in your queue nodes:
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger); $queue->on('timeout', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is timed out'; })->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is '.$p->getStats(); }); $queue->process();
Note: You need to to bind your listeneres before calling
Queue::process()
since that is a synchronous blocking call.
Events
You may bind for the following events:
Custom event emitter
Under the hood both TaskScheduler\Queue
and TaskScheduler\Scheduler
use League\Event\Emitter
as event emitter.
You may create both instances with your own Leage Event emitter instance:
$emitter = new League\Event\Emitter(); //Queue $queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger, $emitter); //Scheduler $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [], $emitter);
Advanced job options
TaskScheduler\Scheduler::addJob()/TaskScheduler\Scheduler::addJobOnce() also accept a third option (options) which let you set more advanced options for the job:
Note: Be careful with timeouts since it will kill your running job by force. You have been warned. You shall always use a native timeout in a function if supported.
Let us add our mail job example again with some custom options:
Note: We are using the OPTION_ constansts here, you may also just use the names documented above.
$mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $mail = new Message(); $mail->setSubject('Hello...'); $mail->setBody('World'); $mail->setFrom('root@localhost', 'root'); $scheduler->addJob(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_AT => time()+3600, TaskScheduler\Scheduler::OPTION_RETRY => 3, TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 60, ]);
This will queue our mail to be executed in one hour from now and it will re-schedule the job up to three times if it fails with an interval of one minute.
Add job if not exists
What you also can do is adding the job only if it has not been queued yet.
Instead using addJob()
you can use addJobOnce()
, the scheduler then verifies if it got the same job already queued. If not, the job gets added.
The scheduler compares the type of job (MailJob
in this case) and the data submitted ($mail->toString()
in this case).
Note: The job gets rescheduled if options get changed.
$scheduler->addJobOnce(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_AT => time()+3600, TaskScheduler\Scheduler::OPTION_RETRY => 3, ]);
By default TaskScheduler\Scheduler::addJobOnce()
does compare the job class, the submitted data and the process status (either PROCESSING, WAITING or POSTPONED).
If you do not want to check the data, you may set TaskScheduler\Scheduler::OPTION_IGNORE_DATA
to true
. This will tell the scheduler to only reschedule the job of the given class
if the data changed. This is quite useful if a job of the given class must only be queued once.
Note: This option does not make sense in the mail example we're using here. A mail can have different content. But it may happen that you have job which clears a temporary storage every 24h:
$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 3600], [ TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true, TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
If max_age changes, the old job gets canceled and a new one gets queued. If TaskScheduler\Scheduler::OPTION_IGNORE_DATA
is not set here we will end up with two jobs of the type MyApp\CleanTemp::class
.
$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 1800], [ TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true, TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
Of course it is also possible to query such job manually, cancel them and reschedule. This will achieve the same as above:
$jobs = $scheduler->getJobs([ 'class' => MyApp\CleanTemp::class, 'status' => ['$lte' => TaskScheduler\JobInterface::STATUS_PROCESSING] ]); foreach($jobs as $job) { $scheduler->cancelJob($job->getId()); } $scheduler->addJob(MyApp\CleanTemp::class, ['max_age' => 1800], [ TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
Advanced scheduler options
You may set those job options as global defaults for the whole scheduler. Custom options and defaults can be set for jobs during initialization or by calling Scheduler::setOptions().
$mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, null, [ TaskScheduler\Scheduler::OPTION_JOB_QUEUE_SIZE => 1000000000, TaskScheduler\Scheduler::OPTION_EVENT_QUEUE_SIZE => 5000000000, TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 3 ]);
You may also change those options afterwards:
$scheduler->setOptions([ TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 2 ]);
Note: Changing default job options will not affect any existing jobs.
Note: It is important to choose a queue size (job_queue_size and event_queue_size) which fits into your setup.
Advanced worker manager options
While you already know, that you need a worker factory to spawn the worker manager, you may specify advanced options for it! Here is our worker factory again, but this time we specify some more options:
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $logger = new \A\Psr4\Compatible\Logger(); return new TaskScheduler\WorkerManager($this, $logger, [ TaskScheduler\WorkerManager::OPTION_MIN_CHILDREN => 10, TaskScheduler\WorkerManager::OPTION_PM => 'static' ]); } }
Worker handling is done by specifying the option pm
while dynamically spawning workers is the default mode.
Process management modes (pm
):
- dynamic (start min_children forks at startup and dynamically create new children if required until max_children is reached)
- static (start min_children nodes, (max_children is ignored))
- ondemand (Do not start any children at startup (min_children is ignored), bootstrap a worker for each job but no more than max_children. After a job is done (Or failed, canceled, timeout), the worker dies.
The default is dynamic
. Usually dynamic
makes sense. You may need static
in a container provisioned world whereas the number of queue nodes is determined
from the number of outstanding jobs. For example you may be using Kubernetes autoscaling.
Note: The number of actual child processes can be higher if jobs are scheduled with the option Scheduler::OPTION_FORCE_SPAWN.
Using a PSR-11 DIC
Optionally one can pass a Psr\Container\ContainerInterface to the worker nodes which then get called to create job instances. You probably already get it, but here is the worker factory again. This time it passes an instance of a PSR-11 container to worker nodes. And if you already using a container it makes perfectly sense to request the manager from it. (Of course you may also request a worker instances from it if your container implementation supports parameters at runtime (The worker id). Note: This will be an incompatible container implementation from the PSR-11 specification.)
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function build(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://localhost:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $dic = new \A\Psr11\Compatible\Dic(); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger, $dic); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $dic = new \A\Psr11\Compatible\Dic(); return $dic->get(TaskScheduler\WorkerManager::class); } }
Signal handling
Terminating queue nodes is possible of course. They even manage to reschedule running jobs. You just need to send a SIGTERM to the process. The queue node then will transmit this the worker manager while the worker manager will send it to all running workers and they will save their state and nicely exit. A worker also saves its state if the worker process directly receives a SIGTERM. If a SIGKILL was used to terminate the queue node (or worker) the state can not be saved and you might get zombie jobs (Jobs with the state PROCESSING but no worker will actually process those jobs). No good sysadmin will terminate running jobs by using SIGKILL, it is not acceptable and may only be used if you know what you are doing.
You should as well avoid using never ending blocking functions in your job, php can't handle signals if you do that.
Real world examples
Add your project here, a PR will be most welcome.