joby/smol-queue

A lightweight and simple job queue for PHP applications, backed by SQLite.

v1.0.0 2026-01-15 19:53 UTC

Last update: 2026-01-17 22:45:20 UTC


README

A lightweight PHP job queue library with parent/child relationships, automatic retry with exponential backoff, and priority scheduling.

Installation

composer require joby/smol-queue

About

smol-queue provides a simple, persistent job queue backed by SQLite. Jobs are executed by workers, with support for:

  • Parent/child relationships: Chain jobs together and pass data between them
  • Automatic retry: Failed jobs retry with exponential backoff (1min, 5min, 10min, 20min... capped at 6 hours)
  • Priority scheduling: HIGH, NORMAL, and LOW priority jobs
  • Job tagging: Tag jobs for filtering and progress tracking
  • Delayed execution: Schedule jobs to run only after a specific time
  • Multi-worker support: Multiple workers can safely process jobs concurrently, though the design targets a small number of workers
  • Automatic cleanup tools: Easily remove old completed/failed jobs after a configurable retention period

Basic Usage

use Joby\Smol\Queue\Queue;

$queue = new Queue('/path/to/queue.db');

// Add a job
$job = $queue->add(function() {
    // Do some work
    return 'result';
});

// Run worker (processes jobs for 50 seconds)
$queue->work(seconds: 50);

Adding Jobs

Simple Jobs

// Basic job (variables from the enclosing scope must be captured with use)
$job = $queue->add(function() use ($to, $subject, $body) {
    sendEmail($to, $subject, $body);
});

// Job with label
$job = $queue->add(
    function() { return processData(); },
    label: 'Process user data'
);

Jobs with Tags

Tag jobs to categorize them and enable filtered processing and progress tracking.

// Single tag
$job = $queue->add(
    function() { return processItem(); },
    tags: 'batch-123'
);

// Multiple tags
$job = $queue->add(
    function() { return importData(); },
    tags: ['import-2024', 'user-uploads', 'high-priority']
);

Jobs with Priority

use Joby\Smol\Queue\Priority;

// High priority (processed first)
$queue->add(
    function() { sendUrgentAlert(); },
    priority: Priority::HIGH
);

// Normal priority (default)
$queue->add(
    function() { processOrder(); },
    priority: Priority::NORMAL
);

// Low priority (processed last)
$queue->add(
    function() { cleanupOldFiles(); },
    priority: Priority::LOW
);

Scheduled Jobs

// Run 1 hour from now
$queue->add(
    function() { sendReminder(); },
    not_before: time() + 3600
);

// Run at specific time
$datetime = new DateTime('tomorrow 9:00');
$queue->add(
    function() { generateReport(); },
    not_before: $datetime
);

Parent/Child Relationships

Chain jobs together to create workflows. Child jobs receive their parent's return value as input and only run if the parent succeeds.

// Parent job returns data
$parent = $queue->add(function() {
    $data = fetchDataFromApi();
    return $data;
});

// Child job receives parent's result
$child = $queue->add(
    function($data) {
        processData($data);
        return 'processed';
    },
    parent: $parent
);

// Grandchild receives child's result
$grandchild = $queue->add(
    function($result) {
        notifyCompletion($result);
    },
    parent: $child
);
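
The way a return value travels down a chain can be pictured with plain closures. The following is a minimal in-memory sketch, not smol-queue's implementation: runChain is a hypothetical helper, and it ignores persistence, retries, and scheduling. It only shows that each step receives the previous step's return value and that later steps are skipped once a step throws.

```php
<?php
/**
 * Hypothetical helper (not part of smol-queue): run closures in order,
 * passing each one the previous return value, and stop at the first failure.
 *
 * @param array<int, callable> $steps
 * @return array{completed: int, result: mixed, error: ?string}
 */
function runChain(array $steps): array
{
    $result = null;
    $completed = 0;
    foreach (array_values($steps) as $index => $step) {
        try {
            // The first step has no parent, so it is called without input.
            $result = $index === 0 ? $step() : $step($result);
            $completed++;
        } catch (Throwable $e) {
            // Remaining steps never run once a step has failed.
            return ['completed' => $completed, 'result' => null, 'error' => $e->getMessage()];
        }
    }
    return ['completed' => $completed, 'result' => $result, 'error' => null];
}

// Parent -> child -> grandchild, each consuming the previous result.
$outcome = runChain([
    fn() => [3, 1, 2],
    function (array $data) { sort($data); return $data; },
    fn(array $sorted) => implode(',', $sorted),
]);

// A failing parent blocks everything after it.
$blocked = runChain([
    fn() => throw new RuntimeException('Parent failed'),
    fn($data) => 'processed',
]);
```

In the real queue each step would be a separate persisted job that may run in a different worker process, so return values need to survive being stored between steps.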

Failed Parents Block Children

If a parent job fails after exhausting all retries, its children remain in PENDING state but never execute. They're eventually removed when the parent is cleaned up (CASCADE DELETE).

$parent = $queue->add(function() {
    throw new Exception('Parent failed');
}, max_attempts: 1);

$child = $queue->add(
    function($data) {
        // This will never run
        return 'processed';
    },
    parent: $parent
);

// Parent fails, child stays PENDING indefinitely
// Both are eventually removed by cleanup()

Retry Logic

Failed jobs automatically retry with exponential backoff (default is 20 retries, spanning 80+ hours):

  • Attempt 1: Retry after 1 minute
  • Attempt 2: Retry after 5 minutes
  • Attempt 3: Retry after 10 minutes
  • Attempt 4: Retry after 20 minutes
  • Attempt 5: Retry after 40 minutes
  • Attempt 6: Retry after 80 minutes
  • Attempt 7+: Retry after 160, 320 minutes... (capped at 6 hours)

// Default: 20 retry attempts
$job = $queue->add(function() {
    unreliableOperation();
});

// Custom retry count
$job = $queue->add(
    function() { attemptApiCall(); },
    max_attempts: 5
);

// No retries (fail immediately)
$job = $queue->add(
    function() { criticalOperation(); },
    max_attempts: 1
);
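
The schedule listed above can be reproduced with a few lines of arithmetic. This is a sketch reconstructed from the documented delays only; retryDelayMinutes is a hypothetical function name, and the library's internal calculation may differ. It assumes 1 minute after the first failure, then 5 minutes doubling on each later attempt, capped at 360 minutes.

```php
<?php
/**
 * Hypothetical reconstruction of the documented backoff schedule.
 * Returns the delay in minutes before the retry that follows attempt $attempt.
 */
function retryDelayMinutes(int $attempt): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempt numbers start at 1');
    }
    if ($attempt === 1) {
        return 1;
    }
    // 5, 10, 20, ... from the second attempt on; the exponent is clamped
    // so very large attempt numbers cannot overflow before the cap applies.
    $exponent = min($attempt - 2, 10);
    return min(5 * (2 ** $exponent), 360);
}

// Total waiting time if all 20 default retries are used up.
$totalMinutes = array_sum(array_map('retryDelayMinutes', range(1, 20)));
$totalHours = $totalMinutes / 60;
```

Under these assumptions the 20 delays add up to 4956 minutes, or about 82.6 hours, which matches the "80+ hours" figure.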

Running Workers

Basic Worker

The following could be scheduled as a cron job that runs every minute and spends up to 50 seconds of each minute running jobs. Multiple workers are not very efficient, but they will at least not step on each other's toes.

$queue = new Queue('/path/to/queue.db');

// Process jobs for 50 seconds (default)
$queue->work();

// Process jobs for 2 minutes
$queue->work(seconds: 120);

// Custom abandoned job timeout (default is 2x the work time)
$queue->work(seconds: 50, timeout: 300);

Tag-Filtered Workers

Workers can filter by tag to process only specific jobs. This is particularly useful for progress tracking and prioritizing certain batches.

// Process only jobs tagged 'batch-123' and exit when there are no more
$queue->work(seconds: 50, tag: 'batch-123');

// Same tag filter, but keep polling for new tagged jobs until the time limit instead of exiting
$queue->work(seconds: 50, tag: 'batch-123', polling: true);

Polling behavior

  • polling: false (default): Worker exits immediately when no jobs are available
  • polling: true: Worker sleeps and polls for new jobs until time runs out

// Background daemon - keeps polling
$queue->work(seconds: 590, polling: true);

// Batch processor - exit when done
$queue->work(seconds: 50, tag: 'import-batch', polling: false);

Job Tags

Managing Tags

// Get job's tags
$tags = $job->tags(); // ['batch-123', 'user-data']

// Add tag to existing job
$job->addTag('priority');

// Remove tag from job
$job->removeTag('batch-123');

Progress Tracking

Track progress of tagged job batches:

// Create batch of jobs with unique tag
$batchId = uniqid('batch-');
for ($i = 0; $i < 100; $i++) {
    $queue->add(
        function() use ($i) { return processItem($i); },
        tags: $batchId
    );
}

// Check progress
$total = $queue->countJobs($batchId);
$completed = $queue->countSuccessfulJobs($batchId);
$failed = $queue->countFailedJobs($batchId);

$progress = ($completed + $failed) / $total * 100;
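
The percentage calculation above divides by the total, which would fail if a tag has no jobs (for example, after the batch has been cleaned up). A small guard avoids that. batchProgress is a hypothetical helper written for this sketch, not a smol-queue method; it takes the three counts as plain integers.

```php
<?php
/**
 * Hypothetical helper: percentage of a batch that has finished
 * (successfully or not), safe to call when the batch is empty.
 */
function batchProgress(int $total, int $succeeded, int $failed): float
{
    if ($total <= 0) {
        // Nothing to track, so report 0% rather than dividing by zero.
        return 0.0;
    }
    // Clamp in case the counts were read at slightly different moments.
    $finished = min($succeeded + $failed, $total);
    return round($finished / $total * 100, 1);
}

$halfway = batchProgress(100, 40, 10);
$empty = batchProgress(0, 0, 0);
```

With the counts from the snippet above, the call would be batchProgress($total, $completed, $failed).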

Job Status and Data

Reading Job Information

$job = $queue->get($jobId);

// Status
$status = $job->status(); // Status enum: PENDING, RUNNING, SUCCEEDED, FAILED

// Execution info
$attempts = $job->attempts();
$maxAttempts = $job->max_attempts();
$createdAt = $job->created_at();
$finishedAt = $job->finished_at(); // null if not finished

// Worker info
$claimedBy = $job->claimed_by();
$claimedAt = $job->claimed_at();

// Results
$result = $job->resultData(); // Return value if succeeded
$error = $job->errorData();   // Exception data if failed

// Scheduling
$notBefore = $job->not_before();
$priority = $job->priority();

// Tags
$tags = $job->tags();

// Relationships
$parentId = $job->parentId();
$parent = $job->parent();

Job Results

When a job has completed successfully, its return value is available via resultData():

$job = $queue->add(function() {
    return ['processed' => 100, 'errors' => 0];
});

// After job completes
$result = $queue->get($job->id())->resultData();
// ['processed' => 100, 'errors' => 0]

Job Errors

When a job fails, exception information is captured:

$job = $queue->add(function() {
    throw new RuntimeException('API timeout');
});

// After job fails
$error = $queue->get($job->id())->errorData();
// [
//   'class' => 'RuntimeException',
//   'message' => 'API timeout',
//   'file' => '/path/to/file.php',
//   'line' => 42,
//   'trace' => '...',
// ]

Cleanup

You should periodically run the cleanup task to remove old completed and failed jobs to prevent database bloat. For example:

// Delete jobs finished more than 7 days ago (default)
$deleted = $queue->cleanup();

// Custom retention period (e.g., 24 hours)
$deleted = $queue->cleanup(age_seconds: 86400);

// Run via cron daily
$queue->cleanup(age_seconds: 86400 * 7);

When a parent job is deleted, all its children are automatically removed via CASCADE DELETE, even if they never ran.

Advanced Features

Abandoned Job Recovery

Workers automatically detect and retry jobs that were abandoned (claimed but not finished within the timeout):

// Job claimed by worker that crashed
$queue->work(
    seconds: 50,
    timeout: 120  // Jobs claimed >120 seconds ago are marked as failed
);
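
The timeout rule can be written out as two small functions. This is a sketch of the documented behavior only: the function names are hypothetical, and whether the library compares with > or >= is an assumption. It uses the stated default of twice the work time when no timeout is given.

```php
<?php
/** Hypothetical: effective abandonment timeout, defaulting to 2x the work time. */
function abandonedTimeout(int $workSeconds, ?int $timeout = null): int
{
    return $timeout ?? $workSeconds * 2;
}

/** Hypothetical: has a claimed, unfinished job been held longer than the timeout? */
function isAbandoned(int $claimedAt, int $now, int $timeoutSeconds): bool
{
    // Assumption: strictly older than the timeout counts as abandoned.
    return ($now - $claimedAt) > $timeoutSeconds;
}

$defaultTimeout = abandonedTimeout(50);      // no explicit timeout given
$customTimeout = abandonedTimeout(50, 300);  // explicit timeout wins
$stale = isAbandoned(claimedAt: 1000, now: 1121, timeoutSeconds: 120);
```

Choosing a timeout longer than the slowest expected job keeps slow but healthy jobs from being picked up a second time.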

Database Access

For advanced queries or administration:

$pdo = $queue->pdo();

// Custom queries
$stmt = $pdo->query("
    SELECT * FROM jobs 
    WHERE status = 'failed' 
    AND finished_at IS NOT NULL
");

Usage Patterns

Simple Background Tasks

// Send email asynchronously ($body must be captured along with $userId)
$queue->add(function() use ($userId, $body) {
    $user = User::find($userId);
    Mail::send($user->email, 'Welcome!', $body);
});

// Process uploaded file
$queue->add(function() use ($filePath) {
    processImage($filePath);
    generateThumbnails($filePath);
});

Multi-Step Workflows

// Step 1: Download file
$download = $queue->add(function() use ($url) {
    $filePath = downloadFile($url);
    return $filePath;
}, label: 'Download file');

// Step 2: Process file
$process = $queue->add(
    function($filePath) {
        $data = parseFile($filePath);
        return $data;
    },
    parent: $download,
    label: 'Process file'
);

// Step 3: Store results
$store = $queue->add(
    function($data) {
        storeInDatabase($data);
    },
    parent: $process,
    label: 'Store results'
);

Priority Processing

// Critical: Process payment
$queue->add(
    function() use ($paymentId) {
        processPayment($paymentId);
    },
    priority: Priority::HIGH,
    label: "Process payment $paymentId"
);

// Normal: Send order confirmation
$queue->add(
    function() use ($orderId) {
        sendOrderConfirmation($orderId);
    },
    priority: Priority::NORMAL
);

// Low: Generate analytics
$queue->add(
    function() {
        updateAnalytics();
    },
    priority: Priority::LOW
);

Scheduled Reports

// Daily report at 8 AM
$tomorrow8am = (new DateTime('tomorrow 8:00'))->getTimestamp();

$queue->add(
    function() {
        $report = generateDailyReport();
        emailReport($report);
    },
    not_before: $tomorrow8am,
    label: 'Daily report'
);
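
Note that 'tomorrow 8:00' always skips today, even when 8 AM has not passed yet. If the next upcoming 8 AM is wanted instead, the timestamp can be computed as in this sketch. nextDailyRun is a hypothetical helper, not part of smol-queue; it relies only on PHP's built-in DateTimeImmutable.

```php
<?php
/**
 * Hypothetical helper: the next occurrence of $hour:$minute at or after
 * tomorrow if that time has already passed today, otherwise today.
 */
function nextDailyRun(DateTimeImmutable $now, int $hour, int $minute = 0): DateTimeImmutable
{
    $candidate = $now->setTime($hour, $minute, 0);
    if ($candidate <= $now) {
        // Today's slot is already gone, so move to the same time tomorrow.
        $candidate = $candidate->modify('+1 day');
    }
    return $candidate;
}

$utc = new DateTimeZone('UTC');
$early = nextDailyRun(new DateTimeImmutable('2026-01-15 07:30:00', $utc), 8);
$late = nextDailyRun(new DateTimeImmutable('2026-01-15 09:00:00', $utc), 8);
```

The result's getTimestamp() value could then be passed as not_before, in the same way as $tomorrow8am above.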

Batch Processing with Progress Tracking

// Create batch with unique tag
$batchId = "import-" . date('Y-m-d-His');

// Add one job per item to the batch
$items = getItemsToImport();
foreach ($items as $item) {
    $queue->add(
        function() use ($item) {
            importItem($item);
        },
        tags: $batchId,
        label: "Import item {$item->id}"
    );
}

// Spawn a background worker for this batch (exits when done); escape the tag before passing it to the shell
exec('php worker.php ' . escapeshellarg($batchId) . ' > /dev/null 2>&1 &');

// Check progress in web request
$total = count($items);
$completed = $queue->countSuccessfulJobs($batchId);
$failed = $queue->countFailedJobs($batchId);
$pending = $queue->countPendingJobs($batchId);

echo json_encode([
    'total' => $total,
    'completed' => $completed,
    'failed' => $failed,
    'pending' => $pending,
    'progress' => ($completed + $failed) / $total * 100
]);
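
The worker.php script spawned above is not shown in this README. One possible shape, as a sketch under assumptions: batchWorkerOptions is a hypothetical helper that validates the tag from the command line and returns the named arguments for work(); the allowed tag characters and the 50-second default are choices made for this example.

```php
<?php
/**
 * Hypothetical helper for a worker.php script: turn CLI arguments into
 * named arguments for $queue->work().
 *
 * @param array<int, string> $argv
 * @return array{seconds: int, tag: string, polling: bool}
 */
function batchWorkerOptions(array $argv, int $seconds = 50): array
{
    $tag = $argv[1] ?? '';
    // Assumption: tags are limited to letters, digits, dots, dashes, underscores.
    if (!preg_match('/^[A-Za-z0-9_.-]+$/', $tag)) {
        throw new InvalidArgumentException('Usage: php worker.php <batch-tag>');
    }
    // polling stays off so the worker exits once the batch is drained.
    return ['seconds' => $seconds, 'tag' => $tag, 'polling' => false];
}

// In worker.php the options would be used roughly like this
// (autoloader and database paths are placeholders):
//
//   require __DIR__ . '/vendor/autoload.php';
//   $queue = new \Joby\Smol\Queue\Queue('/path/to/queue.db');
//   $queue->work(...batchWorkerOptions($argv));

$options = batchWorkerOptions(['worker.php', 'import-2026-01-15-101500']);
```

Spreading a string-keyed array into named arguments requires PHP 8.1 or later.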

Reliable External API Calls

// Automatically retries with exponential backoff
$queue->add(
    function() use ($orderId) {
        $response = Http::post('https://api.example.com/orders', [
            'order_id' => $orderId
        ]);
        
        if (!$response->successful()) {
            throw new Exception('API call failed');
        }
        
        return $response->json();
    },
    max_attempts: 10,
    label: "Sync order $orderId"
);

User-Specific Job Processing

// Tag jobs by user
$userId = 123;
$queue->add(
    function() use ($userId) {
        generateUserReport($userId);
    },
    tags: "user-$userId",
    label: "Report for user $userId"
);

// Process all jobs for a specific user
$queue->work(seconds: 30, tag: "user-$userId");

Performance Considerations

Worker Configuration

// Short-running workers (web requests spawning workers)
$queue->work(seconds: 2);

// Long-running workers (cron/daemon, recommended)
$queue->work(seconds: 590, polling: true);

Database Maintenance

// Regular cleanup (daily cron)
$queue->cleanup(age_seconds: 86400 * 7);

// For heavy usage, consider shorter retention
$queue->cleanup(age_seconds: 86400 * 1); // Keep only 1 day

Requirements

Fully tested on PHP 8.3+; static analysis covers PHP 8.1+.

License

MIT License - See LICENSE file for details.