zwilias / qman
An evented beanstalkd queue manager
Requires
- php: >=5.5.0
- ext-ev: ^0.2
- jeremeamia/superclosure: ^2.1
- psr/log: ^1.0
- zwilias/beanie: ~0.1
Requires (Dev)
- phpunit/php-invoker: ^1.1
- phpunit/phpunit: ^4.7
- scrutinizer/ocular: ^1.1
Suggests
- monolog/monolog: Allows for powerful, granular logging. QMan has full support for psr/log, which this provides.
This package is not auto-updated.
Last update: 2025-01-04 19:36:17 UTC
README
An evented beanstalkd queue manager.
Core features
- Sane defaults, highly extensible and configurable
- Allows effortless reserving from a connection pool
- Supports graceful shutdown upon receiving a signal
- Protects job-execution from unexpected intrusions
- Extensible job-failure handling
- Built-in support for fine-grained PSR-3 logging
- Built-in support for queueing closures
Requirements
- PHP 5.5 or PHP 5.6. ev does not support PHP 7 yet.
- the pcntl extension.
- ev, an interface to libev, the high performance full-featured event-loop.
- one or more instances of beanstalkd.
Use case
QMan is optimized for multiple beanstalkd instances and a pool of workers listening to all of those.
Examples
Queueing a closure
Queueing a closure is quite possibly the easiest thing to do:
    use QMan\QMan;

    $qMan = QMan::create(['localhost:11300']);
    $qMan->queueClosure(function () {
        echo 'Hello world!';
    });
Essentially, this is equivalent to the following:
    use QMan\QMan;
    use QMan\ClosureCommand;

    $qMan = QMan::create(['localhost:11300']);
    $qMan->queue(ClosureCommand::create(function () {
        echo 'Hello world!';
    }));
Working the queue
Starting a worker with all the defaults injected is easy:
    use Beanie\Beanie;
    use QMan\WorkerBuilder;

    $beanie = Beanie::pool(['localhost:11300']);
    $worker = (new WorkerBuilder())
        ->build($beanie);

    $worker->run();
The WorkerBuilder ensures the QMan\Worker is set up with all of its required dependencies and configuration.
Queueing custom commands
The ClosureCommand, while convenient, comes with two major downsides:
- Serializing closures is rather expensive, in terms of computational power required
- Unit-testing closures quickly devolves into a huge mess
As such, you'll quickly be writing custom commands on a regular basis.
A command should implement QMan\CommandInterface, which is easily done by extending QMan\AbstractCommand:
    use QMan\AbstractCommand;

    class CustomCommand extends AbstractCommand
    {
        public function getType()
        {
            return 'my.custom.command';
        }

        public function execute()
        {
            echo $this->getData() * 5;

            return true;
        }
    }
The getType() function should return a string that maps uniquely to the class you want to execute. This indirection lets a worker safely pick up changes such as renamed classes after a simple restart.
In order for the QMan worker to pick up and execute your command, you'll need to make sure the instance of CommandSerializerInterface knows about it. QMan comes with a generic implementation of this interface, aptly named GenericCommandSerializer. Let's make sure the class we created above is properly registered:
    use Beanie\Beanie;
    use QMan\WorkerBuilder;
    use QMan\GenericCommandSerializer;

    $serializer = new GenericCommandSerializer();
    $serializer->registerCommandType('my.custom.command', CustomCommand::class);

    $beanie = Beanie::pool(['localhost:11300']);
    $worker = (new WorkerBuilder())
        ->withCommandSerializer($serializer)
        ->build($beanie);

    $worker->run();
You could easily futureproof your application by gathering this type <-> class mapping, and representing the types as constants:
    final class Commands
    {
        const TYPE_CUSTOM_COMMAND = 'my.custom.command';

        // Maps each command type to the class that executes it
        public static $map = [
            self::TYPE_CUSTOM_COMMAND => CustomCommand::class,
        ];
    }
QMan's GenericCommandSerializer comes with a registerCommandTypes($map) function which can handle exactly the case described above.
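To make the type-to-class indirection concrete, here is a minimal, self-contained sketch of how such a map could be used when turning commands into payloads and back. SketchCommand and SketchSerializer are hypothetical stand-ins written for this illustration, and the JSON payload layout is an assumption; none of this is QMan's actual GenericCommandSerializer.

```php
<?php
// Hypothetical stand-ins for illustration only; NOT QMan's real classes.
class SketchCommand
{
    private $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getType()
    {
        return 'my.custom.command';
    }

    public function getData()
    {
        return $this->data;
    }
}

class SketchSerializer
{
    /** @var array command type => class name */
    private $map = [];

    public function registerCommandTypes(array $map)
    {
        foreach ($map as $type => $class) {
            $this->map[$type] = $class;
        }
    }

    public function serialize($command)
    {
        // Only the type string goes into the payload, never the class name.
        return json_encode(['type' => $command->getType(), 'data' => $command->getData()]);
    }

    public function unserialize($payload)
    {
        $decoded = json_decode($payload, true);
        $type = (is_array($decoded) && isset($decoded['type']) && is_string($decoded['type']))
            ? $decoded['type']
            : null;

        if ($type === null || !isset($this->map[$type])) {
            throw new InvalidArgumentException('Unknown command type');
        }

        // The class is looked up at unserialize time, so renaming it only
        // requires updating the map and restarting the worker.
        $class = $this->map[$type];
        $data = isset($decoded['data']) ? $decoded['data'] : null;

        return new $class($data);
    }
}

$serializer = new SketchSerializer();
$serializer->registerCommandTypes(['my.custom.command' => 'SketchCommand']);

$payload = $serializer->serialize(new SketchCommand(21));
$command = $serializer->unserialize($payload);

echo get_class($command), ' carries ', $command->getData(), PHP_EOL;
```

Because queued payloads reference only the type string, jobs that were queued before a class was renamed can still be resolved once the map points at the new class name.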
Configuration
Each Worker receives an instance of QManConfig. The following properties are currently included:
(1): Assuming you're using the default GenericJobFailureStrategy. Implementing a custom strategy for handling failed jobs is, of course, perfectly possible.
Changing configuration is as simple as instantiating QManConfig, setting your configuration preferences and passing it to the WorkerBuilder:
    use QMan\QManConfig;
    use QMan\WorkerBuilder;
    use Beanie\Beanie;

    $config = new QManConfig();
    $config->setTerminationSignals([SIGTERM, SIGQUIT]);

    // $servers is your list of beanstalkd addresses, e.g. ['localhost:11300']
    $beanie = Beanie::pool($servers);
    $worker = (new WorkerBuilder())
        ->withQManConfig($config)
        ->build($beanie);
Handling failed jobs
By default, QMan will employ a very simple strategy when handling failed jobs:
- a failed job will either be buried or released again
- if a job has failed fewer than maxTries times in a row, it will be released with a delay of (tries in a row) * defaultFailureDelay
- else, when it has failed maxTries times in a row, it will be buried
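The rule above can be sketched as a small standalone function. decideFailureAction() is a hypothetical helper written for this illustration, not part of QMan, and the values 3 and 60 are example inputs rather than QMan's defaults.

```php
<?php
// Hypothetical helper, NOT part of QMan; it only mirrors the rule described above.
function decideFailureAction($triesInARow, $maxTries, $defaultFailureDelay)
{
    if ($triesInARow < $maxTries) {
        // Still below the limit: release again, waiting longer after each failure.
        return ['action' => 'release', 'delay' => $triesInARow * $defaultFailureDelay];
    }

    // Limit reached: bury the job so it can be inspected later.
    return ['action' => 'bury', 'delay' => null];
}

// Example values only: maxTries = 3, defaultFailureDelay = 60.
foreach ([1, 2, 3] as $tries) {
    $decision = decideFailureAction($tries, 3, 60);
    echo 'failure #', $tries, ': ', $decision['action'];
    if ($decision['delay'] !== null) {
        echo ' with delay ', $decision['delay'];
    }
    echo PHP_EOL;
}
```

With these example inputs the delay grows linearly with each consecutive failure, and the job is buried on the failure that reaches maxTries.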
Overriding this behavior can be done easily by implementing JobFailureStrategyInterface (which extends both PSR-3's LoggerAwareInterface and QMan's ConfigAwareInterface).
    use Psr\Log\LoggerAwareTrait;
    use QMan\JobFailureStrategyInterface;
    use QMan\Job;
    use QMan\ConfigAwareTrait;

    class MyCustomJobFailureStrategy implements JobFailureStrategyInterface
    {
        use LoggerAwareTrait, ConfigAwareTrait;

        public function handleFailedJob(Job $job)
        {
            // Do stuff, like deleting the job after 10 total tries
            $stats = $job->stats();

            if ($stats['reserves'] > 10) {
                $this->logger->alert(
                    'Deleting job after failing to successfully execute over 10 times',
                    ['job' => $job]
                );
                $job->delete();
            }
        }
    }

    use QMan\WorkerBuilder;

    $worker = (new WorkerBuilder)->withJobFailureStrategy(new MyCustomJobFailureStrategy())->build([...]);
Contributing
Pull requests are appreciated. Make sure code-quality (according to scrutinizer) doesn't suffer too badly and all code is thoroughly unit-tested.
Running the tests locally:
$ git clone https://github.com/zwilias/qman.git
$ cd qman
$ composer install
$ vendor/bin/phpunit
License
Copyright (c) 2015 Ilias Van Peer
Released under the MIT License, see the enclosed LICENSE file.