mle86/wq-amqp

An AMQP module for mle86/wq using the php-amqplib connector

v1.4.0 2022-06-28 08:56 UTC

This package is auto-updated.

Last update: 2024-12-28 14:50:09 UTC


README

This package contains the PHP class mle86\WQ\WorkServerAdapter\AMQPWorkServer.

It supplements the mle86/wq package by implementing its WorkServerAdapter interface.

It connects to an AMQP server such as RabbitMQ using the php-amqplib/php-amqplib package.

It creates durable queues in the default exchange (“”) for immediate-delivery jobs and the durable “_phpwq._delayed” queue in the custom “_phpwq._delay_exchange” exchange for delayed jobs. Jobs stored with storeJob() are always durable as well. Empty queues or exchanges won't be deleted automatically.

Version and Compatibility

This is version 1.4.0 of mle86/wq-amqp.

It was developed for version 1.0.0 of mle86/wq and should be compatible with all of its future 1.x versions as well.

Installation and Dependencies

$ composer require mle86/wq-amqp

It requires PHP 8.0+.

It depends on mle86/wq and php-amqplib/php-amqplib, which in turn requires the mbstring, sockets, and bcmath PHP extensions.

Class reference

class mle86\WQ\WorkServerAdapter\AMQPWorkServer implements WorkServerAdapter

  • public function __construct (AMQPStreamConnection $connection)
    Constructor. Takes an already-configured AMQPStreamConnection instance to work with. Does not attempt to establish a connection itself – use the connect() factory method for that instead.
  • public static function connect ($host = 'localhost', $port = 5672, $user = 'guest', $password = 'guest', $vhost = '/', $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $connection_timeout = 3.0, $read_write_timeout = 3.0, $context = null, $keepalive = false, $heartbeat = 0)
    Factory method. See AMQPStreamConnection::__construct for the parameter descriptions.

Interface methods which are documented in the WorkServerAdapter interface:

  • public function storeJob (string $workQueue, Job $job, int $delay = 0)
  • public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT): ?QueueEntry
  • public function buryEntry (QueueEntry $entry)
  • public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
  • public function deleteEntry (QueueEntry $entry)

Usage example

<?php
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

$processor = new WorkProcessor( AMQPWorkServer::connect("localhost") );

while (true) {
    $processor->processNextJob("mail", function(Job $job) {
        $job->...;
    });
}

This executes all jobs available in the local AMQP server's “mail” queue, forever. It will however abort if one of the jobs throws an exception – you might want to add a logging try-catch block around the processNextJob() call as shown in WQ's “Quick Start” example.