phpdot/rabbitmq

RabbitMQ client for PHP: publish, consume, retry, dead letter, topology.

Maintainers

Package info

github.com/phpdot/rabbitmq

pkg:composer/phpdot/rabbitmq

Statistics

Installs: 4

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.3.0 2026-05-03 10:25 UTC

This package is auto-updated.

Last update: 2026-05-03 10:26:19 UTC


README

RabbitMQ messaging for PHP: publish, consume, retry, dead letter.

Install

composer require phpdot/rabbitmq

Quick Start

use PHPdot\RabbitMQ\RabbitMQConnection;
use PHPdot\RabbitMQ\Config\RabbitMQConfig;
use PHPdot\RabbitMQ\Message;
use PHPdot\RabbitMQ\Enum\TaskStatus;

$config = new RabbitMQConfig(
    host: 'localhost',
    exchanges: [
        'tasks' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'tasks.process' => [
            'bindings' => [['exchange' => 'tasks', 'routing_key' => 'task.new']],
            'durable' => true,
        ],
    ],
);

$conn = new RabbitMQConnection($config);

// Publish
$conn->message('{"task":"send_email"}')
    ->publish('tasks', 'task.new');

// Consume
$conn->consume('tasks.process')
    ->execute(function (Message $msg): TaskStatus {
        processTask(json_decode($msg->body(), true));
        return TaskStatus::SUCCESS;
    });

Architecture

graph TD
    subgraph RabbitMQConnection
        direction TB
        CONN[Connect, reconnect, channel, topology]
        subgraph Publishers & Consumers
            direction LR
            PUB[Publisher<br/><br/>Fluent builder:<br/>compress, trace,<br/>priority, headers]
            CON[Consumer<br/><br/>basic_consume loop<br/>with retry/dead<br/>letter handling]
        end
        TOPO[Topology Manager<br/><br/>Declares exchanges, queues, bindings,<br/>retry queues — cached after first use]
        CONN --> PUB
        CONN --> CON
        PUB --> TOPO
        CON --> TOPO
    end
Loading

Publishing

// Simple
$conn->message('{"order_id": 123}')
    ->publish('orders', 'order.created');

// Full-featured
$conn->message(json_encode($data))
    ->retry(5)
    ->priority(8)
    ->compress()
    ->header(['traceparent' => $traceHeader])
    ->header(['x-source' => 'api-gateway'])
    ->publish('orders', 'order.created');

Auto-set properties:

Property Default
message_id UUIDv7
timestamp time()
app_id gethostname()
content_type Auto-detected (JSON or text)
delivery_mode 2 (persistent)

Consuming

$conn->consume('orders.process')
    ->prefetch(10)
    ->onRetry(function (Message $msg, int $count): void {
        echo "Retry #{$count}: {$msg->messageId()}\n";
    })
    ->onDead(function (Message $msg, string $reason): void {
        echo "Dead: {$msg->messageId()}{$reason}\n";
    })
    ->execute(function (Message $msg): TaskStatus {
        $data = json_decode($msg->body(), true);

        if ($data === null) {
            return TaskStatus::DEAD;     // malformed, don't retry
        }

        try {
            processOrder($data);
            return TaskStatus::SUCCESS;  // done
        } catch (TemporaryException $e) {
            return TaskStatus::RETRY;    // try again
        } catch (PermanentException $e) {
            return TaskStatus::DEAD;     // give up
        }
    });

Three return values. No ambiguity:

  • SUCCESS — ack, done
  • RETRY — nack to retry queue, try again later
  • DEAD — ack + forward to dead letter exchange

Unhandled exceptions are caught and dead-lettered automatically. The consumer never crashes.

Retry & Dead Letter

Retry Flow

graph TD
    A[orders.queue] -->|nack| B[orders.queue.retry.exchange]
    B --> C["orders.queue.retry<br/>(TTL queue, e.g. 500ms)"]
    C -->|TTL expires| A
Loading

Enable in config:

'queues' => [
    'orders.process' => [
        'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
        'retry' => ['enable' => true, 'delay_ms' => 500],
        'dead' => 'dead-letters',
        'durable' => true,
    ],
],

Retry infrastructure (exchange, TTL queue, bindings) is created automatically on first use.

Dead Letter

When max retries exceeded or TaskStatus::DEAD returned, the message is forwarded to the dead letter exchange with:

  • x-failed-queue — original queue name
  • x-failed-reason — failure description
  • x-failed-timestamp — Unix timestamp

Replay

After fixing a bug, replay dead-lettered messages back to their original queue. Same callback pattern as consuming — return an enum, the library handles the rest.

use PHPdot\RabbitMQ\Enum\ReplayAction;

$result = $conn->replay('orders.dead')
    ->limit(100)
    ->execute(function (Message $msg): ReplayAction {
        echo "[{$msg->messageId()}] {$msg->failedReason()}\n";

        // Bad payload — clean up DB and discard permanently
        if ($msg->failedReason() === 'Invalid payload') {
            Order::where('message_id', $msg->messageId())->delete();
            return ReplayAction::REMOVE;
        }

        // Timeout errors — bug is fixed, send it back
        if (str_contains($msg->failedReason(), 'timeout')) {
            return ReplayAction::REPLAY;
        }

        // Unknown — leave in DLQ for investigation
        return ReplayAction::SKIP;
    });

echo "Replayed: {$result->replayed}, Removed: {$result->removed}, Skipped: {$result->skipped}\n";

Three actions per message:

  • REPLAY — ack + republish to original exchange with original message ID. Dead-letter metadata stripped, retry counter reset.
  • REMOVE — ack + discard permanently. Use the callback to clean up related data before it's gone.
  • SKIP — nack with requeue. Message stays in DLQ for later.

Uses basic_get (pull mode) — processes available messages and stops. No infinite loop.

Compression

// Publish compressed
$conn->message($largePayload)
    ->compress()
    ->publish('data', 'import.batch');

// Consume — transparent decompression
$conn->consume('data.process')
    ->execute(function (Message $msg): TaskStatus {
        $body = $msg->body();  // already decompressed
        return TaskStatus::SUCCESS;
    });

Trace Propagation

Pass trace context as plain string headers. No coupling to any tracing library.

// Publish with trace
$conn->message($payload)
    ->header(['traceparent' => $tracelog->getTraceparent()->toHeader()])
    ->publish('orders', 'order.created');

// Consume with trace
$conn->consume('orders.process')
    ->execute(function (Message $msg): TaskStatus {
        $traceparent = $msg->header('traceparent');  // '' if missing
        // Reconnect trace in your framework layer
        return TaskStatus::SUCCESS;
    });

Configuration

$config = new RabbitMQConfig(
    host: 'rabbitmq.internal',
    port: 5672,
    username: 'app',
    password: 'secret',
    vhost: '/',
    timeoutMs: 30000,
    maxRetries: 3,
    retryDelayMs: 1000,
    exchanges: [
        'orders' => ['type' => 'direct', 'durable' => true],
        'notifications' => ['type' => 'fanout', 'durable' => true],
        'dead' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'orders.process' => [
            'bindings' => [
                ['exchange' => 'orders', 'routing_key' => 'order.created'],
                ['exchange' => 'orders', 'routing_key' => 'order.updated'],
            ],
            'retry' => ['enable' => true, 'delay_ms' => 500],
            'dead' => 'dead',
            'durable' => true,
        ],
    ],
);

RabbitMQConnection Resilience

Auto-reconnect with exponential backoff:

Attempt 1: wait 1s
Attempt 2: wait 2s
Attempt 3: wait 4s
→ ConnectionException if all fail

Message API

$msg->body();              // message content (decompressed)
$msg->messageId();         // UUIDv7 message ID
$msg->queue();             // queue name
$msg->header('key');       // header value or ''
$msg->headers();           // all headers as array
$msg->maxRetries();        // x-retries-max value
$msg->priority();          // 0-10
$msg->originalExchange();  // x-original-exchange
$msg->originalRoutingKey();// x-original-routing-key
$msg->failedReason();      // x-failed-reason (on dead letters)

CLI Commands

When installed in a phpdot app (with phpdot/console), these commands auto-discover and become available under your CLI entry point:

rabbitmq:status              Show broker connectivity and topology drift
rabbitmq:queues              Show queue depths and consumer counts
rabbitmq:topology:declare    Declare exchanges, queues, and bindings from config
rabbitmq:peek                Inspect messages without consuming them
rabbitmq:replay              Replay messages from a dead-letter queue
rabbitmq:dlq:analyze         Group dead-letter messages by reason and origin

rabbitmq:status

Connection check + topology drift detection — confirms the broker is up, credentials work, and every exchange/queue defined in config/rabbitmq.php is actually declared.

$ php bin/console rabbitmq:status

RabbitMQ Status

  Host           localhost:5672
  Vhost          /
  User           app
  Connection     ✓ ok (8.2 ms)

Topology in config/rabbitmq.php:

  ✓ exchange  events                          (direct, durable)
  ✓ queue     orders                          (durable)
  ✗ queue     payments                        NOT declared on broker

1 resource(s) missing — run `rabbitmq:topology:declare` to fix.

rabbitmq:queues

Queue depths and consumer counts — basic observability without opening the management UI. Supports --filter=substring and --watch (refresh every second).

$ php bin/console rabbitmq:queues

╔══════════════════╦══════════╦═══════════╗
║ Queue            ║ Messages ║ Consumers ║
╠══════════════════╬══════════╬═══════════╣
║ orders           ║      127 ║         3 ║
║ orders.dlq       ║       12 ║         0 ║
║ payments         ║       45 ║         2 ║
╚══════════════════╩══════════╩═══════════╝

3 queues, 184 messages, 5 consumers

rabbitmq:topology:declare

Applies the exchanges/queues/bindings from config/rabbitmq.php to the broker. Idempotent — existing matching resources are skipped. Supports --dry-run (preview) and --force (drop+recreate mismatched queues, DESTRUCTIVE — confirms before dropping unless --no-interaction).

rabbitmq:peek <queue> [--limit=5]

Inspects messages in a queue without consuming them. Each peeked message is returned to the queue via basic_reject with requeue.

rabbitmq:replay <queue> [--limit=10] [--dry-run]

Wraps the Replayer class — requeues dead-letter messages back to their original exchange. --dry-run previews what would replay without acking.

rabbitmq:dlq:analyze <queue> [--limit=500]

Samples a dead-letter queue and groups messages by death reason and original routing-key — one-screen incident triage.

Analysis of orders.dlq (12 messages sampled)

Reason for dead-lettering:
  expired                   7  ████████████░░░░░░░░  58%
  rejected                  4  ██████░░░░░░░░░░░░░░  33%
  maxlen                    1  ██░░░░░░░░░░░░░░░░░░   9%

By original routing-key:
  order.created             9  ███████████████░░░░░  75%
  order.updated             3  █████░░░░░░░░░░░░░░░  25%

Package Structure

src/
├── RabbitMQConnection.php   Main entry point
├── RabbitMQConnector.php    phpdot/pool ConnectorInterface adapter
├── Publisher.php            Fluent message builder
├── Consumer.php             Message consumer with retry/dead letter
├── Replayer.php             Dead letter queue replay
├── Message.php              Immutable inbound message DTO
├── Config/
│   └── RabbitMQConfig.php   Connection and topology configuration
├── Enum/
│   ├── TaskStatus.php       SUCCESS, RETRY, DEAD
│   └── ReplayAction.php     REPLAY, REMOVE, SKIP
├── Result/
│   └── ReplayResult.php     Replay outcome counts
├── Topology/
│   └── TopologyManager.php  Exchange/queue/binding declaration
├── Cli/Command/
│   ├── StatusCommand.php
│   ├── QueuesCommand.php
│   ├── TopologyDeclareCommand.php
│   ├── PeekCommand.php
│   ├── ReplayCommand.php
│   └── DlqAnalyzeCommand.php
└── Exception/
    ├── RabbitMQException.php   Base exception
    ├── ConnectionException.php
    ├── PublishException.php
    └── ConsumeException.php

Development

composer test        # PHPUnit (unit tests only)
composer test-all    # PHPUnit (including integration, needs RabbitMQ)
composer analyse     # PHPStan level 10
composer cs-fix      # PHP-CS-Fixer
composer check       # All three (unit + analyse + cs-check)

License

MIT