kickpeach / queue
基于beanstalkd实现的任务队列,方便去分发任务和解决任务
Installs: 26
Dependents: 1
Suggesters: 0
Security: 0
Stars: 1
Watchers: 1
Forks: 0
Open Issues: 0
pkg:composer/kickpeach/queue
Requires
- php: >=7.1.3
- pda/pheanstalk: ^3.1
Requires (Dev)
- phpunit/phpunit: ^5.4
This package is not auto-updated.
Last update: 2025-10-24 17:31:32 UTC
README
基于beanstalkd实现的任务队列,方便去分发任务和解决任务,此库可适用以下场景:
- 用作延时队列:比如可以用于如果用户30分钟内不操作,任务关闭。
- 用作定时任务:比如可以用于专门的后台任务。
- 用作异步操作:这是所有消息队列都最常用的,先将任务仍进去,顺序执行。
- 用作循环队列:用release命令可以循环执行任务,比如可以做负载均衡任务分发。
- 用作兜底机制:比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止
关于Beanstalkd
Beanstalkd 是一个轻量级的内存型队列,利用了和 Memcache 类似的协议。依赖 libevent 单线程事件分发机制, 可以部署多个实例,但是高并发支持还是不太友好;
几个重要的概念
- job:一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中。
- tube:一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象。
- producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。
- consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态。
Job的生命周期
任务在队里之中被称作 Job. 一个 Job 在 Beanstalkd 中有以下的生命周期:
- put 将一个任务放置进 tube 中
- deayed 这个任务现在再等待中,需要若干秒才能准备完毕【延迟队列】
- ready 这个任务已经准备好了,可以消费了。所有的消费都是要从取 ready 状态的 job
- reserved 这个任务已经被消费者消费
- release 这个 job 执行失败了,把它放进 ready 状态队列中。让其他队列执行
- bury 这个 job 执行失败了,但不希望其他队列执行,先把它埋起来
    
     put with delay               release with delay
      ----------------> [DELAYED] <------------.
                            |                   |
                            | (time passes)     |
                            |                   |
       put                  v     reserve       |       delete
      -----------------> [READY] ---------> [RESERVED] --------> *poof*
                           ^  ^                |  |
                           |   \  release      |  |
                           |    `-------------'   |
                           |                      |
                           | kick                 |
                           |                      |
                           |       bury           |
                        [BURIED] <---------------'
                           |
                           |  delete
                            `--------> *poof*
怎么使用
具体实例可参考测试例子
安装
composer require kickpeach/queue -vvv
Overview
Create Queue
use KickPeach\Queue\Drivers\Beanstalkd;
use KickPeach\Queue\Queue;
$queue = new Queue(new Beanstalkd($host, $port));
Create Job
<?php
use KickPeach\Queue\Job;
class ExampleJob extends Job
{
    /**
     * @var string job queue name (beanstalkd tube)
     */
    public $queue = 'default';
    /**
     * The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
     *
     * @var int 允许 worker 执行的最大秒数,超时 job 将会被 release 到 ready 状态.
     */
    public $retry_after = 60;
    /**
     * The number of times the job may be attempted.
     *
     * @var int 最大尝试次数
     */
    public $tries = 1;
    /**
     * @var array
     */
    public $words;
    public function __construct(array $words)
    {
        $this->words = $words;
    }
    public function handle()
    {
        var_export($this->words);
        var_dump($this->retry_after, $this->tries);
        // throw new \Exception('handle job with error...lol ^_^');
    }
}
specifying job queue by defining $queue , specifying Max Job Attempts by defining $tries , specifying timeout Values by defining $retry_after .
Dispatch Job
$queue->push(new ExampleJob(['i', 'love', 'china']));
of cause, you can dispatch job later (push a delayed job) :
$queue->later(60, new ExampleJob(['i', 'love', 'china']));
Process Job
$worker = new Worker($queue);
$worker->daemon();
Note:
$worker->daemon()is blocking.
by default, the worker will will listen  the tube named default, you can specifying worker queue (beanstalkd tube) like :
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon();
you can specifying worker with sleep time while there is no job, and memoryLimit, like :
$sleep = 60;
$memoryLimit = 128;
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon($sleep, $memoryLimit);
License
The MIT License (MIT).