kislayphp/queue

Native PHP distributed job queue with a standalone queue server, producer client, worker runtime, retries, and DLQ support

Maintainers

Package info

github.com/KislayPHP/queue

Documentation

Language:Shell

Type:php-ext

Ext name:ext-kislayphp_queue

pkg:composer/kislayphp/queue

Statistics

Installs: 7

Dependents: 0

Suggesters: 7

Stars: 0

Open Issues: 0

0.0.4 2026-03-15 19:29 UTC

This package is not auto-updated.

Last update: 2026-03-16 02:31:54 UTC


README

PHP Version License Build Status PIE

Native PHP distributed job queue for long-running services. Phase 1 ships a standalone queue server, producer client, worker client, retries, delayed jobs, and DLQ support.

Part of the KislayPHP ecosystem.

What It Is

kislayphp/queue now has two modes:

  • Kislay\Queue\Server for the standalone queue node
  • Kislay\Queue\Client for producers and operational reads
  • Kislay\Queue\Worker for consumers
  • Kislay\Queue\Job for ack/nack control inside a handler
  • Kislay\Queue\Queue as the legacy local in-process queue for development fallback

Delivery model in 0.0.4:

  • at-least-once delivery
  • one leased job per worker fetch
  • retries with backoff
  • delayed jobs
  • dead-letter queue support
  • in-memory server state only

Install

pie install kislayphp/queue:0.0.4

Enable in php.ini:

extension=kislayphp_queue.so

5-Minute Quickstart

1. Start the queue server

<?php
$server = new Kislay\Queue\Server();
$server->declare('emails', [
    'visibility_timeout_ms' => 30000,
    'max_attempts' => 5,
    'retry_backoff_ms' => 1000,
    'dead_letter_queue' => 'emails.dlq',
]);
$server->listen('0.0.0.0', 9020);
$server->run();

2. Push a job

<?php
$client = new Kislay\Queue\Client('http://127.0.0.1:9020');

$jobId = $client->push('emails', [
    'to' => 'user@example.com',
    'subject' => 'Welcome',
], [
    'headers' => ['trace_id' => 'trace-1'],
    'max_attempts' => 5,
]);

var_dump($jobId);

3. Run a worker

<?php
$worker = new Kislay\Queue\Worker('http://127.0.0.1:9020');

$worker->consume('emails', function (Kislay\Queue\Job $job) {
    $payload = $job->payload();
    $headers = $job->headers();

    sendEmail($payload['to'], $payload['subject']);

    return true;
}, [
    'worker_id' => 'emails-worker-1',
    'lease_ms' => 30000,
]);

Architecture

+------------------+        push         +----------------------+
| Producer Service | ------------------> | Kislay\Queue\Server |
| Client           |                     | owns queue state     |
+------------------+                     | leases jobs          |
                                         | retries / DLQ        |
+------------------+        fetch        +----------+-----------+
| Worker Service   | <------------------------------+
| Kislay\Queue\Worker |         ack / nack          |
+------------------+                                 |
                                                     v
                                            +------------------+
                                            | queue.dlq        |
                                            +------------------+

Public API

namespace Kislay\Queue;

class Server {
    public function __construct(array $options = []);
    public function listen(string $host, int $port): bool;
    public function run(): void;
    public function stop(): bool;
    public function declare(string $queue, ?array $options = null): bool;
    public function stats(?string $queue = null): array;
}

class Client {
    public function __construct(string $baseUrl, array $options = []);
    public function push(string $queue, mixed $payload, ?array $options = null): string;
    public function pushBatch(string $queue, array $jobs): array;
    public function stats(string $queue): array;
    public function purge(string $queue): int;
}

class Worker {
    public function __construct(string $baseUrl, array $options = []);
    public function consume(string $queue, callable $handler, ?array $options = null): bool;
    public function stop(): bool;
}

class Job {
    public function id(): string;
    public function queue(): string;
    public function payload(): mixed;
    public function headers(): array;
    public function attempts(): int;
    public function maxAttempts(): int;
    public function availableAt(): int;
    public function ack(): bool;
    public function nack(?bool $requeue = true, ?int $delayMs = null): bool;
    public function release(?int $delayMs = null): bool;
}

Legacy local queue API remains available:

$queue = new Kislay\Queue\Queue();
$queue->enqueue('jobs', ['task' => 'send_email']);
$job = $queue->dequeue('jobs');

Queue Semantics

0.0.4 semantics are intentionally explicit:

  • push() creates a job in ready or delayed
  • Worker::consume() fetches one leased job at a time
  • if the handler returns true, the worker acks the job
  • if the handler returns false, the worker nacks the job
  • if a handler throws, the worker nacks the job and stops with the exception still raised
  • if retries are exhausted, the job moves to the configured DLQ

Operational Notes

What 0.0.4 is good for:

  • background task processing between services
  • queue semantics for microservice workloads
  • development and early production validation
  • explicit retry / DLQ flows

What 0.0.4 does not do yet:

  • durable persistence across queue server restarts
  • worker concurrency inside one consume() loop
  • distributed replication
  • exactly-once delivery

If the queue server process stops, in-memory jobs are lost. That is the correct tradeoff for this phase; do not market it as durable yet.

End-to-End Example

See example.php for a minimal runnable server / producer / worker flow.

Ecosystem Fit

Use this module for job queue semantics.

Use eventbus for fanout and realtime pub/sub.

Typical flow:

HTTP request -> queue push -> worker processes job -> eventbus emits completion event

Docs

License

Apache License 2.0