joby / smol-queue
A lightweight and simple job queue for PHP applications, backed by SQLite.
pkg:composer/joby/smol-queue
Requires
- php: >=8.1
- ext-pdo_sqlite: *
- joby/smol-uid: ^1.0
- laravel/serializable-closure: ^2.0
Requires (Dev)
- php: >=8.3
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^12.1
README
A lightweight PHP job queue library with parent/child relationships, automatic retry with exponential backoff, and priority scheduling.
Installation
```shell
composer require joby-lol/smol-queue
```
About
smol-queue provides a simple, persistent job queue backed by SQLite. Jobs are executed by workers, with support for:
- Parent/child relationships: Chain jobs together and pass data between them
- Automatic retry: Failed jobs retry with exponential backoff (1min, 5min, 10min, 20min... capped at 6 hours)
- Priority scheduling: HIGH, NORMAL, and LOW priority jobs
- Job tagging: Tag jobs for filtering and progress tracking
- Delayed execution: Schedule jobs to run only after a specific time
- Multi-worker support: Multiple workers can safely process jobs concurrently (but it is fundamentally designed for a small number of workers)
- Automatic cleanup tools: Easily remove old completed/failed jobs after a configurable retention period
Basic Usage
```php
use Joby\Smol\Queue\Queue;

$queue = new Queue('/path/to/queue.db');

// Add a job
$job = $queue->add(function() {
    // Do some work
    return 'result';
});

// Run worker (processes jobs for 50 seconds)
$queue->work(seconds: 50);
```
Adding Jobs
Simple Jobs
```php
// Basic job
$job = $queue->add(function() {
    sendEmail($to, $subject, $body);
});

// Job with label
$job = $queue->add(
    function() {
        return processData();
    },
    label: 'Process user data'
);
```
Jobs with Tags
Tag jobs to categorize them and enable filtered processing and progress tracking.
```php
// Single tag
$job = $queue->add(
    function() {
        return processItem();
    },
    tags: 'batch-123'
);

// Multiple tags
$job = $queue->add(
    function() {
        return importData();
    },
    tags: ['import-2024', 'user-uploads', 'high-priority']
);
```
Jobs with Priority
```php
use Joby\Smol\Queue\Priority;

// High priority (processed first)
$queue->add(
    function() {
        sendUrgentAlert();
    },
    priority: Priority::HIGH
);

// Normal priority (default)
$queue->add(
    function() {
        processOrder();
    },
    priority: Priority::NORMAL
);

// Low priority (processed last)
$queue->add(
    function() {
        cleanupOldFiles();
    },
    priority: Priority::LOW
);
```
Scheduled Jobs
```php
// Run 1 hour from now
$queue->add(
    function() {
        sendReminder();
    },
    not_before: time() + 3600
);

// Run at a specific time
$datetime = new DateTime('tomorrow 9:00');
$queue->add(
    function() {
        generateReport();
    },
    not_before: $datetime
);
```
Parent/Child Relationships
Chain jobs together to create workflows. Child jobs receive their parent's return value as input and only run if the parent succeeds.
```php
// Parent job returns data
$parent = $queue->add(function() {
    $data = fetchDataFromApi();
    return $data;
});

// Child job receives parent's result
$child = $queue->add(
    function($data) {
        processData($data);
        return 'processed';
    },
    parent: $parent
);

// Grandchild receives child's result
$grandchild = $queue->add(
    function($result) {
        notifyCompletion($result);
    },
    parent: $child
);
```
Failed Parents Block Children
If a parent job fails after exhausting all retries, its children remain in PENDING state but never execute. They're eventually removed when the parent is cleaned up (CASCADE DELETE).
```php
$parent = $queue->add(function() {
    throw new Exception('Parent failed');
}, max_attempts: 1);

$child = $queue->add(
    function($data) {
        // This will never run
        return 'processed';
    },
    parent: $parent
);

// Parent fails, child stays PENDING indefinitely
// Both are eventually removed by cleanup()
```
Retry Logic
Failed jobs automatically retry with exponential backoff (default is 20 retries, spanning 80+ hours):
- Attempt 1: Retry after 1 minute
- Attempt 2: Retry after 5 minutes
- Attempt 3: Retry after 10 minutes
- Attempt 4: Retry after 20 minutes
- Attempt 5: Retry after 40 minutes
- Attempt 6: Retry after 80 minutes
- Attempt 7+: Retry after 160, 320 minutes... (capped at 6 hours)
```php
// Default: 20 retry attempts
$job = $queue->add(function() {
    unreliableOperation();
});

// Custom retry count
$job = $queue->add(
    function() {
        attemptApiCall();
    },
    max_attempts: 5
);

// No retries (fail immediately)
$job = $queue->add(
    function() {
        criticalOperation();
    },
    max_attempts: 1
);
```
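The schedule above can be sketched as a small function. This is a hypothetical reconstruction from the documented delays (1 minute, then 5 minutes doubling up to a 6-hour cap); the library's internal implementation may differ.

```php
<?php
// Hypothetical sketch of the documented backoff schedule, in minutes.
// Attempt 1 retries after 1 minute; from attempt 2 the delay starts at
// 5 minutes and doubles each time, capped at 6 hours (360 minutes).
function retryDelayMinutes(int $attempt): int
{
    if ($attempt <= 1) {
        return 1;
    }
    return min(5 * (2 ** ($attempt - 2)), 360);
}
```

Summing this over the default 20 attempts gives the "80+ hours" figure quoted above.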
Running Workers
Basic Worker
The following could be scheduled as a cron job running every minute, spending up to 50 seconds of each minute processing jobs. Multiple workers are not especially efficient, but will at least not step on each other's toes.
```php
$queue = new Queue('/path/to/queue.db');

// Process jobs for 50 seconds (default)
$queue->work();

// Process jobs for 2 minutes
$queue->work(seconds: 120);

// Custom abandoned job timeout (default is 2x the work time)
$queue->work(seconds: 50, timeout: 300);
```
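Cron needs a small entry-point script to invoke. A minimal sketch, assuming the path and filename are placeholders you would adapt (the crontab line is shown as a comment):

```php
<?php
// run-worker.php - hypothetical cron entry point.
// Crontab (every minute): * * * * * php /path/to/run-worker.php
require __DIR__ . '/vendor/autoload.php';

use Joby\Smol\Queue\Queue;

$queue = new Queue('/path/to/queue.db');

// Spend up to 50 seconds of each minute processing jobs, then exit
// so the next cron invocation starts fresh.
$queue->work(seconds: 50);
```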
Tag-Filtered Workers
Workers can filter by tag to process only specific jobs. This is particularly useful for progress tracking and prioritizing certain batches.
```php
// Process only jobs tagged 'batch-123', exiting when there are no more
$queue->work(seconds: 50, tag: 'batch-123');

// Keep polling for more tagged jobs up to the time limit
$queue->work(seconds: 50, tag: 'batch-123', polling: true);
```
Polling behavior
- `polling: false` (default): Worker exits immediately when no jobs are available
- `polling: true`: Worker sleeps and polls for new jobs until time runs out
```php
// Background daemon - keeps polling
$queue->work(seconds: 590, polling: true);

// Batch processor - exit when done
$queue->work(seconds: 50, tag: 'import-batch', polling: false);
```
Job Tags
Managing Tags
```php
// Get job's tags
$tags = $job->tags(); // ['batch-123', 'user-data']

// Add tag to existing job
$job->addTag('priority');

// Remove tag from job
$job->removeTag('batch-123');
```
Progress Tracking
Track progress of tagged job batches:
```php
// Create batch of jobs with unique tag
$batchId = uniqid('batch-');
for ($i = 0; $i < 100; $i++) {
    $queue->add(
        function() use ($i) {
            return processItem($i);
        },
        tags: $batchId
    );
}

// Check progress
$total = $queue->countJobs($batchId);
$completed = $queue->countSuccessfulJobs($batchId);
$failed = $queue->countFailedJobs($batchId);
$progress = ($completed + $failed) / $total * 100;
```
Job Status and Data
Reading Job Information
```php
$job = $queue->get($jobId);

// Status
$status = $job->status(); // Status enum: PENDING, RUNNING, SUCCEEDED, FAILED

// Execution info
$attempts = $job->attempts();
$maxAttempts = $job->max_attempts();
$createdAt = $job->created_at();
$finishedAt = $job->finished_at(); // null if not finished

// Worker info
$claimedBy = $job->claimed_by();
$claimedAt = $job->claimed_at();

// Results
$result = $job->resultData(); // Return value if succeeded
$error = $job->errorData();   // Exception data if failed

// Scheduling
$notBefore = $job->not_before();
$priority = $job->priority();

// Tags
$tags = $job->tags();

// Relationships
$parentId = $job->parentId();
$parent = $job->parent();
```
Job Results
When a job has completed successfully, its return value is available via resultData():
```php
$job = $queue->add(function() {
    return ['processed' => 100, 'errors' => 0];
});

// After job completes
$result = $queue->get($job->id())->resultData();
// ['processed' => 100, 'errors' => 0]
```
Job Errors
When a job fails, exception information is captured:
```php
$job = $queue->add(function() {
    throw new RuntimeException('API timeout');
});

// After job fails
$error = $queue->get($job->id())->errorData();
// [
//     'class' => 'RuntimeException',
//     'message' => 'API timeout',
//     'file' => '/path/to/file.php',
//     'line' => 42,
//     'trace' => '...',
// ]
```
Cleanup
You should periodically run the cleanup task to remove old completed and failed jobs and prevent database bloat. For example:
```php
// Delete jobs finished more than 7 days ago (default)
$deleted = $queue->cleanup();

// Custom retention period (e.g., 24 hours)
$deleted = $queue->cleanup(age_seconds: 86400);

// Run via cron daily
$queue->cleanup(age_seconds: 86400 * 7);
```
When a parent job is deleted, all its children are automatically removed via CASCADE DELETE, even if they never ran.
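A daily cron invocation would typically live in a small script of its own. A hypothetical sketch (path and filename are placeholders):

```php
<?php
// cleanup.php - hypothetical daily maintenance script, run via cron,
// e.g.: 0 3 * * * php /path/to/cleanup.php
require __DIR__ . '/vendor/autoload.php';

use Joby\Smol\Queue\Queue;

$queue = new Queue('/path/to/queue.db');

// Remove jobs that finished more than 7 days ago; children of deleted
// parents are removed automatically via CASCADE DELETE.
$deleted = $queue->cleanup(age_seconds: 86400 * 7);
echo "Removed $deleted old jobs\n";
```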
Advanced Features
Abandoned Job Recovery
Workers automatically detect and retry jobs that were abandoned (claimed but not finished within the timeout):
```php
// Job claimed by a worker that crashed
$queue->work(
    seconds: 50,
    timeout: 120 // Jobs claimed >120 seconds ago are marked as failed
);
```
Database Access
For advanced queries or administration:
```php
$pdo = $queue->pdo();

// Custom queries
$stmt = $pdo->query("
    SELECT * FROM jobs
    WHERE status = 'failed'
    AND finished_at IS NOT NULL
");
```
Usage Patterns
Simple Background Tasks
```php
// Send email asynchronously
$queue->add(function() use ($userId) {
    $user = User::find($userId);
    Mail::send($user->email, 'Welcome!', $body);
});

// Process uploaded file
$queue->add(function() use ($filePath) {
    processImage($filePath);
    generateThumbnails($filePath);
});
```
Multi-Step Workflows
```php
// Step 1: Download file
$download = $queue->add(function() use ($url) {
    $filePath = downloadFile($url);
    return $filePath;
}, label: 'Download file');

// Step 2: Process file
$process = $queue->add(
    function($filePath) {
        $data = parseFile($filePath);
        return $data;
    },
    parent: $download,
    label: 'Process file'
);

// Step 3: Store results
$store = $queue->add(
    function($data) {
        storeInDatabase($data);
    },
    parent: $process,
    label: 'Store results'
);
```
Priority Processing
```php
// Critical: Process payment
$queue->add(
    function() use ($paymentId) {
        processPayment($paymentId);
    },
    priority: Priority::HIGH,
    label: "Process payment $paymentId"
);

// Normal: Send order confirmation
$queue->add(
    function() use ($orderId) {
        sendOrderConfirmation($orderId);
    },
    priority: Priority::NORMAL
);

// Low: Generate analytics
$queue->add(
    function() {
        updateAnalytics();
    },
    priority: Priority::LOW
);
```
Scheduled Reports
```php
// Daily report at 8 AM
$tomorrow8am = (new DateTime('tomorrow 8:00'))->getTimestamp();
$queue->add(
    function() {
        $report = generateDailyReport();
        emailReport($report);
    },
    not_before: $tomorrow8am,
    label: 'Daily report'
);
```
Batch Processing with Progress Tracking
```php
// Create batch with unique tag
$batchId = "import-" . date('Y-m-d-His');

// Add jobs to the batch
$items = getItemsToImport();
foreach ($items as $item) {
    $queue->add(
        function() use ($item) {
            importItem($item);
        },
        tags: $batchId,
        label: "Import item {$item->id}"
    );
}

// Spawn worker for this batch (exits when done)
exec("php worker.php $batchId > /dev/null 2>&1 &");

// Check progress in web request
$total = count($items);
$completed = $queue->countSuccessfulJobs($batchId);
$failed = $queue->countFailedJobs($batchId);
$pending = $queue->countPendingJobs($batchId);

echo json_encode([
    'total' => $total,
    'completed' => $completed,
    'failed' => $failed,
    'pending' => $pending,
    'progress' => ($completed + $failed) / $total * 100
]);
```
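The `worker.php` script spawned above is not part of the library. A minimal sketch of what it might contain, assuming the database path is adapted to your setup:

```php
<?php
// worker.php - hypothetical batch worker; the batch tag is passed
// as the first command-line argument.
require __DIR__ . '/vendor/autoload.php';

use Joby\Smol\Queue\Queue;

$tag = $argv[1] ?? null;
if ($tag === null) {
    fwrite(STDERR, "Usage: php worker.php <tag>\n");
    exit(1);
}

$queue = new Queue('/path/to/queue.db');

// Process only jobs carrying this tag, exiting as soon as none remain
// (polling: false is the default, shown here for clarity).
$queue->work(seconds: 50, tag: $tag, polling: false);
```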
Reliable External API Calls
```php
// Automatically retries with exponential backoff
$queue->add(
    function() use ($orderId) {
        $response = Http::post('https://api.example.com/orders', [
            'order_id' => $orderId
        ]);
        if (!$response->successful()) {
            throw new Exception('API call failed');
        }
        return $response->json();
    },
    max_attempts: 10,
    label: "Sync order $orderId"
);
```
User-Specific Job Processing
```php
// Tag jobs by user
$userId = 123;
$queue->add(
    function() use ($userId) {
        generateUserReport($userId);
    },
    tags: "user-$userId",
    label: "Report for user $userId"
);

// Process all jobs for a specific user
$queue->work(seconds: 30, tag: "user-$userId");
```
Performance Considerations
Worker Configuration
```php
// Short-running workers (web requests spawning workers)
$queue->work(seconds: 2);

// Long-running workers (cron/daemon, recommended)
$queue->work(seconds: 590, polling: true);
```
Database Maintenance
```php
// Regular cleanup (daily cron)
$queue->cleanup(age_seconds: 86400 * 7);

// For heavy usage, consider shorter retention
$queue->cleanup(age_seconds: 86400 * 1); // Keep only 1 day
```
Requirements
Fully tested on PHP 8.3+; static analysis covers PHP 8.1+.
License
MIT License - See LICENSE file for details.