mle86/wq

A simple library for work-queue and job handling.

v0.21.1 2022-05-24 09:37 UTC


This package provides an easy way to put PHP tasks of any kind into a work queue such as Beanstalkd or Redis to execute them at a later time.

This is version 0.21.1.

Installation and Dependencies

$ composer require mle86/wq

It requires PHP 8.0+ and has no other dependencies (apart from PHPUnit/Coveralls for development and the PSR-3 interfaces).

Adapter Implementations

You'll also want to install at least one other package which contains a WorkServerAdapter implementation for your work server, for example one for Beanstalkd or Redis.

Basic Concepts

  • A Job is something which should be done exactly once. Maybe it's sending an e-mail, maybe it's an external API call like a webhook, maybe it's some slow clean-up process. In any case, it is a unit of work that could be executed right away, but putting it in a Work Queue instead lets it run asynchronously, which is better for the application's performance.

  • A Work Queue is a list of jobs that should be executed at some other time. They are stored in some kind of Work Server. One work server well-known in the PHP world is Beanstalkd. It can store any number of work queues, although it calls them “tubes”.

Different work queues, or tubes, are commonly used to separate job types. For example, the same work server might have one “mail” queue for outgoing mails to be sent, one “cleanup” queue for all kinds of clean-up jobs, and one “webhook” queue for outgoing web-hook calls.
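To make the idea of separate named queues concrete, here is a minimal in-memory sketch. The InMemoryWorkServer class and its store()/fetch() methods are hypothetical stand-ins invented for this illustration; they are not part of this package, and a real work server such as Beanstalkd keeps the jobs outside the PHP process.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for a work server; not part of mle86/wq.
 * It keeps one first-in, first-out list of job payloads per named queue.
 */
final class InMemoryWorkServer
{
    /** @var array<string, list<string>> queue name => job payloads */
    private array $queues = [];

    /** Appends a job payload to the end of the named queue. */
    public function store(string $queue, string $payload): void
    {
        $this->queues[$queue][] = $payload;
    }

    /** Removes and returns the oldest payload of the named queue, or null if it is empty. */
    public function fetch(string $queue): ?string
    {
        if (empty($this->queues[$queue])) {
            return null;
        }
        return array_shift($this->queues[$queue]);
    }
}

$server = new InMemoryWorkServer();
$server->store("mail", "welcome mail for alice");
$server->store("webhook", "notify partner API");
$server->store("mail", "welcome mail for bob");

// Each worker only sees the jobs of the queue it asks for.
$firstMail = $server->fetch("mail");     // oldest job in the "mail" queue
$firstHook = $server->fetch("webhook");  // oldest job in the "webhook" queue
$noCleanup = $server->fetch("cleanup");  // null, nothing was queued there
```

Because every queue has its own list, a slow “cleanup” worker cannot hold up the jobs waiting in the “mail” queue.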

This package provides some helpful classes to set up a simple work queue system.

Quick Start

This is our Job implementation. It represents an e-mail that can be sent.

<?php

use mle86\WQ\Job\AbstractJob;

class EMail extends AbstractJob
{
    protected $recipient;
    protected $subject;
    protected $message;
    
    public function __construct(string $recipient, string $subject, string $message)
    {
        $this->recipient = $recipient;
        $this->subject   = $subject;
        $this->message   = $message;
    }
    
    public function send(): void
    {
        // mail() returns false if the message was not accepted for delivery.
        if (!mail($this->recipient, $this->subject, $this->message)) {
            throw new \RuntimeException("mail() failed");
        }
    }
}

The following code creates an e-mail job and, instead of sending it right away, stores it in the “mail” queue of a Beanstalkd work server:

<?php

use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;

$mailJob = new EMail("test@myproject.xyz", "Hello?", "This is a test mail.");

$workServer = BeanstalkdWorkServer::connect("localhost");
$workServer->storeJob("mail", $mailJob);

And finally, we have our background worker script which regularly checks the work server for new e-mail jobs:

<?php

use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;
use mle86\WQ\WorkProcessor;

$queue = "mail";
printf("%s worker %d starting.\n", $queue, getmypid());

$processor  = new WorkProcessor(BeanstalkdWorkServer::connect("localhost"));
$fn_handler = function(EMail $mailJob) {
    $mailJob->send();
    // don't catch exceptions here, or the WorkProcessor won't see them.
};

while (true) {
    try {
        $processor->processNextJob($queue, $fn_handler);
    } catch (\Throwable $e) {
        echo $e . "\n";  // TODO: add some real logging here
    }
}
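The TODO in the loop above asks for real logging. One possible approach, sketched here with PHP's built-in error_log() function, is to turn each caught exception into a single log line. The formatWorkerError() helper is a hypothetical name introduced only for this example; it is not part of this package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of mle86/wq:
 * builds a single-line log message for an exception caught in the worker loop.
 */
function formatWorkerError(string $queue, \Throwable $e): string
{
    return sprintf(
        "[%s] queue=%s %s: %s (%s:%d)",
        date("c"),               // ISO 8601 timestamp
        $queue,
        get_class($e),
        $e->getMessage(),
        basename($e->getFile()),
        $e->getLine()
    );
}

// Simulate a failing job, as the catch block of the worker loop would see it.
try {
    throw new \RuntimeException("mail() failed");
} catch (\Throwable $e) {
    $line = formatWorkerError("mail", $e);
    error_log($line);  // written to PHP's configured error log (stderr on the CLI by default)
}
```

In the worker script, the echo statement in the catch block could then be replaced with error_log(formatWorkerError($queue, $e)). A PSR-3 logger would be another reasonable choice if the project already uses one.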

Documentation

  1. Implementing a Job class
  2. Execute or Queue
  3. Work the Queue
  4. Error Handling
  5. Usage Examples

Class reference: