solophp / task-queue
Enhanced PHP task queue using Solo Database with retry mechanism and task expiration support.
v1.1.0
2025-03-24 06:43 UTC
Requires
- php: >=8.2
- ext-json: *
- ext-posix: *
- solophp/database: ^2.8
README
A lightweight PHP task queue built on top of the Solo Database.
Supports scheduled execution, retries, task expiration, indexed task types, automatic deletion of completed tasks, and optional process-level locking via LockGuard
.
๐ฆ Installation
composer require solophp/task-queue
โ๏ธ Setup
use Solo\Queue\TaskQueue; $queue = new TaskQueue($db, table: 'tasks', maxRetries: 5, deleteOnSuccess: true); $queue->install(); // creates the tasks table if not exists
๐ Usage
Add a task:
$taskId = $queue->addTask( 'email_notification', ['type' => 'email_notification', 'user_id' => 123, 'template' => 'welcome'], new DateTimeImmutable('tomorrow') // optional, defaults to now );
Process all tasks:
$queue->processPendingTasks(function (string $name, array $payload) { match ($name) { 'email_notification' => sendEmail($payload), 'push_notification' => sendPush($payload), default => throw new RuntimeException("Unknown task: $name") }; });
Process only specific type:
$queue->processPendingTasks(function (string $name, array $payload) { sendEmail($payload); }, 20, 'email_notification'); // only tasks where payload_type column = 'email_notification'
๐ Using LockGuard
(optional)
use Solo\TaskQueue\LockGuard; $lockFile = __DIR__ . '/storage/locks/my_worker.lock'; $lock = new LockGuard($lockFile); // path to lock file if (!$lock->acquire()) { exit(0); // Another worker is already running } try { $queue->processPendingTasks(...); } finally { $lock->release(); // Optional, auto-released on shutdown }
๐งฐ Features
- Task Retries โ Configurable max retry attempts before marking as failed
- Task Expiration โ Automatic expiration via
expires_at
timestamp - Indexed Task Types โ Fast filtering by
payload_type
- Row-Level Locking โ Prevents concurrent execution of the same task
- Transactional Safety โ All task operations are executed within a transaction
- Optional Process Locking โ Prevent overlapping workers using
LockGuard
- Optional Deletion on Success โ Set
deleteOnSuccess: true
to automatically delete tasks after success
๐งช API Methods
Method | Description |
---|---|
install() |
Create the tasks table |
addTask(string $name, array $payload, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null) |
Add task to the queue (default schedule: now) |
getPendingTasks(int $limit = 10, ?string $onlyType = null) |
Retrieve ready-to-run tasks, optionally filtered by type |
markCompleted(int $taskId) |
Mark task as completed |
markFailed(int $taskId, string $error = '') |
Mark task as failed with error message |
processPendingTasks(callable $callback, int $limit = 10, ?string $onlyType = null) |
Process pending tasks with a custom handler |
๐ License
This project is open-sourced under the MIT license.