darkjest / deferq
Async task manager with deduplication, result caching and callback notifications
Requires
- php: ^8.4
- psr/log: ^3.0
- psr/simple-cache: ^3.0
- ramsey/uuid: ^4.7
Requires (Dev)
- php-amqplib/php-amqplib: ^3.7
- phpunit/phpunit: ^11.0
- predis/predis: ^2.0
Suggests
- opis/closure: Required for serializing Closure callbacks
- php-amqplib/php-amqplib: Required for RabbitMQQueueAdapter
- predis/predis: Required for RedisQueueAdapter
This package is auto-updated.
Last update: 2026-03-03 20:49:56 UTC
README
Documentation in Russian (README.ru.md)
Async task manager for PHP 8.4 with built-in deduplication, result caching (PSR-16), and callback notifications. Submit heavy tasks (report generation, data exports, etc.), and DeferQ will ensure each unique task runs only once, cache results for subsequent requests, and notify your application when work is done.
Installation
composer require darkjest/deferq
For Redis queue support:
composer require predis/predis
For RabbitMQ queue support:
composer require php-amqplib/php-amqplib
Quick Start
1. Configure DeferQ
    <?php

    use DeferQ\DeferQ;
    use DeferQ\Fingerprint\DefaultFingerprintGenerator;
    use DeferQ\Lock\CacheLock;
    use DeferQ\Queue\RedisQueueAdapter;
    use DeferQ\Result\CacheResultStore;
    use DeferQ\Store\CacheTaskStore;
    use Predis\Client;
    use Symfony\Component\Cache\Psr16Cache;
    use Symfony\Component\Cache\Adapter\RedisAdapter;

    $redis = new Client('tcp://127.0.0.1:6379');
    $cache = new Psr16Cache(new RedisAdapter($redis));

    $deferq = new DeferQ(
        queue: new RedisQueueAdapter($redis),
        taskStore: new CacheTaskStore($cache),
        resultStore: new CacheResultStore($cache),
        lock: new CacheLock($cache),
        fingerprinter: new DefaultFingerprintGenerator(),
    );
2. Register Task Handlers
    <?php

    use DeferQ\Handler\TaskHandlerInterface;
    use DeferQ\Handler\TaskHandlerRegistry;
    use DeferQ\Task\Task;

    class ReportGenerateHandler implements TaskHandlerInterface
    {
        public function handle(Task $task): mixed
        {
            $year = $task->params['year'];
            $format = $task->params['format'];

            // ... heavy computation ...

            return ['url' => "/reports/report-{$year}.{$format}", 'rows' => 15000];
        }
    }

    $handlers = new TaskHandlerRegistry();
    $handlers->register('report.generate', new ReportGenerateHandler());
3. Dispatch a Task
    <?php

    use DeferQ\Task\TaskStatus;

    $receipt = $deferq->dispatch(
        name: 'report.generate',
        params: ['year' => 2024, 'format' => 'xlsx'],
        resultTtl: 3600,
    );

    match ($receipt->status) {
        TaskStatus::Completed => handleReady($receipt->result),
        TaskStatus::Running => pollLater($receipt->taskId),
        TaskStatus::Pending => pollLater($receipt->taskId),
        TaskStatus::Failed => handleError($receipt->taskId),
    };
4. Poll for Status
    $receipt = $deferq->getStatus($taskId);

    if ($receipt->status === TaskStatus::Completed) {
        $result = $receipt->result;
    }
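A single status check is rarely enough in practice, so callers usually wrap it in a bounded polling loop. The following is a minimal sketch of such a loop with a doubling delay. `pollUntil()` and its parameters are hypothetical and not part of DeferQ, and the closure stands in for a call to `$deferq->getStatus($taskId)`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of DeferQ: calls $fetch until $isDone accepts
 * the returned value or $maxAttempts is reached, doubling the delay each time.
 * Returns the accepted value, or null if every attempt was used up.
 */
function pollUntil(
    callable $fetch,
    callable $isDone,
    int $maxAttempts = 5,
    int $delayMs = 100,
    int $maxDelayMs = 2000,
): mixed {
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $value = $fetch();

        if ($isDone($value)) {
            return $value;
        }

        // Sleep only if another attempt will follow.
        if ($attempt < $maxAttempts) {
            usleep($delayMs * 1000);
            $delayMs = min($delayMs * 2, $maxDelayMs);
        }
    }

    return null;
}

// Stand-in for $deferq->getStatus($taskId): reports "completed" on the third call.
$calls = 0;
$fetch = function () use (&$calls): string {
    $calls++;

    return $calls >= 3 ? 'completed' : 'running';
};

$status = pollUntil(
    $fetch,
    fn (string $s): bool => $s === 'completed',
    maxAttempts: 5,
    delayMs: 1,
);
```

With DeferQ, `$fetch` would presumably return the receipt, and `$isDone` would need to treat both `TaskStatus::Completed` and `TaskStatus::Failed` as terminal so that a failed task does not consume every attempt.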
5. Run the Worker
    <?php

    use DeferQ\Worker\Worker;
    use DeferQ\Worker\WorkerConfig;

    // $queueAdapter, $taskStore and $resultStore are the same kinds of objects
    // passed to DeferQ in step 1; $psrLogger is any PSR-3 logger.
    $worker = new Worker(
        queue: $queueAdapter,
        handlers: $handlers,
        taskStore: $taskStore,
        resultStore: $resultStore,
        config: new WorkerConfig(
            sleepMs: 1000,
            maxJobs: 500,
            maxMemoryMb: 128,
            taskTimeoutSeconds: 300,
        ),
        logger: $psrLogger,
    );

    $worker->run();
Or use the CLI worker:
php bin/deferq-worker --bootstrap=worker-bootstrap.php --max-jobs=1000 --sleep=500
Deduplication
DeferQ prevents duplicate execution of identical tasks using fingerprinting:
- When you call dispatch(), DeferQ generates a SHA-256 fingerprint from the task name and canonically sorted parameters.
- If a result for this fingerprint already exists in cache, it returns immediately with TaskStatus::Completed.
- If a task with this fingerprint is already pending or running, the existing task's receipt is returned; no new task is created.
- Only if no cached result and no active task exist, a new task is created and queued.
This means 100 users requesting the same report simultaneously will trigger only a single execution.
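The three outcomes listed above can be illustrated with a small in-memory model. This is only a sketch: `DedupSketch`, its string statuses and its array-based stores are hypothetical, and DeferQ's real stores, queue and locking are not modeled.

```php
<?php

declare(strict_types=1);

/**
 * In-memory illustration of the dispatch outcomes: cached result, existing
 * active task, or a newly queued task. Hypothetical class, not DeferQ code.
 */
final class DedupSketch
{
    /** @var array<string, mixed> fingerprint => cached result */
    private array $results = [];

    /** @var array<string, string> fingerprint => id of the pending or running task */
    private array $active = [];

    /** @var list<string> ids of tasks that were actually queued */
    public array $queued = [];

    /** @return array{status: string, taskId: ?string, result: mixed} */
    public function dispatch(string $fingerprint): array
    {
        // 1. A cached result short-circuits everything else.
        if (array_key_exists($fingerprint, $this->results)) {
            return ['status' => 'completed', 'taskId' => null, 'result' => $this->results[$fingerprint]];
        }

        // 2. An active task with the same fingerprint is reused.
        if (isset($this->active[$fingerprint])) {
            return ['status' => 'pending', 'taskId' => $this->active[$fingerprint], 'result' => null];
        }

        // 3. Otherwise a new task is created and queued.
        $taskId = 'task-' . (count($this->queued) + 1);
        $this->active[$fingerprint] = $taskId;
        $this->queued[] = $taskId;

        return ['status' => 'pending', 'taskId' => $taskId, 'result' => null];
    }

    /** Simulates the worker finishing a task and storing its result. */
    public function complete(string $fingerprint, mixed $result): void
    {
        unset($this->active[$fingerprint]);
        $this->results[$fingerprint] = $result;
    }
}

$sketch = new DedupSketch();
$first = $sketch->dispatch('abc');   // queues a new task
$second = $sketch->dispatch('abc');  // reuses the active task
$sketch->complete('abc', ['rows' => 15000]);
$third = $sketch->dispatch('abc');   // served from the cached result
```

In a multi-process setup the check-then-create sequence has to be atomic, which is presumably the role of the lock component passed to DeferQ in the Quick Start.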
    // Both calls return the same task receipt (deduplication)
    $receipt1 = $deferq->dispatch('report.generate', ['year' => 2024, 'format' => 'xlsx']);
    $receipt2 = $deferq->dispatch('report.generate', ['format' => 'xlsx', 'year' => 2024]);

    // $receipt1->taskId === $receipt2->taskId
Parameter key ordering does not matter — ['a' => 1, 'b' => 2] and ['b' => 2, 'a' => 1] produce the same fingerprint.
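One way to obtain an order-independent fingerprint is to sort associative keys recursively before hashing. The sketch below assumes a JSON encoding of the task name and the sorted parameters. The functions `canonicalize()` and `fingerprint()` are hypothetical, and the exact payload format used by DefaultFingerprintGenerator is not documented here.

```php
<?php

declare(strict_types=1);

/** Recursively sorts associative arrays by key; the order of lists is preserved. */
function canonicalize(mixed $value): mixed
{
    if (!is_array($value)) {
        return $value;
    }

    if (!array_is_list($value)) {
        ksort($value);
    }

    // array_map() with a single array keeps the (now sorted) keys.
    return array_map('canonicalize', $value);
}

/** Hypothetical fingerprint: SHA-256 over the task name plus canonical JSON parameters. */
function fingerprint(string $name, array $params): string
{
    $payload = json_encode([$name, canonicalize($params)], JSON_THROW_ON_ERROR);

    return hash('sha256', $payload);
}

$a = fingerprint('report.generate', ['year' => 2024, 'format' => 'xlsx']);
$b = fingerprint('report.generate', ['format' => 'xlsx', 'year' => 2024]);
$c = fingerprint('report.generate', ['year' => 2025, 'format' => 'xlsx']);

// $a and $b are identical; $c differs because a parameter value changed.
```

In this sketch the integer 2024 and the string '2024' hash differently, so callers would need to pass parameters with consistent types. Whether DeferQ normalizes types is not stated in this README.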
Callbacks
Callbacks are invoked by the worker after a task completes and its result is saved to cache. Implement CallbackInterface for production use:
    <?php

    use DeferQ\Callback\CallbackInterface;
    use DeferQ\Task\Task;

    class WebSocketNotifier implements CallbackInterface
    {
        public function __construct(private WebSocketServer $ws) {}

        public function __invoke(Task $task, mixed $result): void
        {
            $this->ws->send($task->id, json_encode($result));
        }
    }

    $receipt = $deferq->dispatch(
        name: 'report.generate',
        params: ['year' => 2024],
        callback: new WebSocketNotifier($ws),
    );
Chain multiple callbacks with CallbackChain:
    <?php

    use DeferQ\Callback\CallbackChain;

    $receipt = $deferq->dispatch(
        name: 'report.generate',
        params: ['year' => 2024],
        callback: new CallbackChain(
            new WebSocketNotifier($ws),
            new EmailNotifier($mailer),
            new MetricsRecorder($metrics),
        ),
    );
Callback failures are caught and logged — they never crash the worker.
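The isolation described here can be pictured as a try/catch around each callback, so that one failing notifier does not prevent the next from running. The sketch below is hypothetical and is not DeferQ's implementation: `IsolatedCallbackChain` takes a plain task ID instead of a Task object and collects error messages in an array where a real worker would call a PSR-3 logger.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical chain: runs every callback and records failures instead of
 * rethrowing them. The array of messages stands in for a PSR-3 logger.
 */
final class IsolatedCallbackChain
{
    /** @var list<string> messages for callbacks that threw */
    public array $errors = [];

    /** @var array<callable> */
    private array $callbacks;

    public function __construct(callable ...$callbacks)
    {
        $this->callbacks = $callbacks;
    }

    public function __invoke(string $taskId, mixed $result): void
    {
        foreach ($this->callbacks as $index => $callback) {
            try {
                $callback($taskId, $result);
            } catch (\Throwable $e) {
                // Record the failure and continue with the next callback.
                $this->errors[] = "callback #{$index} failed for {$taskId}: {$e->getMessage()}";
            }
        }
    }
}

$delivered = [];

$chain = new IsolatedCallbackChain(
    function (string $taskId, mixed $result): void {
        throw new \RuntimeException('socket closed');
    },
    function (string $taskId, mixed $result) use (&$delivered): void {
        $delivered[] = $taskId;
    },
);

$chain('task-1', ['rows' => 15000]);

// The second callback still ran even though the first one threw.
```

Catching `\Throwable` rather than `\Exception` also covers PHP errors such as `TypeError`, which matters when callbacks receive arbitrary result data.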
Custom Queue Adapter
Implement QueueAdapterInterface to use any queue backend:
    <?php

    use DeferQ\Queue\QueueAdapterInterface;
    use DeferQ\Task\Task;

    class DatabaseQueueAdapter implements QueueAdapterInterface
    {
        public function __construct(private PDO $pdo) {}

        public function push(Task $task): void
        {
            $stmt = $this->pdo->prepare(
                'INSERT INTO deferq_queue (payload, created_at) VALUES (?, NOW())'
            );
            $stmt->execute([json_encode($task->toArray())]);
        }

        public function pop(int $timeoutSeconds = 5): ?Task
        {
            // FOR UPDATE SKIP LOCKED only holds the row lock inside a transaction,
            // so the row is selected and marked within one.
            $this->pdo->beginTransaction();

            try {
                // Fetch and lock the oldest unprocessed row
                $stmt = $this->pdo->prepare(
                    'SELECT id, payload FROM deferq_queue WHERE processing = 0 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED'
                );
                $stmt->execute();
                $row = $stmt->fetch(PDO::FETCH_ASSOC);

                if (!$row) {
                    $this->pdo->commit();

                    return null;
                }

                $this->pdo->prepare('UPDATE deferq_queue SET processing = 1 WHERE id = ?')
                    ->execute([$row['id']]);
                $this->pdo->commit();
            } catch (\Throwable $e) {
                if ($this->pdo->inTransaction()) {
                    $this->pdo->rollBack();
                }

                throw $e;
            }

            return Task::fromArray(json_decode($row['payload'], true));
        }

        public function ack(Task $task): void
        {
            // Delete processed row
        }

        public function nack(Task $task): void
        {
            // Reset processing flag
        }
    }
CLI Worker Bootstrap
Create a worker-bootstrap.php file that returns a configured Worker instance:
    <?php
    // worker-bootstrap.php

    require __DIR__ . '/vendor/autoload.php';

    use DeferQ\Handler\TaskHandlerRegistry;
    use DeferQ\Lock\CacheLock;
    use DeferQ\Queue\RedisQueueAdapter;
    use DeferQ\Result\CacheResultStore;
    use DeferQ\Store\CacheTaskStore;
    use DeferQ\Worker\Worker;
    use DeferQ\Worker\WorkerConfig;
    use Predis\Client;
    use Psr\Log\NullLogger;
    use Symfony\Component\Cache\Adapter\RedisAdapter;
    use Symfony\Component\Cache\Psr16Cache;

    // Any PSR-16 cache implementation works; this mirrors the Symfony cache setup from Quick Start
    $redis = new Client('tcp://127.0.0.1:6379');
    $cache = new Psr16Cache(new RedisAdapter($redis));

    // Register handlers
    $handlers = new TaskHandlerRegistry();
    $handlers->register('report.generate', new ReportGenerateHandler());
    $handlers->register('export.csv', new CsvExportHandler());

    // CLI overrides from command-line arguments
    $overrides = $GLOBALS['deferq_cli_overrides'] ?? [];

    return new Worker(
        queue: new RedisQueueAdapter($redis),
        handlers: $handlers,
        taskStore: new CacheTaskStore($cache),
        resultStore: new CacheResultStore($cache),
        config: new WorkerConfig(
            sleepMs: $overrides['sleepMs'] ?? 1000,
            maxJobs: $overrides['maxJobs'] ?? 0,
            maxMemoryMb: $overrides['maxMemoryMb'] ?? 128,
            taskTimeoutSeconds: $overrides['taskTimeoutSeconds'] ?? 300,
        ),
        logger: new NullLogger(),
    );
Then run:
php bin/deferq-worker --bootstrap=worker-bootstrap.php --max-jobs=1000 --sleep=500
Signal Handling
The CLI worker handles SIGTERM and SIGINT for graceful shutdown. When a signal is received, the worker finishes processing the current task before exiting.
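A common way to get this behavior in PHP is to let the signal handler set a flag and check that flag only between tasks. The sketch below assumes the pcntl extension and uses a hypothetical `GracefulLoop` class. It illustrates the pattern and is not the code in bin/deferq-worker.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical worker loop: a signal only sets a flag, and the flag is checked
 * between tasks, so the task in progress always runs to completion.
 */
final class GracefulLoop
{
    private bool $shouldStop = false;

    /** @var list<string> tasks that were fully handled */
    public array $processed = [];

    public function installSignalHandlers(): void
    {
        if (!function_exists('pcntl_signal')) {
            return; // pcntl is unavailable (for example on Windows)
        }

        // Deliver signals asynchronously instead of requiring manual dispatch calls.
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, fn () => $this->requestStop());
        pcntl_signal(SIGINT, fn () => $this->requestStop());
    }

    public function requestStop(): void
    {
        $this->shouldStop = true;
    }

    /** @param list<string> $tasks */
    public function run(array $tasks, callable $handler): void
    {
        foreach ($tasks as $task) {
            if ($this->shouldStop) {
                break; // stop before starting another task
            }

            $handler($task, $this);
            $this->processed[] = $task;
        }
    }
}

$loop = new GracefulLoop();
$loop->installSignalHandlers();

// Simulate a signal arriving while task "b" is being handled.
$loop->run(['a', 'b', 'c'], function (string $task, GracefulLoop $loop): void {
    if ($task === 'b') {
        $loop->requestStop();
    }
});

// "a" and "b" were handled; "c" was never started.
```

Under a process manager, the stop timeout should exceed the longest expected task (for example the `taskTimeoutSeconds` value), otherwise the manager may kill the worker while it is still finishing its current task.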
Author
DarkJest
License
MIT