solophp/job-queue

A modern PHP job queue built on top of Doctrine DBAL with object-oriented job handling.

v1.0.0 2025-09-30 16:50 UTC

Last update: 2025-09-30 16:51:12 UTC


A modern PHP job queue built on top of Doctrine DBAL with object-oriented job handling. Supports scheduled execution, retries, job expiration, indexed job types, automatic deletion of completed jobs, and type-safe job classes.

🧰 Features

  • Type-Safe Jobs – Object-oriented job classes with JobInterface
  • Job Retries – Configurable max retry attempts before marking as failed
  • Job Expiration – Automatic expiration via expires_at timestamp
  • Indexed Job Types – Fast filtering by type
  • Row-Level Locking – Prevents concurrent execution of the same job
  • Transactional Safety – All job operations are executed within a transaction
  • Optional Process Locking – Prevent overlapping workers using LockGuard
  • Optional Deletion on Success – Set deleteOnSuccess: true to automatically delete jobs after success
  • Dependency Injection – Jobs can receive dependencies through the constructor
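
The retry feature can be pictured with a small sketch. This is not the package's implementation: the function name nextStatus, the status strings, and the rule that a job fails once its attempt count reaches the limit are all assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decide what happens to a job after a failed attempt.
 * $attempts counts the attempts made so far, including the one that just failed.
 */
function nextStatus(int $attempts, int $maxRetries): string
{
    // Assumed rule: the job stays pending until the attempt limit is reached.
    return $attempts >= $maxRetries ? 'failed' : 'pending';
}

echo nextStatus(1, 5), PHP_EOL; // pending
echo nextStatus(5, 5), PHP_EOL; // failed
```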

📦 Installation

composer require solophp/job-queue

📋 Requirements

  • PHP: >= 8.2
  • Extensions:
    • ext-json - for JSON payload handling
    • ext-posix - for LockGuard process locking (optional)
  • Dependencies:
    • doctrine/dbal - for database operations

This package uses Doctrine DBAL for database abstraction, so it supports multiple database platforms (MySQL, PostgreSQL, SQLite, SQL Server, Oracle, etc.) through a consistent API.

⚙️ Setup

use Solo\JobQueue\JobQueue;
use Doctrine\DBAL\DriverManager;

$connectionParams = [
    'dbname' => 'test',
    'user' => 'username',
    'password' => 'password',
    'host' => 'localhost',
    'driver' => 'pdo_mysql',
];
$connection = DriverManager::getConnection($connectionParams);
$queue = new JobQueue($connection, table: 'jobs', maxRetries: 5, deleteOnSuccess: true);
$queue->install(); // creates the jobs table if it does not exist

🚀 Usage

Create a Job Class

use Solo\Contracts\JobQueue\JobInterface;

class SendEmailJob implements JobInterface
{
    public function __construct(
        private string $to,
        private string $subject,
        private string $body
    ) {}

    public function handle(): void
    {
        // Send email logic here
        mail($this->to, $this->subject, $this->body);
    }
}
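
Because a job is a plain object with a handle() method, it can be exercised directly, for example in a unit test, without touching the queue. The sketch below assumes a stand-in JobInterface (so it runs without Composer) and two hypothetical classes, InMemoryMailer and NotifyUserJob, that are not part of the package. It only calls handle() itself and says nothing about how the queue stores or rebuilds a job.

```php
<?php
declare(strict_types=1);

// Stand-in for Solo\Contracts\JobQueue\JobInterface so the sketch is self-contained.
interface JobInterface
{
    public function handle(): void;
}

// Hypothetical collaborator that records messages instead of sending them.
final class InMemoryMailer
{
    /** @var list<array{to: string, subject: string}> */
    public array $sent = [];

    public function send(string $to, string $subject): void
    {
        $this->sent[] = ['to' => $to, 'subject' => $subject];
    }
}

// Hypothetical job that receives its dependency through the constructor.
final class NotifyUserJob implements JobInterface
{
    public function __construct(
        private InMemoryMailer $mailer,
        private string $to,
        private string $subject
    ) {}

    public function handle(): void
    {
        $this->mailer->send($this->to, $this->subject);
    }
}

$mailer = new InMemoryMailer();
(new NotifyUserJob($mailer, 'user@example.com', 'Welcome!'))->handle();

echo count($mailer->sent), PHP_EOL; // 1
```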

Push Jobs to Queue

$job = new SendEmailJob('user@example.com', 'Welcome!', 'Welcome to our service');
$jobId = $queue->push($job);

// With type for filtering
$jobId = $queue->push($job, 'email');

// Schedule for later
$scheduledJob = new SendEmailJob('user@example.com', 'Reminder', 'Don\'t forget!');
$queue->push($scheduledJob, 'email', new DateTimeImmutable('tomorrow'));
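
Scheduling and expiration can be summarised as a predicate over two timestamps. The sketch below is an assumption about the semantics (a job is ready once its scheduled time has passed and its expiry time has not), not code from the package; isReady is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical readiness check. A null timestamp means "no constraint".
 */
function isReady(
    DateTimeImmutable $now,
    ?DateTimeImmutable $scheduledAt = null,
    ?DateTimeImmutable $expiresAt = null
): bool {
    $due = $scheduledAt === null || $scheduledAt <= $now;
    $notExpired = $expiresAt === null || $expiresAt > $now;

    return $due && $notExpired;
}

$now = new DateTimeImmutable('2025-01-01 12:00:00 UTC');

var_dump(isReady($now));                                // bool(true)
var_dump(isReady($now, $now->modify('+1 day')));        // bool(false): not due yet
var_dump(isReady($now, null, $now->modify('-1 hour'))); // bool(false): already expired
```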

Process Jobs

$queue->processJobs(10); // Process up to 10 jobs
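
Each processJobs() call handles one batch, so a worker script may want to call it repeatedly. A minimal polling loop might look like the sketch below. runWorker, the batch callable, and the sleep interval are illustrative assumptions; with the real queue the callable would wrap $queue->processJobs(10).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical polling loop: run $processBatch $iterations times,
 * pausing $sleepSeconds between runs.
 */
function runWorker(callable $processBatch, int $iterations, int $sleepSeconds = 5): void
{
    for ($i = 0; $i < $iterations; $i++) {
        $processBatch();

        // No need to sleep after the final run.
        if ($i < $iterations - 1) {
            sleep($sleepSeconds);
        }
    }
}

// With the real queue this would be: fn () => $queue->processJobs(10)
$runs = 0;
runWorker(function () use (&$runs): void {
    $runs++;
}, 3, 0);

echo $runs, PHP_EOL; // 3
```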

🔒 Using LockGuard (optional)

use Solo\JobQueue\LockGuard;

$lockFile = __DIR__ . '/storage/locks/my_worker.lock';
$lock = new LockGuard($lockFile);

if (!$lock->acquire()) {
    exit(0); // Another worker is already running
}

try {
    $queue->processJobs();
} finally {
    $lock->release(); // Optional, auto-released on shutdown
}
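
For readers curious how a single-worker guard can work in general, here is a sketch built on PHP's flock(). It is a different, simpler technique shown only for illustration and is not LockGuard's implementation; acquireWorkerLock and the lock file name are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Try to take an exclusive, non-blocking lock on $path.
 *
 * @return resource|null The open handle on success (keep it open while working), or null.
 */
function acquireWorkerLock(string $path)
{
    $handle = fopen($path, 'c'); // create the file if missing, never truncate it
    if ($handle === false) {
        return null;
    }

    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;
    }

    return $handle;
}

$path = sys_get_temp_dir() . '/job_queue_worker_demo.lock';
$lock = acquireWorkerLock($path);

if ($lock === null) {
    exit(0); // another process holds the lock
}

try {
    // Real work would go here, e.g. $queue->processJobs();
} finally {
    flock($lock, LOCK_UN);
    fclose($lock);
}
```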

🔗 Integration with Async Event-Dispatcher

JobQueue integrates with the SoloPHP Async Event-Dispatcher through the built-in SoloJobQueueAdapter and AsyncEventJob classes.

Basic Setup

use Solo\AsyncEventDispatcher\{AsyncEventDispatcher, ReferenceListenerRegistry, ListenerReference};
use Solo\AsyncEventDispatcher\Adapter\SoloJobQueueAdapter;
use Solo\JobQueue\JobQueue;

// Setup JobQueue
$queue = new JobQueue($connection);
$queue->install();

// Setup AsyncEventDispatcher
$registry = new ReferenceListenerRegistry();
$registry->addReference(UserRegistered::class, new ListenerReference(SendWelcomeEmail::class, 'handle'));

$adapter = new SoloJobQueueAdapter($queue, $container);
$dispatcher = new AsyncEventDispatcher($registry, $adapter);

// Dispatch events - they'll be queued as jobs automatically
$dispatcher->dispatch(new UserRegistered('john@example.com'));

// Process async events
$queue->processJobs(10, 'async_event');

The AsyncEventJob is automatically used by the adapter to handle event deserialization and listener execution.

For complete async event processing setup, refer to the Async Event-Dispatcher documentation.

🧪 API Methods

| Method | Description |
|---|---|
| install() | Create the jobs table |
| push(JobInterface $job, ?string $type = null, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null) | Push job to the queue with optional type filtering |
| addJob(array $payload, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null, ?string $type = null) | Add job to the queue using raw payload data |
| getPendingJobs(int $limit = 10, ?string $onlyType = null) | Retrieve ready-to-run jobs, optionally filtered by type |
| markCompleted(int $jobId) | Mark job as completed |
| markFailed(int $jobId, string $error = '') | Mark job as failed with error message |
| processJobs(int $limit = 10, ?string $onlyType = null) | Process pending jobs |

📄 License

This project is open-sourced under the MIT license.