activecollab / jobsqueue
Simple jobs queue
Installs: 60 282
Dependents: 1
Suggesters: 1
Security: 0
Stars: 5
Watchers: 4
Forks: 2
Open Issues: 1
Requires
- php: >=8.0
- ext-json: *
- activecollab/databaseconnection: ^5.0
- doctrine/inflector: ^2.0
- psr/log: ^1.0
Requires (Dev)
- ext-mysqli: *
- friendsofphp/php-cs-fixer: ^2.0
- monolog/monolog: ~1.0
- phpunit/phpunit: ^9.0
- pimple/pimple: ~3.0
Suggests
- ext-mysqli: *
- ext-posix: *
- symfony/console: Use Symfony console to easily create CLI jobs consumer
- dev-master
- 5.0.1
- 5.0.0
- 4.1.1
- 4.1.0
- 4.0.0
- 3.0.3
- 3.0.2
- 3.0.1
- 3.0.0
- 2.1.1
- v2.0.x-dev
- 2.0.1
- 2.0.0
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.12
- 1.0.11
- 1.0.10
- 1.0.9
- 1.0.8
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- 0.1.7
- 0.1.6
- 0.1.5
- 0.1.4
- 0.1.3
- 0.1.2
- 0.1.1
- 0.1.0
- dev-dependabot/composer/symfony/process-5.4.46
- dev-set-locale-for-escapeshellargs
- dev-array-in-argument
- dev-increase-delay-limit
- dev-fix-get-channels
- dev-release/0.1
This package is auto-updated.
Last update: 2025-01-06 20:46:59 UTC
README
Reason for existence: it's light, with very few dependencies. It can be used with cron + database powered queues for people who are not allowed to run a proper messaging or job management server. Or you can execute jobs through proper messaging or job manager with it.
Installation
To install it, use Composer:
{ "require": { "activecollab/jobsqueue": "^1.0.0" } }
Basic Usage
This library uses three elements:
- Dispatcher is here to dispatch jobs,
- Queues make sure that jobs can be queued,
- Jobs perform the actual work.
As a demonstration, we'll create a simple job that increments a number:
<?php use ActiveCollab\JobsQueue\Jobs\Job; class Inc extends Job { /** * Increment a number */ public function execute(): mixed { return $this->getData()['number'] + 1; } }
Tip: To fail an attempt, just throw an exception from within execute()
method.
Now, lets create a dispatcher instance that manages one MySQL powered queue:
<?php use ActiveCollab\JobsQueue\JobsDispatcher; use ActiveCollab\JobsQueue\Queue\MySqlQueue; use mysqli; use RuntimeException; $database_link = new MySQLi('localhost', 'root', '', 'activecollab_jobs_queue_test'); if ($database_link->connect_error) { throw new RuntimeException('Failed to connect to database. MySQL said: ' . $database_link->connect_error); } $queue = new MySqlQueue($database_link); // Not required but gives you flexibility with failure handling $queue->onJobFailure(function(Job $job, Exception $reason) { throw new Exception('Job ' . get_class($job) . ' failed', 0, $reason); }); $dispatcher = new JobsDispatcher($queue);
Lets add a job to the queue:
$dispatcher->dispatch(new Inc([ 'number' => 123 ]));
Code that executes jobs from the queue will get this job as next available job:
$next_in_line = $dispatcher->getQueue()->nextInLine(); $dispatcher->getQueue()->execute($next_in_line);
To run a job and wait for the result, use execute()
instead of dispatch()
:
$result = $dispatcher->execute(new Inc([ 'number' => 123 ]));
When called like this, jobs are executed right away. execute()
suppresses exceptions by default, so you should set $silent
to false
if you want exceptions to bubble out:
$result = $dispatcher->execute(new Inc([ ‘number’ => 123 ]), false);
Job Properties
When constructing a new Job
instance, you can set an array of job data, as well as following job properties:
priority
- Value between 0 and 4294967295 that determins how important the job is (a job with higher value has higher priority). Default is 0 (job is not a priority),attempts
- Number of attempts before job is considered to fail and is removed from the queue. Value can be between 1 and 256. Default is 1 (try once and fail if it does not go well),delay
- Number of seconds to wait before first execution (in case whenfirst_attempt_delay
is not set), as well as retries if the job fails and needs to be retried. Value can be between 1 and 7776000 (90 days). Default is 0 (no delay),first_attempt_delay
- Number of seconds to wait before the first job execution.
$job = new Inc([ 'number' => 123, 'priority' => Job::HAS_HIGHEST_PRIORITY, 'attempts' => 5, 'delay' => 5, 'first_attempt_delay' => 1 ]);
Accessing Properties in a Job
Once in an job's execute()
method, you can access job properties using getData()
method:
public function execute(): mixed { print_r($this->getData()); // Print all job properties print $this->getData('number') . "\n"; // Print only number }
Batches
Jobs can be added to the queue in batches. Once in a batch, job queue will execute them as any other job, but you will be able to track progress of batch:
$batch = $dispatcher->batch('Testing batch', function(BatchInterface &$batch) { for ($i = 1; $i <= 1000; $i++) { $batch->dispatch(new Inc(['number' => $i])); } }); sleep(1); print $batch->countJobs() . " jobs in a batch\n"; print $batch->countPendingJobs() . " batch jobs still pending for execution\n"; print $batch->countFailedJobs() . " batch jobs have failed to complete\n"; print $batch->countCompletedJobs() . " batch jobs were completed successfully\n";
All batches have name, so they are easy to find using command line tools.
Channels
In some situations, it is useful to have multiple channels and consumer listening on them. For example, you can have a consumer on a mailing server listening only on mail
channel, but not listening on other channels (which jobs it is not suited to perform).
By default, all jobs go to main channel (QueueInterface::MAIN_CHANNEL
), but channel can be specified when job is added to the queue:
$dispatcher->registerChannels('new'); $dispatcher->execute(new Inc(['number' => 123]), 'new');
By default, dispatcher will throw an exception if you try to add a job to an unknown channel. This can be turned off:
$dispatcher->exceptionOnUnregisteredChannel(false); // This job will end up in the 'main' channel, but exception will not be thrown $dispatcher->execute(new Inc(['number' => 123]), 'unknown channel');
Background Process
Jobs can report that they launched a process:
class ListAndForget extends Job { /** * Report that we launched a background process */ public function execute(): mixed { $output = []; exec("nohup ls -la > /dev/null 2>&1 & echo $!", $output); $pid = (integer) $output[1]; if ($pid > 0) { $this->reportBackgroundProcess($pid); } } }
When they do, queue clean up and maintenance routines will not consider this job as stuck as long as process with the given PID is running. When process is done (we can't find it), job is considered to be done.
Information about jobs that launched processes can be found using QueueInterface::getBackgroundProcesses()
method. This method returns an array, where each record in an array contains a job ID, job type and process ID:
print_r($dispatcher->getQueue()->getBackgroundProcesses());
will output something like this:
Array
(
[0] => Array
(
[id] => 1
[type] => ActiveCollab\JobsQueue\Test\Jobs\ProcessLauncher
[process_id] => 12345
)
)
Note: Process reporting and watching is not supported on Windows systems at the moment.
Executing CLI commands from jobs
If you need a job to simply run CLI command, there is a handy trait ExecuteCliCommand
. You can pass a command call signature, arguments and environment variables to the command.
Example usage:
class RunCommand extends Job { use ExecuteCliCommand; public function __construct(array $data = null) { $this->validateCommand($data); parent::__construct($data); } public function execute(): mixed { $data['command'] = 'php foobar.php' $data['command_environement_variables'] = ['foo' => 'bar']; $data['command_arguments'] = ['--baz' => 1]; return $this->prepareCommandFromData($this->getData()); } }
will produce a CLI command:
export FOO='bar' && php foobar.php --baz=1
Version 5.0 upgrade
Backward compatibility notes:
- Check all calls to
JobsDispatcher::batch()
method, and make sure that callbacks do not expect$batch
to be passed on by reference. - Remove
ActiveCollab\JobsQueue\DispatcherInterface
andActiveCollab\JobsQueue\Dispatcher
if used in client code.
To do
- Add logging to all relevant methods in MySQL queue
- Implement job quarantine