mle86/wq-redis

A Redis module for mle86/wq using the phpredis extension

v1.0.2 2018-11-01 19:33 UTC

Last update: 2024-10-29 04:47:30 UTC


This package contains the PHP class mle86\WQ\WorkServerAdapter\RedisWorkServer.

It supplements the mle86/wq package by implementing its WorkServerAdapter interface.

It connects to a Redis server using the phpredis extension.

Version and Compatibility

This is version 1.0.2 of mle86/wq-redis.

It was developed for version 1.0.0 of mle86/wq and should be compatible with all of its future 1.x versions as well.

Installation and Dependencies

$ sudo apt install php-redis  # to install the phpredis extension
$ composer require mle86/wq-redis

The package depends on PHP 7.1, mle86/wq, and the phpredis extension.
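
The runtime requirements can also be checked from PHP itself. The following is a minimal sketch and not part of the package: the function name missingWqRedisRequirements() is hypothetical, and it assumes that PHP versions newer than 7.1 are acceptable as well.

```php
<?php
// Hypothetical helper, not part of mle86/wq-redis:
// returns a list of the runtime requirements that are not met.
function missingWqRedisRequirements(): array
{
    $missing = [];

    // Assumption: any PHP version from 7.1 upwards is fine.
    if (version_compare(PHP_VERSION, '7.1.0', '<')) {
        $missing[] = 'PHP 7.1';
    }

    // The phpredis extension registers itself under the name "redis".
    if (!extension_loaded('redis')) {
        $missing[] = 'phpredis extension';
    }

    return $missing;
}

foreach (missingWqRedisRequirements() as $requirement) {
    echo "Missing requirement: {$requirement}\n";
}
```

An empty result means both checks passed. Whether mle86/wq itself is installed is left to Composer.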

Class reference

(class mle86\WQ\WorkServerAdapter\RedisWorkServer implements WorkServerAdapter)

It connects to a Redis server.

Because Redis does not have delayed entries, reserved entries, or buried entries, this class uses several custom workarounds to emulate those features.

For every $workQueue used, this class will create multiple Redis keys:

  • _wq.$workQueue (ready jobs – List)
  • _wq_delay.$workQueue (delayed jobs – Sorted Set)
  • _wq_buried.$workQueue (buried jobs – List)

The delaying mechanism was inspired by this StackOverflow response.
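
The key layout above, and one way a sorted set can emulate delayed entries, can be sketched as follows. This illustrates the general technique under stated assumptions and is not the class's actual implementation. All three function names are hypothetical. The sketch assumes that each job is stored as a serialized string and that its sorted-set score is the Unix timestamp at which it becomes ready.

```php
<?php
// Hypothetical helper: the three Redis keys described above for one work queue.
function wqRedisKeys(string $workQueue): array
{
    return [
        'ready'   => "_wq.{$workQueue}",        // List
        'delayed' => "_wq_delay.{$workQueue}",  // Sorted Set
        'buried'  => "_wq_buried.{$workQueue}", // List
    ];
}

// Assumed mechanism: a delayed job goes into the sorted set,
// scored by the time at which it becomes ready.
function storeSerializedJob(\Redis $redis, string $workQueue, string $payload, int $delay = 0): void
{
    $keys = wqRedisKeys($workQueue);
    if ($delay > 0) {
        $redis->zAdd($keys['delayed'], time() + $delay, $payload);
    } else {
        $redis->rPush($keys['ready'], $payload);
    }
}

// Assumed mechanism: move every delayed job whose time has come
// to the ready list. Returns the number of jobs moved.
function promoteDueJobs(\Redis $redis, string $workQueue): int
{
    $keys  = wqRedisKeys($workQueue);
    $due   = $redis->zRangeByScore($keys['delayed'], '-inf', (string)time());
    $moved = 0;

    foreach ($due as $payload) {
        // zRem() returns the number of removed members, so only the
        // client that actually removed the job pushes it to the ready list.
        if ($redis->zRem($keys['delayed'], $payload) > 0) {
            $redis->rPush($keys['ready'], $payload);
            $moved++;
        }
    }

    return $moved;
}
```

In this sketch the removal and the push are two separate commands, so a crash between them would lose the job. A production implementation would have to close that gap, for example with a transaction or a server-side script.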

  • public function __construct (\Redis $serverConnection)
    Takes an already-configured Redis instance to work with. Does not attempt to establish a connection itself – use the connect() factory method for that instead or do it with Redis::connect() prior to using this constructor.
  • public static function connect ($host = "localhost", $port = 6379, $timeout = 0.0, $retry_interval = 0)
    Factory method. This will create a new Redis instance by itself.
    See Redis::connect() for the parameter descriptions.
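
The two ways of obtaining an instance can be compared side by side. This is a sketch, not taken from the package's own documentation. The two wrapper function names are hypothetical. It assumes that connect() is callable statically and returns a RedisWorkServer, as its description as a factory method suggests.

```php
<?php
use mle86\WQ\WorkServerAdapter\RedisWorkServer;

// Variant 1: set up the phpredis connection yourself, then hand it over.
// Useful when the connection needs extra setup first, such as auth() or select().
function workServerFromOwnConnection(string $host = "localhost", int $port = 6379): RedisWorkServer
{
    $redis = new \Redis();
    $redis->connect($host, $port);
    return new RedisWorkServer($redis);
}

// Variant 2: let the factory method create the Redis instance.
// Assumption: connect() is static and returns the new RedisWorkServer.
function workServerFromFactory(string $host = "localhost", int $port = 6379): RedisWorkServer
{
    return RedisWorkServer::connect($host, $port);
}
```

Both variants need the Composer autoloader to be loaded and a reachable Redis server when they are called.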

Interface methods which are documented in the WorkServerAdapter interface:

  • public function storeJob (string $workQueue, Job $job, int $delay = 0)
  • public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT) : ?QueueEntry
  • public function buryEntry (QueueEntry $entry)
  • public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
  • public function deleteEntry (QueueEntry $entry)
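
How these methods fit together when called directly can be sketched as follows. The two function names are hypothetical, and the sketch uses only the signatures listed above. In normal use the WorkProcessor class makes these calls, so this only illustrates the adapter's role.

```php
<?php
use mle86\WQ\Job\Job;
use mle86\WQ\WorkServerAdapter\RedisWorkServer;

// Put a job into the "webhook" queue, optionally delayed.
function enqueueWebhookJob(RedisWorkServer $server, Job $job, int $delay = 0): void
{
    $server->storeJob("webhook", $job, $delay);
}

// Fetch the next entry from the "webhook" queue and delete it.
// Returns false if no entry arrived before the timeout.
function takeAndDeleteNextEntry(RedisWorkServer $server, int $timeout): bool
{
    $entry = $server->getNextQueueEntry("webhook", $timeout);
    if ($entry === null) {
        return false;
    }

    // A real worker would execute the entry's job here and then decide
    // between deleteEntry(), buryEntry(), and requeueEntry().
    $server->deleteEntry($entry);
    return true;
}
```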

Usage example

<?php
use mle86\WQ\WorkServerAdapter\RedisWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

$processor = new WorkProcessor( RedisWorkServer::connect("localhost") );

while (true) {
    $processor->processNextJob("webhook", function(Job $job) {
        $job->...;
    });
}

This executes all jobs available in the local Redis server's “webhook” queue, forever. It will, however, abort if one of the jobs throws an exception – you might want to add a logging try-catch block around the processNextJob() call as shown in WQ's “Quick Start” example.
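
A logging try-catch wrapper of that kind could look like the following. This is a minimal sketch and not WQ's own “Quick Start” code: the function name processNextJobLogged() is hypothetical, and error_log() merely stands in for whatever logger the application uses.

```php
<?php
use mle86\WQ\WorkProcessor;

// Hypothetical wrapper: runs one processNextJob() call and logs any
// exception instead of letting it abort the worker loop.
// Returns true if the call finished without throwing.
function processNextJobLogged(WorkProcessor $processor, string $workQueue, callable $handler): bool
{
    try {
        $processor->processNextJob($workQueue, $handler);
        return true;
    } catch (\Throwable $e) {
        error_log(sprintf(
            "[%s] job failed: %s: %s",
            $workQueue,
            get_class($e),
            $e->getMessage()
        ));
        return false;
    }
}
```

Inside the while loop, the direct processNextJob() call would then be replaced by processNextJobLogged($processor, "webhook", $handler), with $handler being the same job callback as before.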