n0nag0n / simple-job-queue
A simple library for interfacing with other job queue providers that gives you plenty of flexibility. Supports MySQL, PostgreSQL, SQLite, and Beanstalkd.
Requires
- php: ^7.4 || ^8.0
- ext-pdo: *
Requires (Dev)
- ext-pdo_sqlite: *
- phpunit/phpunit: ^9.6
- rregeer/phpunit-coverage-check: ^0.3.1
Suggests
- pda/pheanstalk: ^4.0 for Beanstalkd queue support
Last update: 2026-06-14 01:06:31 UTC
README
I wanted/needed a simple job queue that could be used on a database, but also used with other adapters like beanstalkd/redis/etc if needed. Didn't really see a good option for a standalone job queue for a database.
Install
composer require n0nag0n/simple-job-queue
Requirements: PHP 7.4 or higher.
Note on Beanstalkd: The pda/pheanstalk package is only needed if you use the Beanstalkd adapter. It is listed under suggest in composer.json (not a hard requirement), so pure database users are not forced to install it.
Usage
In order for this to work, you need a way to add jobs to the queue and a way to process the jobs (a worker). Below are examples of how to add a job to the queue and how to process the job.
Adding a new job
MySQL
<?php
use n0nag0n\Job_Queue;

// default is mysql based job queue
$Job_Queue = new Job_Queue('mysql', [
    'mysql' => [
        'table_name' => 'new_table_name', // default is job_queue_jobs
        'use_compression' => false // default is true to use COMPRESS() and UNCOMPRESS() for payload
    ]
]);

$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
PostgreSQL
<?php
use n0nag0n\Job_Queue;

// PostgreSQL based job queue
$Job_Queue = new Job_Queue('pgsql', [
    'pgsql' => [
        'table_name' => 'new_table_name', // default is job_queue_jobs
    ]
]);

$PDO = new PDO('pgsql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
SQLite3
<?php
use n0nag0n\Job_Queue;

// SQLite based job queue
$Job_Queue = new Job_Queue('sqlite', [
    'sqlite' => [
        'table_name' => 'new_table_name' // default is job_queue_jobs
    ]
]);

$PDO = new PDO('sqlite:example.db');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
Beanstalkd
<?php
use n0nag0n\Job_Queue;

// Beanstalkd based job queue (requires pda/pheanstalk)
$Job_Queue = new Job_Queue('beanstalkd');

$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
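Every adapter example above hands addJob() a JSON string. Here is a minimal sketch of two helpers for building and reading that string, assuming you would rather have encoding problems raise an exception than silently produce false. The names encodeJobPayload() and decodeJobPayload() are hypothetical and not part of the library.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of simple-job-queue): turn job data into the
 * string passed to addJob(). Throws JsonException if the data cannot be encoded.
 */
function encodeJobPayload(array $data): string
{
    return json_encode($data, JSON_THROW_ON_ERROR);
}

/**
 * Hypothetical helper: decode the payload string a worker receives back into
 * an associative array. Throws JsonException on malformed JSON.
 */
function decodeJobPayload(string $payload): array
{
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        throw new UnexpectedValueException('Job payload must decode to an array.');
    }
    return $data;
}

$payload = encodeJobPayload(['something' => 'that', 'ends' => 'up', 'a' => 'string']);
// $payload is now a plain string, ready for $Job_Queue->addJob($payload).
$roundTrip = decodeJobPayload($payload);
```

JSON_THROW_ON_ERROR is available from PHP 7.3, so it fits within the package's PHP 7.4 minimum.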
Running a worker
See example_worker.php for a complete file, or use the example below:
<?php
$Job_Queue = new n0nag0n\Job_Queue('mysql');
$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);
$Job_Queue->watchPipeline('send_important_emails');

while(true) {
    $job = $Job_Queue->getNextJobAndReserve();

    // adjust to whatever makes you sleep better at night (for database queues only, beanstalkd does not need this if statement)
    if(empty($job)) {
        usleep(500000);
        continue;
    }

    echo "Processing {$job['id']}\n";
    $payload = json_decode($job['payload'], true);

    try {
        // NOTE: doSomethingThatDoesSomething is an example placeholder for your own job handler.
        $result = doSomethingThatDoesSomething($payload);

        if($result === true) {
            $Job_Queue->deleteJob($job);
        } else {
            // this takes it out of the ready queue and puts it in another queue that can be picked up and "kicked" later.
            $Job_Queue->buryJob($job);
        }
    } catch(Exception $e) {
        $Job_Queue->buryJob($job);
    }
}
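If you want to unit test the delete-or-bury decision without running an infinite loop, one option is to pull a single loop iteration into a function. The sketch below is only an illustration: processNextJob() is a hypothetical name, and it assumes $queue is any object exposing getNextJobAndReserve(), deleteJob() and buryJob() as used in the worker above. Unlike the worker above, it catches Throwable instead of Exception so that PHP errors also bury the job; that is a choice made for this sketch.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): run one iteration of the
 * worker loop. $queue is any object exposing getNextJobAndReserve(),
 * deleteJob() and buryJob(), such as a configured Job_Queue.
 *
 * Returns 'idle' when no job was available, 'deleted' when the handler
 * returned true, and 'buried' otherwise (including when it threw).
 */
function processNextJob(object $queue, callable $handler): string
{
    $job = $queue->getNextJobAndReserve();
    if (empty($job)) {
        return 'idle';
    }

    try {
        $succeeded = $handler(json_decode($job['payload'], true)) === true;
    } catch (Throwable $e) {
        $succeeded = false;
    }

    if ($succeeded) {
        $queue->deleteJob($job);
        return 'deleted';
    }

    $queue->buryJob($job);
    return 'buried';
}
```

A database-backed worker would then call processNextJob($Job_Queue, 'doSomethingThatDoesSomething') inside its while loop and sleep with usleep(500000) whenever the result is 'idle'.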
Handling long processes
Supervisord is going to be your jam. Look up the many, many articles on how to implement this.
Configurable options
Several behaviors can be tuned via the $options array passed to the constructor (or setOptions()):
- stale_timeout (int, seconds): How long to wait before a reserved job is considered "stale" and can be re-claimed by another worker (DB backends only). Default: 300 (5 minutes). This default preserves the historical behavior exactly.
Example:
$Job_Queue = new Job_Queue('mysql', [
    'stale_timeout' => 120, // 2 minutes
    'mysql' => [
        'use_compression' => true
    ]
]);
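To make the meaning of stale_timeout concrete, the sketch below expresses the idea as a plain function. It only illustrates the concept: the library performs its own check against the database, the function and parameter names here are hypothetical, and treating the boundary as inclusive (>=) is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Illustration only: decide whether a reservation has outlived stale_timeout.
 * The library's real check runs against the database; this function and its
 * parameter names are hypothetical, and the inclusive boundary is an assumption.
 */
function isReservationStale(int $reservedAt, int $now, int $staleTimeout = 300): bool
{
    return ($now - $reservedAt) >= $staleTimeout;
}

$reservedAt = 1700000000; // Unix timestamp at which a worker reserved the job

// With the default of 300 seconds, a job reserved 120 seconds ago is not stale.
$default = isReservationStale($reservedAt, $reservedAt + 120);

// With stale_timeout lowered to 120, the same job can be re-claimed.
$tuned = isReservationStale($reservedAt, $reservedAt + 120, 120);
```

A shorter timeout gets jobs from crashed workers back into circulation sooner, but any job that legitimately runs longer than the timeout may be picked up by a second worker.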
Additional methods (additive, non-breaking)
New methods have been added that do not change any existing behavior:
- releaseJob($job): Un-reserves a job that was previously obtained via getNextJobAndReserve(), making it available again immediately (useful if processing was abandoned).
- getReadyCount(?string $pipeline = null): int
- getBuriedCount(?string $pipeline = null): int
These work for all supported backends. Pass a pipeline name to query a specific one, or omit to use the currently selected pipeline.
See the source or tests for usage examples. All new functionality follows the same "select/watch pipeline + connection" contract as the rest of the library.
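As a rough sketch of how these methods might be combined, the two helpers below wrap the calls listed above. Both helper names are hypothetical, $queue stands for any object exposing the listed methods (such as a configured Job_Queue), and jobs are assumed to be arrays, as in the worker example.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: collect ready/buried counts for one pipeline.
 * Passing null uses the currently selected pipeline.
 */
function pipelineCounts(object $queue, ?string $pipeline = null): array
{
    return [
        'ready'  => $queue->getReadyCount($pipeline),
        'buried' => $queue->getBuriedCount($pipeline),
    ];
}

/**
 * Hypothetical helper: reserve the next job, but hand it straight back with
 * releaseJob() when $shouldProcess says this worker cannot take it
 * (for example during shutdown). Returns the job to process, or null.
 */
function reserveOrRelease(object $queue, callable $shouldProcess): ?array
{
    $job = $queue->getNextJobAndReserve();
    if (empty($job)) {
        return null;
    }

    if (!$shouldProcess($job)) {
        // Make the job available to other workers again immediately.
        $queue->releaseJob($job);
        return null;
    }

    return $job;
}
```

Releasing the job explicitly means another worker can pick it up right away instead of waiting for stale_timeout to expire.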
Testing
The PHPUnit tests currently run against SQLite3 only.
vendor/bin/phpunit
# or via composer
composer test
composer test-coverage # requires Xdebug; enforces 100% coverage