airygen/laravel-amqp-producer

RabbitMQ publisher for Laravel with confirms, retries, DLX strategy.

1.0.2 2025-09-25 11:14 UTC

Last update: 2025-09-25 14:29:44 UTC


README

Laravel-oriented RabbitMQ publisher with:

  • Publisher confirms
  • Mandatory publish (detect unroutable messages)
  • Exponential retry for transient AMQP errors
  • Structured automatic headers: request_id, source, env, ISO8601 datetime
  • Multi-connection support (choose connection per payload)

This library focuses on publishing only. For queue workers / consumers you can use: vyuldashev/laravel-queue-rabbitmq.

Installation

composer require airygen/laravel-amqp-producer

Publish Configuration

php artisan vendor:publish --provider="Airygen\\RabbitMQ\\RabbitMQServiceProvider" --tag=config

Generated file: config/amqp.php

return [
    'retry' => [
        'base_delay' => 0.2,
        'max_delay' => 1.5,
        'jitter' => false, // set true to randomize backoff (helps avoid thundering herd)
    ],
    'connections' => [
        'default' => [
            'host' => env('AMQP_HOST', '127.0.0.1'),
            'port' => (int) env('AMQP_PORT', 5672),
            'user' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', 'guest'),
            'vhost' => env('AMQP_VHOST', '/'),
            'options' => [
                'lazy' => true,
                'keepalive' => true,
                'heartbeat' => 60,
                // Channel reuse removed: each publish opens/closes channel and connection
            ],
        ],
        // Add more named connections if needed
        // 'analytics' => [ ... ],
    ],
];

Defining a Payload

Extend ProducerPayload and (optionally) override connection / exchange / routing key.

use Airygen\RabbitMQ\ProducerPayload;

final class MemberCreatedPayload extends ProducerPayload
{
    protected string $connectionName = 'default';              // optional (defaults to 'default')
    protected ?string $exchangeName = 'ex.members';            // required if you publish to a non-empty exchange
    protected ?string $routingKey = 'member.created';          // required for direct/topic exchanges
}

Basic Publish

use Airygen\RabbitMQ\Publisher;

$publisher = app(Publisher::class);
$publisher->publish(new MemberCreatedPayload(['id' => 123]));

Custom Headers

$publisher->publish(
    new MemberCreatedPayload(['id' => 123]),
    header: ['foo' => 'bar']
);

Batch Publish

$payloads = [
    new MemberCreatedPayload(['id' => 1]),
    new MemberCreatedPayload(['id' => 2]),
];

$publisher->batchPublish($payloads);

Retry Strategy

The publisher retries transient AMQP errors (IO and protocol channel exceptions) with exponential backoff:

  • The initial delay is base_delay (0.2 s by default); it doubles on each attempt and is capped at max_delay (1.5 s by default)
  • Default attempts: 3
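
As a rough illustration of the schedule above, the following sketch computes the delay before each retry. The function name backoffDelay and its parameters are hypothetical and not part of the package; only the default values come from the generated configuration. It also assumes the first retry waits exactly base_delay.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): delay in seconds before
 * retry number $attempt (1-based), doubling from $baseDelay and capped
 * at $maxDelay.
 */
function backoffDelay(int $attempt, float $baseDelay = 0.2, float $maxDelay = 1.5): float
{
    // 1st retry: base, 2nd: base * 2, 3rd: base * 4, ...
    $delay = $baseDelay * (2 ** max(0, $attempt - 1));

    return min($delay, $maxDelay);
}

foreach ([1, 2, 3, 4] as $attempt) {
    // Prints 0.2s, 0.4s, 0.8s, then 1.5s (1.6s capped by max_delay).
    printf("retry %d: %.1fs\n", $attempt, backoffDelay($attempt));
}
```

With the default of 3 attempts the cap is never reached; it only matters once retryTimes is raised.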

Custom rule:

$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 5,
    when: function (Throwable $e): bool {
        return $e instanceof PhpAmqpLib\Exception\AMQPIOException
            || str_contains($e->getMessage(), 'timeout');
    }
);

Enable jitter:

config(['amqp.retry.jitter' => true]);
$publisher->publish($payload);

Jitter multiplies each delay by a random factor ~0.85 - 1.15.

Disable retry:

$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 1,
    when: fn() => false
);
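
The jitter factor described in this section can be pictured with a small sketch. The helper applyJitter and its $spread parameter are hypothetical; the package's own random source and exact bounds may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): scale a delay by a
 * random factor in [1 - $spread, 1 + $spread], i.e. 0.85 to 1.15 by default.
 */
function applyJitter(float $delaySeconds, float $spread = 0.15): float
{
    // mt_rand() / mt_getrandmax() yields a value between 0 and 1 inclusive.
    $unit = mt_rand() / mt_getrandmax();
    $factor = 1.0 - $spread + (2.0 * $spread * $unit);

    return $delaySeconds * $factor;
}

// A 0.4 s delay ends up somewhere between 0.34 s and 0.46 s.
printf("jittered delay: %.3fs\n", applyJitter(0.4));
```

Randomizing the delay this way keeps many publishers that failed at the same moment from retrying in lockstep.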

Multi-Connection Example

final class AnalyticsEventPayload extends ProducerPayload
{
    protected string $connectionName = 'analytics';
    protected ?string $exchangeName = 'ex.analytics';
    protected ?string $routingKey = 'event.ingest';
}

$publisher->publish(new AnalyticsEventPayload(['type' => 'login']));
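
The payload above assumes a connection named analytics exists in config/amqp.php. A minimal sketch of such an entry follows, mirroring the keys of the default connection; the AMQP_ANALYTICS_* environment variable names are assumptions chosen for illustration.

```php
// config/amqp.php (excerpt)
'connections' => [
    'default' => [
        // ... as generated
    ],
    'analytics' => [
        // Hypothetical env variable names; pick whatever suits your deployment.
        'host' => env('AMQP_ANALYTICS_HOST', '127.0.0.1'),
        'port' => (int) env('AMQP_ANALYTICS_PORT', 5672),
        'user' => env('AMQP_ANALYTICS_USER', 'guest'),
        'password' => env('AMQP_ANALYTICS_PASSWORD', 'guest'),
        'vhost' => env('AMQP_ANALYTICS_VHOST', '/'),
        'options' => [
            'lazy' => true,
            'keepalive' => true,
            'heartbeat' => 60,
        ],
    ],
],
```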

Automatic Headers Added

MessageFactory injects:

  • request_id (existing X-Request-Id header or a new UUID)
  • source (Laravel app name)
  • env (current environment)
  • datetime (ISO8601)

Header precedence: you can still provide additional custom headers in publish() / batchPublish(); when a custom key duplicates an automatically injected one, your value wins.
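
The precedence rule can be illustrated with plain arrays. This is only a sketch: mergeHeaders is a hypothetical function, the sample values are made up, and the package's MessageFactory may assemble its headers differently.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical illustration of the precedence rule (not the package's code).
 */
function mergeHeaders(array $automatic, array $custom): array
{
    // With string keys, array_merge() lets later arrays win on duplicates.
    return array_merge($automatic, $custom);
}

$automatic = [
    'request_id' => bin2hex(random_bytes(16)), // stand-in for a real UUID
    'source' => 'my-app',
    'env' => 'production',
    'datetime' => (new DateTimeImmutable())->format(DATE_ATOM),
];

$headers = mergeHeaders($automatic, ['env' => 'staging', 'foo' => 'bar']);
// $headers['env'] is now 'staging'; request_id, source and datetime are unchanged.
```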

Metrics & Stats

The package ships with an in-memory static counter registry, Stats, intended for lightweight instrumentation or for exporting into your own monitoring system.

Global counters:

  • publish_attempts
  • publish_retries
  • publish_failures
  • connection_resets

Per‑connection counters (nested under per_connection[connection_name]):

  • publish_attempts
  • publish_retries
  • publish_failures
  • connection_resets

Example snapshot:

use Airygen\RabbitMQ\Support\Stats;
$snapshot = Stats::snapshot();
// [ 'publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]] ]

You can reset counters (e.g. at the start of a test or scheduled export) with:

Stats::reset();

For production telemetry, consider periodically reading the snapshot and pushing to Prometheus / OpenTelemetry.

Note: These counters are process‑local (not shared across workers). If you run Octane/Swoole multi-worker, aggregate externally.
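
One way to aggregate externally is to collect a snapshot from each worker and sum them. The sketch below assumes every snapshot has the array shape shown in the example snapshot; sumSnapshots is a hypothetical helper, and how the snapshots travel between processes is left out.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical aggregator (not part of the package): sums counter
 * snapshots collected from several workers.
 */
function sumSnapshots(array $snapshots): array
{
    $total = ['per_connection' => []];

    foreach ($snapshots as $snapshot) {
        foreach ($snapshot as $key => $value) {
            if ($key === 'per_connection') {
                // Nested counters: add them up per connection name.
                foreach ($value as $connection => $counters) {
                    foreach ($counters as $name => $count) {
                        $total['per_connection'][$connection][$name] =
                            ($total['per_connection'][$connection][$name] ?? 0) + $count;
                    }
                }
                continue;
            }

            // Global counters live at the top level.
            $total[$key] = ($total[$key] ?? 0) + $value;
        }
    }

    return $total;
}

$workerA = ['publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]]];
$workerB = ['publish_attempts' => 5, 'per_connection' => ['default' => ['publish_attempts' => 5]]];

$combined = sumSnapshots([$workerA, $workerB]);
// $combined['publish_attempts'] is 15 and the 'default' connection shows 12.
```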

Behavior: Always Open/Close

For operational safety across PHP-FPM/CLI and worker runtimes (Octane/Swoole/RoadRunner), this package always opens a fresh channel for each publish and closes both channel and connection afterwards.
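
To make the open/publish/close cycle concrete, here is a sketch written directly against php-amqplib. It is not the package's implementation: publishOnce is a hypothetical function, and the host, credentials and 5-second confirm timeout are placeholders.

```php
<?php

declare(strict_types=1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical one-shot publish: open a connection and channel, publish
 * with confirms and the mandatory flag, then close everything.
 */
function publishOnce(string $exchange, string $routingKey, string $body): void
{
    // Placeholder connection settings; real values would come from config.
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');

    try {
        $channel = $connection->channel();
        $channel->confirm_select(); // enable publisher confirms on this channel

        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);

        // Fourth argument is the mandatory flag.
        $channel->basic_publish($message, $exchange, $routingKey, true);

        // Wait up to 5 seconds for confirms and returned messages.
        $channel->wait_for_pending_acks_returns(5.0);
        $channel->close();
    } finally {
        // Runs on success and on failure, so no connection outlives the call.
        $connection->close();
    }
}
```

In this sketch nothing reacts to negative confirms or returned messages; with php-amqplib that would mean registering callbacks through set_nack_handler() and set_return_listener() before publishing.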

Health Check Command

Run a simple connectivity probe:

php artisan rabbitmq:ping            # test all configured connections
php artisan rabbitmq:ping secondary  # test a specific connection

Exit code is non‑zero on failure (suitable for container readiness / liveness probes).

Octane / Swoole / RoadRunner

Long-lived worker environments reuse PHP processes, so you must ensure stale connections/channels don't leak across deploys or forks.

Built-in safeguards:

  • On worker start/stop (Octane events) the connection manager reset() is invoked (if Octane is installed).
  • Connections are opened and closed per publish; you can call ConnectionManager::reset() manually if desired.

Recommended practices:

  1. Avoid holding a Publisher instance in static singletons you construct before workers fork.
  2. Call app(ProducerInterface::class) per request/job (the container reuses a safe singleton manager underneath).
  3. If you rotate workers periodically, no extra action is needed; the hook already clears state.
  4. For Swoole without Octane events, you can call ConnectionManager::reset() manually from your custom lifecycle hooks.

Optional manual reset example:

// e.g. in a scheduled task or health hook
app(\Airygen\RabbitMQ\Support\ConnectionManager::class)->reset();

TLS / SSL (Optional)

If you need TLS encryption, enable and configure the SSL related options inside config/amqp.php:

    'connections' => [
        'default' => [
            // ... host, port, user, password, vhost
            'options' => [
                'ssl' => true,
                'cafile' => base_path('certs/ca.pem'),
                'local_cert' => base_path('certs/client.pem'),
                'local_pk' => base_path('certs/client.key'),
                'verify_peer' => true,
                // 'passphrase' => env('AMQP_CERT_PASSPHRASE'),
            ],
        ],
    ],

The factory will build a stream context when ssl is truthy and any of the certificate fields are present. If verify_peer is enabled, ensure cafile is supplied.
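
The rule in the previous paragraph can be sketched with PHP's standard SSL stream context options. buildSslContext is a hypothetical function, not the package's factory. Two choices here are assumptions: verify_peer defaults to true when omitted, and a missing cafile raises an exception.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch (not the package's factory): build an SSL stream
 * context from a connection's options, or return null when none is needed.
 *
 * @return resource|null
 */
function buildSslContext(array $options)
{
    // Certificate-related fields that trigger context creation.
    $certKeys = ['cafile', 'local_cert', 'local_pk'];
    $ssl = array_intersect_key($options, array_flip($certKeys));

    if (empty($options['ssl']) || $ssl === []) {
        return null;
    }

    if (isset($options['passphrase'])) {
        $ssl['passphrase'] = $options['passphrase'];
    }

    // Assumption: verification is on unless explicitly disabled.
    $ssl['verify_peer'] = (bool) ($options['verify_peer'] ?? true);

    if ($ssl['verify_peer'] && !isset($ssl['cafile'])) {
        throw new InvalidArgumentException('verify_peer requires cafile.');
    }

    return stream_context_create(['ssl' => $ssl]);
}

$context = buildSslContext([
    'ssl' => true,
    'cafile' => '/path/to/ca.pem', // placeholder path
    'verify_peer' => true,
]);
```

Failing fast on a missing cafile surfaces the misconfiguration at startup rather than as a handshake error on the first publish.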

Roadmap

  • Pluggable metrics exporter interface
  • Circuit breaker / total backoff budget
  • Async publisher confirm pipeline
  • Mandatory publish return callbacks (unroutable detection)
  • Dead letter / delayed publish helpers

Development

Dockerized Workflow

All commands are wrapped to run inside the php service defined in docker-compose.yml.

Startup & install:

docker compose up -d rabbitmq
docker compose build php
docker compose run --rm php composer install

Using Makefile targets:

make unit          # run unit tests
make integration   # run integration tests (requires rabbitmq service up)
make test          # unit + integration
make coverage      # generates coverage/html & coverage/clover.xml
make lint          # code style check
make analyse       # phpstan static analysis
make fix           # auto-fix style

Manual (host) without Docker wrapper:

php -d xdebug.mode=off vendor/bin/phpunit --testsuite Unit
INTEGRATION_TESTS=1 php -d xdebug.mode=off vendor/bin/phpunit --testsuite Integration

Management UI: http://localhost:15672 (guest / guest)

If you prefer to run host-native (without docker) use the host:* scripts or call PHPUnit directly.

Prometheus Metrics (Skeleton)

The package keeps lightweight in-memory counters (not persisted). A minimal Prometheus text exporter is provided.

Artisan command:

php artisan rabbitmq:metrics            # full HELP/TYPE + samples
php artisan rabbitmq:metrics --raw      # only metric lines

Example output:

# HELP rabbitmq_publish_attempts_total Total publish attempts (before confirm).
# TYPE rabbitmq_publish_attempts_total counter
rabbitmq_publish_attempts_total 42
... (other metrics)

Per-connection metrics are emitted with a connection label, e.g.:

rabbitmq_connection_publish_attempts_total{connection="primary"} 10

You can bind your own implementation of Airygen\RabbitMQ\Contracts\MetricsExporterInterface if you need richer aggregation or to integrate with an existing metrics system.
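
If you prefer to format the counters yourself, a snapshot array can be turned into Prometheus text lines with a few loops. The sketch below is a standalone function, renderPrometheus (a hypothetical name), and deliberately does not implement MetricsExporterInterface, whose methods are not described here. The metric names follow the example output above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical renderer (not part of the package): converts a
 * Stats-style snapshot array into Prometheus sample lines.
 */
function renderPrometheus(array $snapshot): string
{
    $lines = [];

    // Global counters sit at the top level of the snapshot.
    foreach ($snapshot as $name => $value) {
        if ($name === 'per_connection') {
            continue;
        }
        $lines[] = sprintf('rabbitmq_%s_total %d', $name, $value);
    }

    // Per-connection counters carry a connection label.
    foreach ($snapshot['per_connection'] ?? [] as $connection => $counters) {
        foreach ($counters as $name => $value) {
            $lines[] = sprintf(
                'rabbitmq_connection_%s_total{connection="%s"} %d',
                $name,
                // Escape backslash, double quote and newline in the label value.
                addcslashes((string) $connection, "\\\"\n"),
                $value
            );
        }
    }

    return implode("\n", $lines) . "\n";
}

echo renderPrometheus([
    'publish_attempts' => 42,
    'per_connection' => ['primary' => ['publish_attempts' => 10]],
]);
```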

License

MIT