solophp/task-queue

Enhanced PHP task queue using Solo Database with retry mechanism and task expiration support.

v1.1.0 2025-03-24 06:43 UTC

This package is auto-updated.

Last update: 2025-03-24 06:45:12 UTC


README

Version
License

A lightweight PHP task queue built on top of the Solo Database.
Supports scheduled execution, retries, task expiration, indexed task types, automatic deletion of completed tasks, and optional process-level locking via LockGuard.

๐Ÿ“ฆ Installation

composer require solophp/task-queue

โš™๏ธ Setup

use Solo\Queue\TaskQueue;

$queue = new TaskQueue($db, table: 'tasks', maxRetries: 5, deleteOnSuccess: true);
$queue->install(); // creates the tasks table if not exists

๐Ÿš€ Usage

Add a task:

$taskId = $queue->addTask(
    'email_notification',
    ['type' => 'email_notification', 'user_id' => 123, 'template' => 'welcome'],
    new DateTimeImmutable('tomorrow') // optional, defaults to now
);

Process all tasks:

$queue->processPendingTasks(function (string $name, array $payload) {
    match ($name) {
        'email_notification' => sendEmail($payload),
        'push_notification' => sendPush($payload),
        default => throw new RuntimeException("Unknown task: $name")
    };
});

Process only specific type:

$queue->processPendingTasks(function (string $name, array $payload) {
    sendEmail($payload);
}, 20, 'email_notification'); // only tasks where payload_type column = 'email_notification'

๐Ÿ”’ Using LockGuard (optional)

use Solo\TaskQueue\LockGuard;

$lockFile = __DIR__ . '/storage/locks/my_worker.lock';
$lock = new LockGuard($lockFile); // path to lock file

if (!$lock->acquire()) {
    exit(0); // Another worker is already running
}

try {
    $queue->processPendingTasks(...);
} finally {
    $lock->release(); // Optional, auto-released on shutdown
}

๐Ÿงฐ Features

  • Task Retries โ€“ Configurable max retry attempts before marking as failed
  • Task Expiration โ€“ Automatic expiration via expires_at timestamp
  • Indexed Task Types โ€“ Fast filtering by payload_type
  • Row-Level Locking โ€“ Prevents concurrent execution of the same task
  • Transactional Safety โ€“ All task operations are executed within a transaction
  • Optional Process Locking โ€“ Prevent overlapping workers using LockGuard
  • Optional Deletion on Success โ€“ Set deleteOnSuccess: true to automatically delete tasks after success

๐Ÿงช API Methods

Method Description
install() Create the tasks table
addTask(string $name, array $payload, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null) Add task to the queue (default schedule: now)
getPendingTasks(int $limit = 10, ?string $onlyType = null) Retrieve ready-to-run tasks, optionally filtered by type
markCompleted(int $taskId) Mark task as completed
markFailed(int $taskId, string $error = '') Mark task as failed with error message
processPendingTasks(callable $callback, int $limit = 10, ?string $onlyType = null) Process pending tasks with a custom handler

๐Ÿ“„ License

This project is open-sourced under the MIT license.