jaapieaapie1 / rabbitmq-ext
A PHP extension for RabbitMQ, written in Rust using ext-php-rs and lapin
Package info
github.com/jaapieaapie1/rabbitmq-ext
Language:Rust
Type:php-ext
Ext name:ext-rabbitmq_ext
pkg:composer/jaapieaapie1/rabbitmq-ext
Requires
- php: >=8.1
Requires (Dev)
- pestphp/pest: ^3.0
README
A PHP extension for consuming RabbitMQ queues, written in Rust using ext-php-rs and lapin.
Why
The benefit of this approach compared to implementing an RabbitMQ client in PHP
is that we can multithread and thus keep sending heartbeats even though the php
process is busy doing other things.
This makes it so the connection stays open even when you are processing a job
that takes 30 minutes.
Support
This package officially supports and is tested against PHP 8.1 - 8.5.
Installing
To install the extension use PHP's official extension installer PIE.
The extension is published on packagist.
For most architecture php combinations prebuilt binaries are available
(only PIE version > 1.4 supports downloading prebuilt binaries)
If your specific setup is not prebuilt
Rust's buildtools
are required to build this library.
pie install jaapieaapie1/rabbitmq-ext
For IDE's and PHPStan there are stubs available as a composer package.
composer require --dev jaapieaapie1/rabbitmq-ext-stubs
Usage
Consuming messages
use RabbitMQ\Connection; $connection = new Connection('amqp://guest:guest@localhost:5672'); $consumer = $connection->consume('my-queue', prefetchCount: 50); $consumer->each(function ($message) { echo $message->getBody() . PHP_EOL; $message->ack(); return true; // return false to stop consuming }); $connection->close();
Consuming with a timeout
$consumer = $connection->consume('my-queue'); while ($message = $consumer->next(timeoutMs: 5000)) { echo $message->getRoutingKey() . ': ' . $message->getBody() . PHP_EOL; $message->ack(); } // null returned — timed out or consumer was cancelled $connection->close();
Publishing messages
$connection = new Connection('amqp://guest:guest@localhost:5672'); // Publish and wait for broker confirmation $connection->publish('my-exchange', 'routing.key', '{"event":"order.created"}', [ 'x-request-id' => 'abc-123', ]); // Fire-and-forget (confirmed before connection closes) $connection->publishAsync('my-exchange', 'routing.key', 'payload'); $connection->close();
Handling failures
$consumer = $connection->consume('my-queue'); $consumer->each(function ($message) { try { processMessage($message->getBody()); $message->ack(); } catch (\Throwable $e) { $message->nack(requeue: false); // dead-letter the message } return true; });