not-empty/omniq

OmniQ - Redis+Lua queue PHP SDK

Package info

github.com/not-empty/omniq-php

pkg:composer/not-empty/omniq

4.0.0 2026-05-03 13:56 UTC

This package is auto-updated.

Last update: 2026-05-03 13:59:54 UTC


README

PHP SDK for OmniQ, the Redis + Lua queue with deterministic behavior across languages.

This SDK follows the OmniQ core contract and uses the same Lua scripts as the reference implementations.

Status

Current implementation covers:

  • publish / publishJson
  • reserve
  • heartbeat
  • ackSuccess / ackFail
  • promoteDelayed / reapExpired
  • pause / resume / isPaused
  • retryFailed / retryFailedBatch
  • removeJob / removeJobsBatch
  • childsInit / childAck
  • consume

The consumer path has smoke coverage for:

  • success flow
  • failure flow
  • heartbeat on long-running jobs
  • lost lease handling
  • graceful drain on SIGTERM
  • fast exit on SIGINT with drain=false
  • pause / resume
  • grouped jobs and group limits
  • child workflow primitives

Requirements

  • PHP 8.3+
  • ext-redis
  • ext-pcntl
  • Redis or Valkey with Lua enabled

This SDK uses phpredis as the transport. When using host/port, the transport attempts RedisCluster first and falls back to plain Redis when the target is not cluster-enabled, mirroring the Python SDK behavior.
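The cluster-first fallback described above can be sketched generically. This is a minimal illustration of the pattern, not the SDK's transport code: `connectWithFallback` and both factories are hypothetical names, and the factories are stand-ins so the sketch runs without a Redis server.

```php
<?php

declare(strict_types=1);

/**
 * Try the cluster factory first; fall back to the plain factory if it throws.
 * Both factories are hypothetical stand-ins for real phpredis construction.
 *
 * @param callable(): object $clusterFactory
 * @param callable(): object $plainFactory
 */
function connectWithFallback(callable $clusterFactory, callable $plainFactory): object
{
    try {
        return $clusterFactory();
    } catch (Throwable $e) {
        // The target did not behave like a cluster: use a single-node connection.
        // A real transport would likely catch a narrower exception type.
        return $plainFactory();
    }
}

// Stand-in factories: the "cluster" attempt fails, so the plain one is used.
$conn = connectWithFallback(
    static function (): object {
        throw new RuntimeException('target is not cluster-enabled');
    },
    static fn (): object => (object) ['mode' => 'plain'],
);

echo $conn->mode . PHP_EOL; // plain
```

With phpredis, the first factory would presumably wrap `new RedisCluster(null, ['host:port'])` and the second a `new Redis()` followed by `connect()`. How the SDK actually detects a non-cluster target is not documented here.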

Installation

composer require not-empty/omniq

Development Environment

This repository includes a local Docker setup:

docker compose up -d
docker compose exec omniq-php sh
composer install

Services:

  • omniq-redis
  • omniq-valkey
  • omniq-php

Quick Start

Publish

<?php

declare(strict_types=1);

use Omniq\Client;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$jobId = $omniq->publish(
    queue: 'demo',
    payload: ['hello' => 'world'],
    timeoutMs: 30000,
);

echo $jobId . PHP_EOL;

Publish Structured Payload

<?php

declare(strict_types=1);

use Omniq\Client;

require 'vendor/autoload.php';

final readonly class OrderCreated implements JsonSerializable
{
    public function __construct(
        public string $orderId,
        public int $amount,
        public string $currency,
    ) {
    }

    public function jsonSerialize(): array
    {
        return [
            'order_id' => $this->orderId,
            'amount' => $this->amount,
            'currency' => $this->currency,
        ];
    }
}

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$jobId = $omniq->publishJson(
    queue: 'orders',
    payload: new OrderCreated(
        orderId: 'ORD-1',
        amount: 1000,
        currency: 'USD',
    ),
    maxAttempts: 5,
    timeoutMs: 60000,
);
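To see what a `JsonSerializable` payload looks like on the wire, the sketch below encodes an equivalent object with PHP's built-in `json_encode`. It assumes `publishJson` serializes the payload this way, which the README does not state explicitly; the class is redefined here only so the snippet runs on its own.

```php
<?php

declare(strict_types=1);

// Standalone copy of the payload class, for illustration only.
final class OrderCreated implements JsonSerializable
{
    public function __construct(
        public readonly string $orderId,
        public readonly int $amount,
        public readonly string $currency,
    ) {
    }

    public function jsonSerialize(): array
    {
        return [
            'order_id' => $this->orderId,
            'amount' => $this->amount,
            'currency' => $this->currency,
        ];
    }
}

// json_encode() calls jsonSerialize() and encodes the returned array.
$json = json_encode(new OrderCreated('ORD-1', 1000, 'USD'), JSON_THROW_ON_ERROR);

echo $json . PHP_EOL; // {"order_id":"ORD-1","amount":1000,"currency":"USD"}

// A consumer decoding with associative arrays gets the snake_case keys back.
$decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
echo $decoded['order_id'] . PHP_EOL; // ORD-1
```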

Consume

<?php

declare(strict_types=1);

use Omniq\Client;
use Omniq\JobCtx;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$omniq->consume(
    queue: 'demo',
    handler: static function (JobCtx $ctx): void {
        echo 'processing=' . $ctx->jobId . PHP_EOL;
        sleep(2);
        echo 'done=' . $ctx->jobId . PHP_EOL;
    },
    verbose: true,
    drain: false,
);

Consumer Notes

  • The consumer is designed for CLI workers
  • Heartbeat support requires ext-pcntl
  • In the basic CLI pcntl runtime, the consumer uses 2 persistent Redis connections per worker
  • One connection is used by the main consumer loop and one by the heartbeat helper process
  • This is a PHP runtime tradeoff for safe heartbeat behavior in the plain CLI worker model
  • drain=true finishes the current job and exits cleanly on stop signals
  • drain=false exits immediately on SIGINT

Handler Context

Inside handler(JobCtx $ctx):

  • $ctx->queue
  • $ctx->jobId
  • $ctx->payloadRaw
  • $ctx->payload
  • $ctx->attempt
  • $ctx->maxAttempts
  • $ctx->lockUntilMs
  • $ctx->leaseToken
  • $ctx->gid
  • $ctx->exec

Example:

$omniq->consume(
    queue: 'demo',
    handler: static function (JobCtx $ctx): void {
        $isLastAttempt = $ctx->attempt >= $ctx->maxAttempts;
        var_dump($isLastAttempt);
    },
);
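The context fields can also feed small helpers. The sketch below works on plain integers so it runs without the SDK; `remainingLeaseMs` and `isLastAttempt` are hypothetical helpers, and it assumes `lockUntilMs` is a Unix timestamp in milliseconds, which the README does not confirm.

```php
<?php

declare(strict_types=1);

/**
 * Milliseconds left before the lease expires, never negative.
 * Assumes $lockUntilMs is a Unix epoch timestamp in milliseconds.
 */
function remainingLeaseMs(int $lockUntilMs, ?int $nowMs = null): int
{
    // Default to the current wall-clock time in milliseconds.
    $nowMs ??= (int) floor(microtime(true) * 1000);

    return max(0, $lockUntilMs - $nowMs);
}

/** Same check as the handler example above, extracted into a function. */
function isLastAttempt(int $attempt, int $maxAttempts): bool
{
    return $attempt >= $maxAttempts;
}

echo remainingLeaseMs(10_000, 4_000) . PHP_EOL; // 6000
echo remainingLeaseMs(1_000, 5_000) . PHP_EOL;  // 0
var_dump(isLastAttempt(3, 3));                  // bool(true)
```

Inside a handler these would be called as `remainingLeaseMs($ctx->lockUntilMs)` and `isLastAttempt($ctx->attempt, $ctx->maxAttempts)`.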

Queue Discovery

Use scanQueues() for queue discovery. It scans Redis for *:stats keys and is intended for admin/bootstrap flows rather than hot-path UI polling.

<?php

declare(strict_types=1);

use Omniq\Client;
use Omniq\QueueMonitor;

require 'vendor/autoload.php';

$omniq = new Client(
    host: 'omniq-redis',
    port: 6379,
);

$monitor = new QueueMonitor($omniq);
$queues = $monitor->scanQueues();

var_dump($queues);

Queue names are validated in the SDK and may contain only letters, numbers, ., _, and -.
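The naming rule can be pre-checked in application code before a name reaches the SDK. This is a sketch of the stated rule, not the SDK's own validator: `isValidQueueName` is a hypothetical helper, and it assumes "letters" means ASCII letters and that empty names are rejected.

```php
<?php

declare(strict_types=1);

/**
 * True when $name contains only ASCII letters, digits, '.', '_' and '-'.
 * \A and \z anchor the whole string, so a trailing newline is rejected too.
 */
function isValidQueueName(string $name): bool
{
    return preg_match('/\A[A-Za-z0-9._-]+\z/', $name) === 1;
}

var_dump(isValidQueueName('orders.v2_high-prio')); // bool(true)
var_dump(isValidQueueName('bad name'));            // bool(false)
var_dump(isValidQueueName('demo:stats'));          // bool(false)
```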

Administrative Operations

Retry Failed

$omniq->retryFailed('demo', '01ABC...');

Retry Failed Batch

$results = $omniq->retryFailedBatch('demo', ['01A...', '01B...']);

Remove Job

$omniq->removeJob('demo', '01ABC...', 'failed');

Remove Jobs Batch

$results = $omniq->removeJobsBatch('demo', 'failed', ['01A...', '01B...']);

Pause / Resume / IsPaused

$omniq->pause('demo');
$paused = $omniq->isPaused('demo');
$omniq->resume('demo');

Pause only sets a flag: it blocks new reserves, and jobs that are already reserved are not interrupted.

Grouped Jobs

$omniq->publish(
    queue: 'demo',
    payload: ['i' => 1],
    gid: 'company:acme',
    groupLimit: 1,
);

$omniq->publish(
    queue: 'demo',
    payload: ['i' => 2],
);

Behavior:

  • FIFO inside the same group
  • group concurrency limited by groupLimit
  • other groups can still make progress in parallel
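The three properties above can be illustrated with a toy in-memory model. This is not how OmniQ schedules jobs (that logic lives in the Lua scripts); `GroupedQueueSketch` and its methods are hypothetical and exist only to show FIFO order within a group, the per-group limit, and progress in other groups.

```php
<?php

declare(strict_types=1);

final class GroupedQueueSketch
{
    /** @var list<array{id: string, gid: ?string}> waiting jobs in publish order */
    private array $waiting = [];
    /** @var array<string, int> concurrency limit per group */
    private array $limits = [];
    /** @var array<string, int> currently reserved jobs per group */
    private array $inflight = [];

    public function publish(string $id, ?string $gid = null, int $groupLimit = 1): void
    {
        $this->waiting[] = ['id' => $id, 'gid' => $gid];
        if ($gid !== null) {
            $this->limits[$gid] = $groupLimit;
        }
    }

    /** Return the first waiting job whose group still has capacity, or null. */
    public function reserve(): ?string
    {
        foreach ($this->waiting as $i => $job) {
            $gid = $job['gid'];
            if ($gid !== null && ($this->inflight[$gid] ?? 0) >= $this->limits[$gid]) {
                continue; // group is at its limit; its jobs stay queued in order
            }
            if ($gid !== null) {
                $this->inflight[$gid] = ($this->inflight[$gid] ?? 0) + 1;
            }
            unset($this->waiting[$i]);
            $this->waiting = array_values($this->waiting);

            return $job['id'];
        }

        return null;
    }

    /** Mark one job of the group as finished, freeing a slot. */
    public function ack(?string $gid): void
    {
        if ($gid !== null && ($this->inflight[$gid] ?? 0) > 0) {
            $this->inflight[$gid]--;
        }
    }
}

$q = new GroupedQueueSketch();
$q->publish('a1', 'company:acme', 1);
$q->publish('a2', 'company:acme', 1);
$q->publish('b1', 'company:beta', 1);

$first = $q->reserve();  // a1
$second = $q->reserve(); // b1: acme is at its limit, beta can proceed
$third = $q->reserve();  // null: both groups are busy
$q->ack('company:acme');
$fourth = $q->reserve(); // a2: acme has a free slot again
```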

Child Workflows

The execution helper available on JobCtx exposes:

  • $ctx->exec->publish(...)
  • $ctx->exec->pause(...)
  • $ctx->exec->resume(...)
  • $ctx->exec->isPaused(...)
  • $ctx->exec->childsInit(...)
  • $ctx->exec->childAck(...)

Parent Example

$omniq->consume(
    queue: 'documents',
    handler: static function (JobCtx $ctx): void {
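        // Treat a non-array payload as empty instead of failing on array access.
        $payload = is_array($ctx->payload) ? $ctx->payload : [];
        // Default missing keys so a malformed payload does not raise warnings.
        $completionKey = (string) ($payload['document_id'] ?? '');
        $pages = (array) ($payload['pages'] ?? []);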

        $ctx->exec->childsInit($completionKey, count($pages));

        foreach ($pages as $page) {
            $ctx->exec->publish(
                queue: 'pages',
                payload: [
                    'completion_key' => $completionKey,
                    'page' => $page,
                ],
            );
        }
    },
);

Child Example

$omniq->consume(
    queue: 'pages',
    handler: static function (JobCtx $ctx): void {
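        // Treat a non-array payload as empty instead of failing on array access.
        $payload = is_array($ctx->payload) ? $ctx->payload : [];
        // Default a missing key so a malformed payload does not raise a warning.
        $completionKey = (string) ($payload['completion_key'] ?? '');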

        $remaining = $ctx->exec->childAck($completionKey);

        echo 'remaining=' . $remaining . PHP_EOL;
    },
);

Properties:

  • idempotent decrement per child id
  • safe under retries
  • counter is deleted when it reaches zero
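A toy in-memory model makes these properties concrete. It is not the SDK's implementation (the real counter lives in Redis): `ChildCounterSketch`, its explicit `$childId` parameter, and the return value of 0 for an unknown key are all assumptions made for illustration.

```php
<?php

declare(strict_types=1);

final class ChildCounterSketch
{
    /** @var array<string, int> children still pending per completion key */
    private array $remaining = [];
    /** @var array<string, array<string, true>> child ids already acknowledged */
    private array $acked = [];

    public function init(string $key, int $total): void
    {
        $this->remaining[$key] = $total;
        $this->acked[$key] = [];
    }

    /** Decrement once per child id and return how many children remain. */
    public function ack(string $key, string $childId): int
    {
        if (!isset($this->remaining[$key])) {
            return 0; // counter already completed (or never initialised)
        }
        if (!isset($this->acked[$key][$childId])) {
            $this->acked[$key][$childId] = true;
            $this->remaining[$key]--;
        }
        $left = $this->remaining[$key];
        if ($left <= 0) {
            // Last child done: drop the counter and its bookkeeping.
            unset($this->remaining[$key], $this->acked[$key]);

            return 0;
        }

        return $left;
    }
}

$counter = new ChildCounterSketch();
$counter->init('doc-1', 2);

$afterFirst = $counter->ack('doc-1', 'page-1'); // 1
$afterRetry = $counter->ack('doc-1', 'page-1'); // 1: a retry does not decrement again
$afterLast = $counter->ack('doc-1', 'page-2');  // 0: counter is deleted
```

Because a repeated acknowledgement for the same child leaves the count unchanged, a child job that is retried after a lost lease cannot complete the parent early.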

Examples

The examples/ directory mirrors the Python example matrix and is intended for both documentation and manual testing:

  • examples/simple/
  • examples/max_attempts/
  • examples/pause_resume/
  • examples/childs/

Notes

  • Payloads must be structured JSON
  • The Lua scripts are vendored in src/core/scripts
  • The PHP SDK is intentionally thin over Redis + Lua semantics
  • The current worker runtime target is CLI PHP, not FrankenPHP yet
  • Contract regression coverage lives in omniq-core/validation