jetea/queue

a job queue

2.0.1 2019-01-25 06:41 UTC

Last update: 2024-04-25 20:01:03 UTC


README

A job queue built on beanstalkd (other queue systems can be plugged in) that makes it easy to dispatch and handle jobs.

Keywords

queue, job, easy handle, job with timeout, delayed job, retry_after, given connection, given queue, given tube, memory limit, reload

Installation

composer require "jetea/queue:~2.0"

Overview

Create Queue

use Jetea\Queue\Drivers\Beanstalkd;
use Jetea\Queue\Queue;

$queue = new Queue(new Beanstalkd($host, $port));

Create Job

<?php

use Jetea\Queue\Job;

class ExampleJob extends Job
{
    /**
     * @var string job queue name (beanstalkd tube)
     */
    public $queue = 'default';

    /**
     * The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
     *
     * @var int Maximum number of seconds a worker may spend on the job; a job that times out is released back to the ready state.
     */
    public $retry_after = 60;

    /**
     * The number of times the job may be attempted.
     *
     * @var int Maximum number of attempts
     */
    public $tries = 1;

    /**
     * @var array
     */
    public $words;

    public function __construct(array $words)
    {
        $this->words = $words;
    }

    public function handle()
    {
        var_export($this->words);

        var_dump($this->retry_after, $this->tries);

        // throw new \Exception('handle job with error...lol ^_^');
    }
}

Set the job's queue by defining $queue, the maximum number of attempts by defining $tries, and the timeout by defining $retry_after.
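As a sketch of how these three properties combine, the hypothetical job below targets the sendEmail tube used later in this README, allows three attempts, and gives each attempt 30 seconds. The class name SendEmailJob, its constructor arguments, the body of handle(), and the autoloader path are assumptions made for illustration; only $queue, $tries, $retry_after, and handle() come from the example above.

```php
<?php

// Assumes jetea/queue was installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use Jetea\Queue\Job;

// Hypothetical job: the name, constructor arguments, and handle() body are illustrative only.
class SendEmailJob extends Job
{
    /** @var string job queue name (beanstalkd tube) */
    public $queue = 'sendEmail';

    /** @var int maximum number of seconds a worker may spend on the job (beanstalkd ttr) */
    public $retry_after = 30;

    /** @var int maximum number of attempts */
    public $tries = 3;

    /** @var string recipient address */
    public $to;

    /** @var string message subject */
    public $subject;

    public function __construct($to, $subject)
    {
        $this->to = $to;
        $this->subject = $subject;
    }

    public function handle()
    {
        // Placeholder for real mail-sending code; echo keeps the sketch free of other dependencies.
        echo "Sending '{$this->subject}' to {$this->to}\n";
    }
}
```

Since $queue names the beanstalkd tube, a worker listening on the sendEmail tube should be the one that picks up jobs of this class.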

Dispatch Job

$queue->push(new ExampleJob(['i', 'love', 'china']));

Of course, you can also dispatch a job later (push a delayed job):

$queue->later(60, new ExampleJob(['i', 'love', 'china']));

Process Job

use Jetea\Queue\Worker;

$worker = new Worker($queue);

$worker->daemon();

Note: $worker->daemon() is blocking.

By default, the worker listens on the tube named default. You can specify a different worker queue (beanstalkd tube) like this:

$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);

$worker->daemon();

You can also specify how long the worker sleeps when there is no job, and a memory limit, like this:

$sleep = 60;
$memoryLimit = 128;

$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);

$worker->daemon($sleep, $memoryLimit);

Note: if you want to be able to reload the queue worker, implement the queueShouldRestart method in a class that extends Jetea\Queue\Worker.

Recommend

It is highly recommended that you extend Jetea\Queue\Worker and implement these methods: logProcessError, handleWithObj, queueShouldRestart.

Other

The queue package can work with other queue systems. To add one, implement these interfaces: Jetea\Queue\Drivers\Jobs\JobInterface and Jetea\Queue\Drivers\QueueInterface.

Example

Link

Todo

  • Add some useful commands, such as clearing queued jobs or kicking buried jobs.