phpdot / queue
RabbitMQ messaging for PHP: publish, consume, retry, dead letter.
Requires
- php: >=8.3
- php-amqplib/php-amqplib: ^3.0
- psr/log: ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.94
- phpstan/phpstan: ^2.0
- phpstan/phpstan-strict-rules: ^2.0
- phpunit/phpunit: ^11.0
Install
composer require phpdot/queue
Quick Start
use PHPdot\Queue\Connection;
use PHPdot\Queue\Config\ConnectionConfig;
use PHPdot\Queue\Message;
use PHPdot\Queue\Enum\TaskStatus;

$config = new ConnectionConfig(
    host: 'localhost',
    exchanges: [
        'tasks' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'tasks.process' => [
            'bindings' => [['exchange' => 'tasks', 'routing_key' => 'task.new']],
            'durable' => true,
        ],
    ],
);

$conn = new Connection($config);

// Publish
$conn->message('{"task":"send_email"}')
    ->publish('tasks', 'task.new');

// Consume
$conn->consume('tasks.process')
    ->execute(function (Message $msg): TaskStatus {
        processTask(json_decode($msg->body(), true));

        return TaskStatus::SUCCESS;
    });
Architecture
graph TD
subgraph Connection
direction TB
CONN[Connect, reconnect, channel, topology]
subgraph Publishers & Consumers
direction LR
PUB[Publisher<br/><br/>Fluent builder:<br/>compress, trace,<br/>priority, headers]
CON[Consumer<br/><br/>basic_consume loop<br/>with retry/dead<br/>letter handling]
end
TOPO[Topology Manager<br/><br/>Declares exchanges, queues, bindings,<br/>retry queues — cached after first use]
CONN --> PUB
CONN --> CON
PUB --> TOPO
CON --> TOPO
end
Publishing
// Simple
$conn->message('{"order_id": 123}')
    ->publish('orders', 'order.created');

// Full-featured
$conn->message(json_encode($data))
    ->retry(5)
    ->priority(8)
    ->compress()
    ->header(['traceparent' => $traceHeader])
    ->header(['x-source' => 'api-gateway'])
    ->publish('orders', 'order.created');
Auto-set properties:
| Property | Default |
|---|---|
| message_id | UUIDv7 |
| timestamp | time() |
| app_id | gethostname() |
| content_type | Auto-detected (JSON or text) |
| delivery_mode | 2 (persistent) |
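The content_type default is described only as "auto-detected (JSON or text)". As a rough illustration of what such detection could look like, here is a minimal sketch. The function name `detectContentType` and the MIME strings are assumptions for illustration, not the library's documented behaviour.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of phpdot/queue): guesses a content type
 * for an outgoing message body.
 */
function detectContentType(string $body): string
{
    // json_validate() exists from PHP 8.3, the package's minimum version.
    // The returned MIME strings are assumed values.
    return json_validate($body) ? 'application/json' : 'text/plain';
}

echo detectContentType('{"order_id": 123}'), PHP_EOL;
echo detectContentType('plain text body'), PHP_EOL;
```

Any syntactically valid JSON document counts as JSON under this check, including a bare scalar such as `123`, so a real implementation might apply a stricter rule.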
Consuming
$conn->consume('orders.process')
    ->prefetch(10)
    ->onRetry(function (Message $msg, int $count): void {
        echo "Retry #{$count}: {$msg->messageId()}\n";
    })
    ->onDead(function (Message $msg, string $reason): void {
        echo "Dead: {$msg->messageId()} — {$reason}\n";
    })
    ->execute(function (Message $msg): TaskStatus {
        $data = json_decode($msg->body(), true);

        if ($data === null) {
            return TaskStatus::DEAD; // malformed, don't retry
        }

        try {
            processOrder($data);
            return TaskStatus::SUCCESS; // done
        } catch (TemporaryException $e) {
            return TaskStatus::RETRY; // try again
        } catch (PermanentException $e) {
            return TaskStatus::DEAD; // give up
        }
    });
Three return values. No ambiguity:
- SUCCESS — ack, done
- RETRY — nack to retry queue, try again later
- DEAD — ack + forward to dead letter exchange
Unhandled exceptions are caught and dead-lettered automatically. The consumer never crashes.
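To make the exception rule concrete, the following sketch shows one way a consumer could turn an uncaught exception into a DEAD outcome. It is not the library's internals: the `Status` enum is a local stand-in for `TaskStatus`, and `runHandler` is a hypothetical name.

```php
<?php
declare(strict_types=1);

// Local stand-in for PHPdot\Queue\Enum\TaskStatus so the sketch runs without the package.
enum Status
{
    case SUCCESS;
    case RETRY;
    case DEAD;
}

/**
 * Hypothetical wrapper: runs the handler and maps any uncaught throwable
 * to DEAD, so a single bad message cannot terminate the consume loop.
 */
function runHandler(callable $handler, string $body): Status
{
    try {
        return $handler($body);
    } catch (\Throwable $e) {
        return Status::DEAD;
    }
}

$ok = runHandler(fn (string $body): Status => Status::SUCCESS, '{}');
$failed = runHandler(function (string $body): Status {
    throw new \RuntimeException('boom');
}, '{}');

echo $ok->name, ' ', $failed->name, PHP_EOL;
```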
Retry & Dead Letter
Retry Flow
graph TD
A[orders.queue] -->|nack| B[orders.queue.retry.exchange]
B --> C["orders.queue.retry<br/>(TTL queue, e.g. 500ms)"]
C -->|TTL expires| A
Enable in config:
'queues' => [
    'orders.process' => [
        'bindings' => [['exchange' => 'orders', 'routing_key' => 'order.created']],
        'retry' => ['enable' => true, 'delay_ms' => 500],
        'dead' => 'dead-letters',
        'durable' => true,
    ],
],
Retry infrastructure (exchange, TTL queue, bindings) is created automatically on first use.
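For orientation, the sketch below derives the retry exchange and queue names shown in the retry flow diagram, plus queue arguments that would send expired messages back to the original queue. The name suffixes come from the diagram. The arguments (`x-message-ttl`, `x-dead-letter-exchange`, `x-dead-letter-routing-key`) are standard RabbitMQ queue arguments, but whether the library uses exactly these, or routes back through the default exchange, is an assumption. `retryTopology` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: describes the retry exchange and TTL queue for a queue.
 *
 * @return array{exchange: string, queue: string, arguments: array<string, int|string>}
 */
function retryTopology(string $queue, int $delayMs): array
{
    return [
        'exchange' => $queue . '.retry.exchange',
        'queue' => $queue . '.retry',
        'arguments' => [
            // Messages wait here for the configured delay.
            'x-message-ttl' => $delayMs,
            // Assumed: on expiry, route via the default exchange ('')
            // straight back to the original queue by name.
            'x-dead-letter-exchange' => '',
            'x-dead-letter-routing-key' => $queue,
        ],
    ];
}

print_r(retryTopology('orders.queue', 500));
```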
Dead Letter
When the maximum number of retries is exceeded or the callback returns TaskStatus::DEAD, the message is forwarded to the dead letter exchange with these headers:
- x-failed-queue: original queue name
- x-failed-reason: failure description
- x-failed-timestamp: Unix timestamp
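As an illustration of the dead-letter metadata, a header set for a failed message could be assembled as in this sketch. The header names are the ones listed above. The function `deadLetterHeaders` and its signature are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds the dead-letter headers for a failed message.
 *
 * @return array<string, int|string>
 */
function deadLetterHeaders(string $queue, string $reason, ?int $now = null): array
{
    return [
        'x-failed-queue' => $queue,
        'x-failed-reason' => $reason,
        // Unix timestamp; injectable so the function is easy to test.
        'x-failed-timestamp' => $now ?? time(),
    ];
}

print_r(deadLetterHeaders('orders.process', 'Invalid payload'));
```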
Replay
After fixing a bug, replay dead-lettered messages back to their original queue. Same callback pattern as consuming — return an enum, the library handles the rest.
use PHPdot\Queue\Enum\ReplayAction;

$result = $conn->replay('orders.dead')
    ->limit(100)
    ->execute(function (Message $msg): ReplayAction {
        echo "[{$msg->messageId()}] {$msg->failedReason()}\n";

        // Bad payload: clean up DB and discard permanently
        if ($msg->failedReason() === 'Invalid payload') {
            Order::where('message_id', $msg->messageId())->delete();
            return ReplayAction::REMOVE;
        }

        // Timeout errors: bug is fixed, send it back
        if (str_contains($msg->failedReason(), 'timeout')) {
            return ReplayAction::REPLAY;
        }

        // Unknown: leave in DLQ for investigation
        return ReplayAction::SKIP;
    });

echo "Replayed: {$result->replayed}, Removed: {$result->removed}, Skipped: {$result->skipped}\n";
Three actions per message:
- REPLAY — ack + republish to original exchange with original message ID. Dead-letter metadata stripped, retry counter reset.
- REMOVE — ack + discard permanently. Use the callback to clean up related data before it's gone.
- SKIP — nack with requeue. Message stays in DLQ for later.
Uses basic_get (pull mode) — processes available messages and stops. No infinite loop.
Compression
// Publish compressed
$conn->message($largePayload)
    ->compress()
    ->publish('data', 'import.batch');

// Consume: transparent decompression
$conn->consume('data.process')
    ->execute(function (Message $msg): TaskStatus {
        $body = $msg->body(); // already decompressed
        return TaskStatus::SUCCESS;
    });
Trace Propagation
Pass trace context as plain string headers. No coupling to any tracing library.
// Publish with trace
$conn->message($payload)
    ->header(['traceparent' => $tracelog->getTraceparent()->toHeader()])
    ->publish('orders', 'order.created');

// Consume with trace
$conn->consume('orders.process')
    ->execute(function (Message $msg): TaskStatus {
        $traceparent = $msg->header('traceparent'); // '' if missing
        // Reconnect trace in your framework layer
        return TaskStatus::SUCCESS;
    });
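Because the header arrives as a plain string, the consuming side has to validate it before reconnecting the trace. The sketch below parses a W3C Trace Context `traceparent` value (`version-traceid-parentid-flags`, lowercase hex). The function name `parseTraceparent` is hypothetical and independent of this library, and the check covers the format only, not every rule of the specification.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: splits a traceparent header into its four fields.
 * Returns null for a missing ('') or malformed value.
 *
 * @return array{version: string, trace_id: string, parent_id: string, flags: string}|null
 */
function parseTraceparent(string $header): ?array
{
    $pattern = '/^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/';

    if (preg_match($pattern, $header, $m) !== 1) {
        return null;
    }

    return [
        'version' => $m[1],
        'trace_id' => $m[2],
        'parent_id' => $m[3],
        'flags' => $m[4],
    ];
}

var_dump(parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'));
var_dump(parseTraceparent(''));
```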
Configuration
$config = new ConnectionConfig(
    host: 'rabbitmq.internal',
    port: 5672,
    username: 'app',
    password: 'secret',
    vhost: '/',
    timeoutMs: 30000,
    maxRetries: 3,
    retryDelayMs: 1000,
    exchanges: [
        'orders' => ['type' => 'direct', 'durable' => true],
        'notifications' => ['type' => 'fanout', 'durable' => true],
        'dead' => ['type' => 'direct', 'durable' => true],
    ],
    queues: [
        'orders.process' => [
            'bindings' => [
                ['exchange' => 'orders', 'routing_key' => 'order.created'],
                ['exchange' => 'orders', 'routing_key' => 'order.updated'],
            ],
            'retry' => ['enable' => true, 'delay_ms' => 500],
            'dead' => 'dead',
            'durable' => true,
        ],
    ],
);
Connection Resilience
Auto-reconnect with exponential backoff:
Attempt 1: wait 1s
Attempt 2: wait 2s
Attempt 3: wait 4s
→ ConnectionException if all fail
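The schedule above is consistent with a delay that doubles on each attempt, starting from `retryDelayMs: 1000` with `maxRetries: 3`. A minimal sketch of that arithmetic follows. The function `backoffDelaysMs` is hypothetical, and the library may compute its delays differently (for example with jitter or a cap).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay before each reconnect attempt, in milliseconds.
 * Attempt n waits baseDelayMs * 2^(n-1).
 *
 * @return list<int>
 */
function backoffDelaysMs(int $baseDelayMs, int $maxAttempts): array
{
    $delays = [];

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $delays[] = $baseDelayMs * 2 ** ($attempt - 1);
    }

    return $delays;
}

echo implode(', ', backoffDelaysMs(1000, 3)), PHP_EOL; // 1000, 2000, 4000
```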
Message API
$msg->body();               // message content (decompressed)
$msg->messageId();          // UUIDv7 message ID
$msg->queue();              // queue name
$msg->header('key');        // header value or ''
$msg->headers();            // all headers as array
$msg->maxRetries();         // x-retries-max value
$msg->priority();           // 0-10
$msg->originalExchange();   // x-original-exchange
$msg->originalRoutingKey(); // x-original-routing-key
$msg->failedReason();       // x-failed-reason (on dead letters)
Package Structure
src/
├── Connection.php Main entry point
├── Publisher.php Fluent message builder
├── Consumer.php Message consumer with retry/dead letter
├── Replayer.php Dead letter queue replay
├── Message.php Immutable inbound message DTO
├── Config/
│ └── ConnectionConfig.php Connection and topology configuration
├── Enum/
│ ├── TaskStatus.php SUCCESS, RETRY, DEAD
│ └── ReplayAction.php REPLAY, REMOVE, SKIP
├── Result/
│ └── ReplayResult.php Replay outcome counts
├── Topology/
│ └── TopologyManager.php Exchange/queue/binding declaration
└── Exception/
├── QueueException.php Base exception
├── ConnectionException.php
├── PublishException.php
└── ConsumeException.php
Development
composer test      # PHPUnit (unit tests only)
composer test-all  # PHPUnit (including integration, needs RabbitMQ)
composer analyse   # PHPStan level 10
composer cs-fix    # PHP-CS-Fixer
composer check     # All three (unit + analyse + cs-check)
License
MIT