phpdot/queue

RabbitMQ messaging for PHP: publish, consume, retry, dead letter.

Maintainers

Package info

github.com/phpdot/queue

pkg:composer/phpdot/queue

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.0.0 2026-04-03 13:27 UTC

This package is auto-updated.

Last update: 2026-04-03 13:46:34 UTC


README

RabbitMQ messaging for PHP: publish, consume, retry, dead letter.

Install

composer require phpdot/queue

Quick Start

use PHPdot\Queue\Connection;
use PHPdot\Queue\Config\ConnectionConfig;
use PHPdot\Queue\Message;
use PHPdot\Queue\Enum\TaskStatus;

$config = new ConnectionConfig(
    host: 'localhost',
    exchanges: [
        'tasks' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'tasks.process' => [
            'bindings' => [['exchange' => 'tasks', 'routing_key' => 'task.new']],
            'durable' => true,
        ],
    ],
);

$conn = new Connection($config);

// Publish
$conn->message('{"task":"send_email"}')
    ->publish('tasks', 'task.new');

// Consume
$conn->consume('tasks.process')
    ->execute(function (Message $msg): TaskStatus {
        processTask(json_decode($msg->body(), true));
        return TaskStatus::SUCCESS;
    });

Architecture

graph TD
    subgraph Connection
        direction TB
        CONN[Connect, reconnect, channel, topology]
        subgraph Publishers & Consumers
            direction LR
            PUB[Publisher<br/><br/>Fluent builder:<br/>compress, trace,<br/>priority, headers]
            CON[Consumer<br/><br/>basic_consume loop<br/>with retry/dead<br/>letter handling]
        end
        TOPO[Topology Manager<br/><br/>Declares exchanges, queues, bindings,<br/>retry queues — cached after first use]
        CONN --> PUB
        CONN --> CON
        PUB --> TOPO
        CON --> TOPO
    end
Loading

Publishing

// Simple
$conn->message('{"order_id": 123}')
    ->publish('orders', 'order.created');

// Full-featured
$conn->message(json_encode($data))
    ->retry(5)
    ->priority(8)
    ->compress()
    ->header(['traceparent' => $traceHeader])
    ->header(['x-source' => 'api-gateway'])
    ->publish('orders', 'order.created');

Auto-set properties:

Property Default
message_id UUIDv7
timestamp time()
app_id gethostname()
content_type Auto-detected (JSON or text)
delivery_mode 2 (persistent)

Consuming

$conn->consume('orders.process')
    ->prefetch(10)
    ->onRetry(function (Message $msg, int $count): void {
        echo "Retry #{$count}: {$msg->messageId()}\n";
    })
    ->onDead(function (Message $msg, string $reason): void {
        echo "Dead: {$msg->messageId()}{$reason}\n";
    })
    ->execute(function (Message $msg): TaskStatus {
        $data = json_decode($msg->body(), true);

        if ($data === null) {
            return TaskStatus::DEAD;     // malformed, don't retry
        }

        try {
            processOrder($data);
            return TaskStatus::SUCCESS;  // done
        } catch (TemporaryException $e) {
            return TaskStatus::RETRY;    // try again
        } catch (PermanentException $e) {
            return TaskStatus::DEAD;     // give up
        }
    });

Three return values. No ambiguity:

  • SUCCESS — ack, done
  • RETRY — nack to retry queue, try again later
  • DEAD — ack + forward to dead letter exchange

Unhandled exceptions are caught and dead-lettered automatically. The consumer never crashes.

Retry & Dead Letter

Retry Flow

graph TD
    A[orders.queue] -->|nack| B[orders.queue.retry.exchange]
    B --> C["orders.queue.retry<br/>(TTL queue, e.g. 500ms)"]
    C -->|TTL expires| A
Loading

Enable in config:

'queues' => [
    'orders.process' => [
        'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
        'retry' => ['enable' => true, 'delay_ms' => 500],
        'dead' => 'dead-letters',
        'durable' => true,
    ],
],

Retry infrastructure (exchange, TTL queue, bindings) is created automatically on first use.

Dead Letter

When max retries exceeded or TaskStatus::DEAD returned, the message is forwarded to the dead letter exchange with:

  • x-failed-queue — original queue name
  • x-failed-reason — failure description
  • x-failed-timestamp — Unix timestamp

Replay

After fixing a bug, replay dead-lettered messages back to their original queue. Same callback pattern as consuming — return an enum, the library handles the rest.

use PHPdot\Queue\Enum\ReplayAction;

$result = $conn->replay('orders.dead')
    ->limit(100)
    ->execute(function (Message $msg): ReplayAction {
        echo "[{$msg->messageId()}] {$msg->failedReason()}\n";

        // Bad payload — clean up DB and discard permanently
        if ($msg->failedReason() === 'Invalid payload') {
            Order::where('message_id', $msg->messageId())->delete();
            return ReplayAction::REMOVE;
        }

        // Timeout errors — bug is fixed, send it back
        if (str_contains($msg->failedReason(), 'timeout')) {
            return ReplayAction::REPLAY;
        }

        // Unknown — leave in DLQ for investigation
        return ReplayAction::SKIP;
    });

echo "Replayed: {$result->replayed}, Removed: {$result->removed}, Skipped: {$result->skipped}\n";

Three actions per message:

  • REPLAY — ack + republish to original exchange with original message ID. Dead-letter metadata stripped, retry counter reset.
  • REMOVE — ack + discard permanently. Use the callback to clean up related data before it's gone.
  • SKIP — nack with requeue. Message stays in DLQ for later.

Uses basic_get (pull mode) — processes available messages and stops. No infinite loop.

Compression

// Publish compressed
$conn->message($largePayload)
    ->compress()
    ->publish('data', 'import.batch');

// Consume — transparent decompression
$conn->consume('data.process')
    ->execute(function (Message $msg): TaskStatus {
        $body = $msg->body();  // already decompressed
        return TaskStatus::SUCCESS;
    });

Trace Propagation

Pass trace context as plain string headers. No coupling to any tracing library.

// Publish with trace
$conn->message($payload)
    ->header(['traceparent' => $tracelog->getTraceparent()->toHeader()])
    ->publish('orders', 'order.created');

// Consume with trace
$conn->consume('orders.process')
    ->execute(function (Message $msg): TaskStatus {
        $traceparent = $msg->header('traceparent');  // '' if missing
        // Reconnect trace in your framework layer
        return TaskStatus::SUCCESS;
    });

Configuration

$config = new ConnectionConfig(
    host: 'rabbitmq.internal',
    port: 5672,
    username: 'app',
    password: 'secret',
    vhost: '/',
    timeoutMs: 30000,
    maxRetries: 3,
    retryDelayMs: 1000,
    exchanges: [
        'orders' => ['type' => 'direct', 'durable' => true],
        'notifications' => ['type' => 'fanout', 'durable' => true],
        'dead' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'orders.process' => [
            'bindings' => [
                ['exchange' => 'orders', 'routing_key' => 'order.created'],
                ['exchange' => 'orders', 'routing_key' => 'order.updated'],
            ],
            'retry' => ['enable' => true, 'delay_ms' => 500],
            'dead' => 'dead',
            'durable' => true,
        ],
    ],
);

Connection Resilience

Auto-reconnect with exponential backoff:

Attempt 1: wait 1s
Attempt 2: wait 2s
Attempt 3: wait 4s
→ ConnectionException if all fail

Message API

$msg->body();              // message content (decompressed)
$msg->messageId();         // UUIDv7 message ID
$msg->queue();             // queue name
$msg->header('key');       // header value or ''
$msg->headers();           // all headers as array
$msg->maxRetries();        // x-retries-max value
$msg->priority();          // 0-10
$msg->originalExchange();  // x-original-exchange
$msg->originalRoutingKey();// x-original-routing-key
$msg->failedReason();      // x-failed-reason (on dead letters)

Package Structure

src/
├── Connection.php          Main entry point
├── Publisher.php            Fluent message builder
├── Consumer.php             Message consumer with retry/dead letter
├── Replayer.php             Dead letter queue replay
├── Message.php              Immutable inbound message DTO
├── Config/
│   └── ConnectionConfig.php Connection and topology configuration
├── Enum/
│   ├── TaskStatus.php       SUCCESS, RETRY, DEAD
│   └── ReplayAction.php     REPLAY, REMOVE, SKIP
├── Result/
│   └── ReplayResult.php     Replay outcome counts
├── Topology/
│   └── TopologyManager.php  Exchange/queue/binding declaration
└── Exception/
    ├── QueueException.php   Base exception
    ├── ConnectionException.php
    ├── PublishException.php
    └── ConsumeException.php

Development

composer test        # PHPUnit (unit tests only)
composer test-all    # PHPUnit (including integration, needs RabbitMQ)
composer analyse     # PHPStan level 10
composer cs-fix      # PHP-CS-Fixer
composer check       # All three (unit + analyse + cs-check)

License

MIT