jaapieaapie1/rabbitmq-ext

A PHP extension for RabbitMQ, written in Rust using ext-php-rs and lapin

Maintainers

Package info

github.com/jaapieaapie1/rabbitmq-ext

Language:Rust

Type:php-ext

Ext name:ext-rabbitmq_ext

pkg:composer/jaapieaapie1/rabbitmq-ext

Statistics

Installs: 12

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

1.0.0 2026-04-03 19:13 UTC

This package is auto-updated.

Last update: 2026-04-03 19:14:09 UTC


README

A PHP extension for consuming RabbitMQ queues, written in Rust using ext-php-rs and lapin.

Why

The benefit of this approach compared to implementing an RabbitMQ client in PHP is that we can multithread and thus keep sending heartbeats even though the php process is busy doing other things.
This makes it so the connection stays open even when you are processing a job that takes 30 minutes.

Support

This package officially supports and is tested against PHP 8.1 - 8.5.

Installing

To install the extension use PHP's official extension installer PIE. The extension is published on packagist.
For most architecture php combinations prebuilt binaries are available (only PIE version > 1.4 supports downloading prebuilt binaries)
If your specific setup is not prebuilt Rust's buildtools are required to build this library.

pie install jaapieaapie1/rabbitmq-ext

For IDE's and PHPStan there are stubs available as a composer package.

composer require --dev jaapieaapie1/rabbitmq-ext-stubs

Usage

Consuming messages

use RabbitMQ\Connection;

$connection = new Connection('amqp://guest:guest@localhost:5672');
$consumer = $connection->consume('my-queue', prefetchCount: 50);

$consumer->each(function ($message) {
    echo $message->getBody() . PHP_EOL;

    $message->ack();

    return true; // return false to stop consuming
});

$connection->close();

Consuming with a timeout

$consumer = $connection->consume('my-queue');

while ($message = $consumer->next(timeoutMs: 5000)) {
    echo $message->getRoutingKey() . ': ' . $message->getBody() . PHP_EOL;
    $message->ack();
}

// null returned — timed out or consumer was cancelled
$connection->close();

Publishing messages

$connection = new Connection('amqp://guest:guest@localhost:5672');

// Publish and wait for broker confirmation
$connection->publish('my-exchange', 'routing.key', '{"event":"order.created"}', [
    'x-request-id' => 'abc-123',
]);

// Fire-and-forget (confirmed before connection closes)
$connection->publishAsync('my-exchange', 'routing.key', 'payload');

$connection->close();

Handling failures

$consumer = $connection->consume('my-queue');

$consumer->each(function ($message) {
    try {
        processMessage($message->getBody());
        $message->ack();
    } catch (\Throwable $e) {
        $message->nack(requeue: false); // dead-letter the message
    }

    return true;
});