not-empty / omniq
OmniQ - Redis+Lua queue PHP SDK
Requires
- php: ^8.3
- ext-pcntl: *
- ext-redis: *
- not-empty/ulid-php-lib: ^7.0
Last update: 2026-05-03 13:59:54 UTC
PHP SDK for OmniQ, the Redis + Lua queue with deterministic behavior across languages.
This SDK follows the OmniQ core contract and uses the same Lua scripts as the reference implementations.
Status
Current implementation covers:
- publish / publishJson
- reserve
- heartbeat
- ackSuccess / ackFail
- promoteDelayed / reapExpired
- pause / resume / isPaused
- retryFailed / retryFailedBatch
- removeJob / removeJobsBatch
- childsInit / childAck
- consume
The consumer path has smoke coverage for:
- success flow
- failure flow
- heartbeat on long-running jobs
- lost lease handling
- graceful drain on SIGTERM
- fast exit on SIGINT with drain=false
- pause / resume
- grouped jobs and group limits
- child workflow primitives
Requirements
- PHP 8.3+
- ext-redis
- ext-pcntl
- Redis or Valkey with Lua enabled
This SDK uses phpredis as the transport.
When using host/port, the transport attempts RedisCluster first and falls back to plain Redis when the target is not cluster-enabled, mirroring the Python SDK behavior.
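The cluster-first fallback described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's transport code: `connectWithFallback` and both factory closures are hypothetical names, and the connection steps are injected as callables so the sketch runs without a Redis server. In a real setup the first factory would presumably construct a `RedisCluster` and the second would open a plain `Redis` connection.

```php
<?php

declare(strict_types=1);

/**
 * Try the cluster factory first; fall back to the plain factory if it throws.
 * Hypothetical helper for illustration, not the SDK's actual transport code.
 *
 * @param callable(): object $clusterFactory
 * @param callable(): object $plainFactory
 */
function connectWithFallback(callable $clusterFactory, callable $plainFactory): object
{
    try {
        return $clusterFactory();
    } catch (Throwable $e) {
        // Assumption: any failure here means the target is not cluster-enabled.
        return $plainFactory();
    }
}

// Stand-ins so the sketch runs without a Redis server.
$cluster = static function (): object {
    throw new RuntimeException('target is not cluster-enabled');
};
$plain = static fn (): object => (object) ['mode' => 'plain'];

$connection = connectWithFallback($cluster, $plain);
echo $connection->mode . PHP_EOL;
```

Catching every `Throwable` keeps the sketch short; a real transport would likely narrow this to the cluster-specific exception so that unrelated failures are not silently retried against a single node.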
Installation
composer require not-empty/omniq
Development Environment
This repository includes a local Docker setup:
docker compose up -d
docker compose exec omniq-php sh
composer install
Services:
- omniq-redis
- omniq-valkey
- omniq-php
Quick Start
Publish
<?php

declare(strict_types=1);

use Omniq\Client;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$jobId = $omniq->publish(
    queue: 'demo',
    payload: ['hello' => 'world'],
    timeoutMs: 30000,
);

echo $jobId . PHP_EOL;
Publish Structured Payload
<?php

declare(strict_types=1);

use Omniq\Client;

require 'vendor/autoload.php';

final readonly class OrderCreated implements JsonSerializable
{
    public function __construct(
        public string $orderId,
        public int $amount,
        public string $currency,
    ) {
    }

    public function jsonSerialize(): array
    {
        return [
            'order_id' => $this->orderId,
            'amount' => $this->amount,
            'currency' => $this->currency,
        ];
    }
}

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$jobId = $omniq->publishJson(
    queue: 'orders',
    payload: new OrderCreated(
        orderId: 'ORD-1',
        amount: 1000,
        currency: 'USD',
    ),
    maxAttempts: 5,
    timeoutMs: 60000,
);
Consume
<?php

declare(strict_types=1);

use Omniq\Client;
use Omniq\JobCtx;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$omniq->consume(
    queue: 'demo',
    handler: static function (JobCtx $ctx): void {
        echo 'processing=' . $ctx->jobId . PHP_EOL;
        sleep(2);
        echo 'done=' . $ctx->jobId . PHP_EOL;
    },
    verbose: true,
    drain: false,
);
Consumer Notes
- The consumer is designed for CLI workers
- Heartbeat support requires ext-pcntl
- In the basic CLI pcntl runtime, the consumer uses 2 persistent Redis connections per worker
- One connection is used by the main consumer loop and one by the heartbeat helper process
- This is a PHP runtime tradeoff for safe heartbeat behavior in the plain CLI worker model
- drain=true finishes the current job and exits cleanly on stop signals
- drain=false exits immediately on SIGINT
Handler Context
Inside handler(JobCtx $ctx):
- $ctx->queue
- $ctx->jobId
- $ctx->payloadRaw
- $ctx->payload
- $ctx->attempt
- $ctx->maxAttempts
- $ctx->lockUntilMs
- $ctx->leaseToken
- $ctx->gid
- $ctx->exec
Example:
$omniq->consume(
    queue: 'demo',
    handler: static function (JobCtx $ctx): void {
        $isLastAttempt = $ctx->attempt >= $ctx->maxAttempts;
        var_dump($isLastAttempt);
    },
);
Queue Discovery
Use scanQueues() for queue discovery. It scans Redis for *:stats keys and is intended
for admin/bootstrap flows rather than hot-path UI polling.
<?php

declare(strict_types=1);

use Omniq\Client;
use Omniq\QueueMonitor;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$monitor = new QueueMonitor($omniq);
$queues = $monitor->scanQueues();

var_dump($queues);
Queue names are validated in the SDK and may contain only letters, numbers, ., _, and -.
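The naming rule above can be approximated with a single regular expression. This is a sketch only: `isValidQueueName` is a hypothetical helper, not the SDK's validator, and it assumes "letters" means ASCII letters and that an empty name is rejected.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical validator: accepts only ASCII letters, digits, '.', '_' and '-'.
 * Rejecting the empty string is an assumption of this sketch.
 */
function isValidQueueName(string $name): bool
{
    return preg_match('/\A[A-Za-z0-9._-]+\z/', $name) === 1;
}

var_dump(isValidQueueName('orders.v2_high-priority')); // bool(true)
var_dump(isValidQueueName('orders:stats'));            // bool(false)
```

Keeping `:` out of queue names also keeps them from colliding with suffixed keys such as the `*:stats` keys mentioned above.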
Administrative Operations
Retry Failed
$omniq->retryFailed('demo', '01ABC...');
Retry Failed Batch
$results = $omniq->retryFailedBatch('demo', ['01A...', '01B...']);
Remove Job
$omniq->removeJob('demo', '01ABC...', 'failed');
Remove Jobs Batch
$results = $omniq->removeJobsBatch('demo', 'failed', ['01A...', '01B...']);
Pause / Resume / IsPaused
$omniq->pause('demo');
$paused = $omniq->isPaused('demo');
$omniq->resume('demo');
Pause is flag-only: it blocks new reserves and does not interrupt jobs that are already reserved.
Grouped Jobs
$omniq->publish(
    queue: 'demo',
    payload: ['i' => 1],
    gid: 'company:acme',
    groupLimit: 1,
);

$omniq->publish(
    queue: 'demo',
    payload: ['i' => 2],
);
Behavior:
- FIFO inside the same group
- group concurrency limited by groupLimit
- other groups can still make progress in parallel
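The three behaviors listed above can be illustrated with a toy in-memory model. This is only a sketch of the semantics, assuming the latest published `groupLimit` wins for a group; `GroupedQueueModel` and its methods are hypothetical, and the real logic lives in the Lua scripts.

```php
<?php

declare(strict_types=1);

/**
 * Toy in-memory model of group-limited reservation.
 * Hypothetical class for illustration; not part of the SDK.
 */
final class GroupedQueueModel
{
    /** @var list<array{id: string, gid: ?string}> waiting jobs, oldest first */
    private array $waiting = [];

    /** @var array<string, int> number of running jobs per group */
    private array $running = [];

    /** @var array<string, int> concurrency limit per group */
    private array $limits = [];

    public function publish(string $id, ?string $gid = null, int $groupLimit = 1): void
    {
        if ($gid !== null) {
            $this->limits[$gid] = $groupLimit; // assumption: last publish wins
        }
        $this->waiting[] = ['id' => $id, 'gid' => $gid];
    }

    /** Return the oldest job whose group still has capacity, or null. */
    public function reserve(): ?string
    {
        foreach ($this->waiting as $index => $job) {
            $gid = $job['gid'];
            if ($gid !== null && ($this->running[$gid] ?? 0) >= $this->limits[$gid]) {
                continue; // group is at its limit, so its jobs keep waiting
            }
            if ($gid !== null) {
                $this->running[$gid] = ($this->running[$gid] ?? 0) + 1;
            }
            array_splice($this->waiting, $index, 1);
            return $job['id'];
        }
        return null;
    }

    /** Release one slot of the group when a job finishes. */
    public function ack(?string $gid): void
    {
        if ($gid !== null && ($this->running[$gid] ?? 0) > 0) {
            $this->running[$gid]--;
        }
    }
}

$model = new GroupedQueueModel();
$model->publish('a1', 'company:acme', 1);
$model->publish('a2', 'company:acme', 1);
$model->publish('b1', 'company:beta', 1);

$first = $model->reserve();   // 'a1'
$second = $model->reserve();  // 'b1', because company:acme is at its limit
$third = $model->reserve();   // null, a2 waits for a1 to finish
$model->ack('company:acme');
$fourth = $model->reserve();  // 'a2'
```

Scanning from the oldest job and skipping saturated groups is what gives FIFO order inside a group while letting other groups proceed.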
Child Workflows
The execution helper available on JobCtx exposes:
- $ctx->exec->publish(...)
- $ctx->exec->pause(...)
- $ctx->exec->resume(...)
- $ctx->exec->isPaused(...)
- $ctx->exec->childsInit(...)
- $ctx->exec->childAck(...)
Parent Example
$omniq->consume(
    queue: 'documents',
    handler: static function (JobCtx $ctx): void {
        if (is_array($ctx->payload)) {
            $payload = $ctx->payload;
        } else {
            $payload = [];
        }

        $completionKey = (string) $payload['document_id'];
        $pages = (array) $payload['pages'];

        $ctx->exec->childsInit($completionKey, count($pages));

        foreach ($pages as $page) {
            $ctx->exec->publish(
                queue: 'pages',
                payload: [
                    'completion_key' => $completionKey,
                    'page' => $page,
                ],
            );
        }
    },
);
Child Example
$omniq->consume(
    queue: 'pages',
    handler: static function (JobCtx $ctx): void {
        if (is_array($ctx->payload)) {
            $payload = $ctx->payload;
        } else {
            $payload = [];
        }

        $completionKey = (string) $payload['completion_key'];
        $remaining = $ctx->exec->childAck($completionKey);

        echo 'remaining=' . $remaining . PHP_EOL;
    },
);
Properties:
- idempotent decrement per child id
- safe under retries
- counter is deleted when it reaches zero
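The properties above can be modeled in memory to show why a retried child does not decrement the counter twice. This is a sketch under assumptions: `ChildCounterModel` is hypothetical, the explicit `$childId` argument stands in for whatever identity the SDK derives internally (`childAck` in the examples takes only the completion key), and returning 0 for a missing counter is a choice made for this sketch.

```php
<?php

declare(strict_types=1);

/**
 * Toy in-memory model of a child completion counter.
 * Hypothetical class for illustration; not part of the SDK.
 */
final class ChildCounterModel
{
    /** @var array<string, int> remaining children per completion key */
    private array $remaining = [];

    /** @var array<string, array<string, true>> child ids already acknowledged */
    private array $acked = [];

    public function init(string $key, int $total): void
    {
        $this->remaining[$key] = $total;
        $this->acked[$key] = [];
    }

    /** Decrement once per child id; a repeated ack returns the same value. */
    public function ack(string $key, string $childId): int
    {
        if (!isset($this->remaining[$key])) {
            return 0; // assumption: a deleted or unknown counter reads as 0
        }
        if (!isset($this->acked[$key][$childId])) {
            $this->acked[$key][$childId] = true;
            $this->remaining[$key]--;
        }
        $left = $this->remaining[$key];
        if ($left <= 0) {
            // The counter is deleted once it reaches zero.
            unset($this->remaining[$key], $this->acked[$key]);
        }
        return $left;
    }

    public function exists(string $key): bool
    {
        return isset($this->remaining[$key]);
    }
}

$counter = new ChildCounterModel();
$counter->init('doc-1', 2);

$afterFirst = $counter->ack('doc-1', 'job-a'); // 1
$afterRetry = $counter->ack('doc-1', 'job-a'); // still 1, same child id
$afterLast = $counter->ack('doc-1', 'job-b');  // 0, counter is deleted
```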
Examples
The examples/ directory mirrors the Python example matrix and is intended for both documentation and manual testing:
- examples/simple/
- examples/max_attempts/
- examples/pause_resume/
- examples/childs/
Notes
- Payloads must be structured JSON
- The Lua scripts are vendored in src/core/scripts
- The PHP SDK is intentionally thin over Redis + Lua semantics
- The current worker runtime target is CLI PHP, not FrankenPHP yet
- Contract regression coverage lives in omniq-core/validation
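As an illustration of the first note, a payload check might look like the sketch below. It reads "structured JSON" as "a JSON object or array at the top level, not a bare scalar", which is an assumption; `isStructuredJson` is a hypothetical helper and not the SDK's own validation.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical check: true when $raw decodes to a JSON object or array.
 * Treating bare scalars as unstructured is an assumption of this sketch.
 */
function isStructuredJson(string $raw): bool
{
    try {
        $decoded = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        return false; // not valid JSON at all
    }
    // With the associative flag, objects and arrays both decode to PHP arrays.
    return is_array($decoded);
}

var_dump(isStructuredJson('{"hello":"world"}')); // bool(true)
var_dump(isStructuredJson('[1,2,3]'));           // bool(true)
var_dump(isStructuredJson('"just a string"'));   // bool(false)
```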