assegaiphp/rabbitmq

RabbitMQ queue integration for the AssegaiPHP framework, providing attributes and tools for producing and consuming AMQP-based jobs.

1.0.0 2025-07-15 21:46 UTC

Last update: 2025-07-15 22:24:18 UTC


A progressive PHP framework for building efficient and scalable web applications.

AssegaiPHP RabbitMQ Queue Integration

This package provides RabbitMQ queue support for the AssegaiPHP framework. It enables asynchronous job handling using AMQP through PhpAmqpLib.

📦 Installation

Install via Composer:

composer require assegaiphp/rabbitmq

Or use the Assegai CLI:

assegai add rabbitmq

⚙️ Configuration

Update your application's config/queues.php file to register the RabbitMQ driver and define your connections:

<?php

return [
  'drivers' => [
    'rabbitmq' => 'Assegai\\RabbitMQ\\RabbitMQQueue'
  ],
  'connections' => [
    'rabbitmq' => [
      'notifications' => [
        'host' => 'localhost',
        'port' => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'exchange_name' => 'notifications',
        'routing_key' => 'notifications',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
      ],
    ],
  ],
];

📝 Note:

  • The drivers key maps queue driver names (like 'rabbitmq') to their fully qualified class names.
  • The connections key defines queue configurations by driver and queue name (e.g., 'rabbitmq.notifications').
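Because config/queues.php is an ordinary PHP file that returns an array, connection settings do not have to be hard-coded. The sketch below reads them from environment variables using PHP's native getenv(), falling back to the defaults shown above. The RABBITMQ_* variable names are hypothetical and are not defined by this package; adjust them to whatever your deployment provides.

```php
<?php

// Assumption: the RABBITMQ_* names below are illustrative, not part of the package.
return [
  'drivers' => [
    'rabbitmq' => 'Assegai\\RabbitMQ\\RabbitMQQueue'
  ],
  'connections' => [
    'rabbitmq' => [
      'notifications' => [
        // getenv() returns false when a variable is unset, so ?: supplies the default.
        'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
        'port' => (int) (getenv('RABBITMQ_PORT') ?: 5672),
        'username' => getenv('RABBITMQ_USER') ?: 'guest',
        'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
        'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
        'exchange_name' => 'notifications',
        'routing_key' => 'notifications',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
      ],
    ],
  ],
];
```

This keeps credentials out of version control while leaving the array structure identical to the hard-coded version.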

✨ Usage

Producing Jobs

Inject the queue connection using the #[InjectQueue] attribute:

use Assegai\Core\Queues\Attributes\InjectQueue;
use Assegai\Core\Queues\Interfaces\QueueInterface;

readonly class NotificationsService
{
  public function __construct(
    #[InjectQueue('rabbitmq.notifications')] private QueueInterface $queue
  ) {}

  public function send(array $payload): void
  {
    $this->queue->add($payload);
  }
}

Consuming Jobs

Define a consumer class with the #[Processor] attribute:

use Assegai\Core\Queues\Attributes\Processor;
use Assegai\Core\Queues\WorkerHost;
use Assegai\Core\Queues\QueueProcessResult;
use Assegai\Core\Queues\Interfaces\QueueProcessResultInterface;

#[Processor('rabbitmq.notifications')]
class NotificationsConsumer extends WorkerHost
{
  public function process(callable $callback): QueueProcessResultInterface
  {
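    // Invoking the callback yields the job to process.
    $job = $callback();
    // The job's data property carries the payload that the producer queued.
    $data = $job->data;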

    echo "Processing notification: {$data->message}" . PHP_EOL;

    return new QueueProcessResult(data: ['status' => 'done'], job: $job);
  }
}

⚠️ Do not use #[Injectable] on consumers. The process() method must accept a callable and return a QueueProcessResultInterface.
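Note that the producer passes an array to add(), while the consumer above reads $data->message as an object property. One plausible explanation is that the payload is serialized on its way through the broker and decoded as an object on the consumer side. The sketch below simulates that with a JSON round trip. This is an assumption for illustration only: the package documentation does not state which serialization format the driver uses, and roundTripPayload() is a hypothetical helper, not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Simulates a payload travelling through a queue as JSON.
 * Assumption: JSON is used here purely for illustration.
 */
function roundTripPayload(array $payload): mixed
{
  // Encode the producer-side array into a string, as it might be sent over AMQP.
  $wire = json_encode($payload, JSON_THROW_ON_ERROR);

  // Decode without the associative flag, so JSON objects become stdClass instances.
  return json_decode($wire, false, 512, JSON_THROW_ON_ERROR);
}

$data = roundTripPayload(['message' => 'Welcome aboard!', 'userId' => 42]);

// The array key 'message' is now reachable as an object property.
echo "Processing notification: {$data->message}" . PHP_EOL;
```

Whatever the actual format, the practical point is that the keys the producer puts in the payload (here 'message') must match the properties the consumer reads.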

Running the Worker

If you have the Assegai CLI installed, run the queue worker with:

assegai queue:work

This automatically loads and runs all consumer classes registered with the #[Processor] attribute.

You can also run a specific consumer directly:

assegai queue:work --consumer=NotificationsConsumer

This will start the worker for the NotificationsConsumer class, processing jobs from the rabbitmq.notifications queue.

For more information on running workers, refer to the AssegaiPHP documentation.

🧪 Testing

You can trigger jobs via your API or CLI and observe processing output in the worker terminal.
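Producer logic can also be checked without a running broker by swapping the queue for an in-memory stand-in. The sketch below is self-contained and deliberately does not use the framework's classes: AddOnlyQueue is a hypothetical interface that mirrors only the add() call shown earlier (the real QueueInterface may declare more methods), and NotificationsServiceUnderTest is a simplified copy of the service typed against it.

```php
<?php

declare(strict_types=1);

// Hypothetical minimal contract: mirrors only the add() call used by the producer.
// The real Assegai\Core\Queues\Interfaces\QueueInterface may declare more methods.
interface AddOnlyQueue
{
  public function add(array $payload): void;
}

// In-memory stand-in that records jobs instead of publishing them to RabbitMQ.
final class InMemoryQueue implements AddOnlyQueue
{
  /** @var list<array> */
  public array $jobs = [];

  public function add(array $payload): void
  {
    $this->jobs[] = $payload;
  }
}

// Simplified copy of NotificationsService, typed against the hypothetical contract.
final class NotificationsServiceUnderTest
{
  public function __construct(private AddOnlyQueue $queue) {}

  public function send(array $payload): void
  {
    $this->queue->add($payload);
  }
}

$queue = new InMemoryQueue();
$service = new NotificationsServiceUnderTest($queue);
$service->send(['message' => 'Welcome aboard!']);

// Check that exactly one job was recorded, carrying the payload passed to send().
var_dump(count($queue->jobs) === 1);
var_dump($queue->jobs[0] === ['message' => 'Welcome aboard!']);
```

In a real project the stand-in would need to implement the framework's QueueInterface in full, so treat this as a pattern rather than drop-in code.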

📚 Resources

Support

Assegai is an MIT-licensed open source project. It grows thanks to its sponsors and the support of its backers. If you'd like to join them, please read more here.

Stay in touch

License

Assegai is MIT licensed.