A lightweight queue implementation

1.0.0 2017-12-07 18:55 UTC

README

分离自ThinkPHP使用REDIS持久化的消息队列管理类,支持普通队列延时队列两种模式

安装

composer require easyswoole/queue

初始化配置

在框架frameInitialized事件里进行初始化操作

use easySwoole\Queue\Connector\Redis;
use easySwoole\Queue\Queue;

function frameInitialized()
{
    $RedisConf = [
        'default'    => 'default',   // 默认队列名称
        'host'       => '127.0.0.1', // Redis地址
        'password'   => '',          // 如果有密码则需要填写此项
        'port'       => 6379,        // Redis端口
        'select'     => 0,           // 使用的数据库序号
        'timeout'    => 0,           // 链接超时
        'expire'     => 60,          // 任务执行超时时间
        'persistent' => true         // 是否开启长连接
    ];

    // 初始化队列
    $RedisConnector = new Redis($RedisConf);
    Queue::init($RedisConnector);
}

建立任务处理类

任务处理类需要实现 easySwoole\Queue\JobInterface 接口,其中fire方法是必须实现的方法

<?php

use easySwoole\Queue\JobInterface;
use easySwoole\Queue\Job\Job;

class someJobs implements JobInterface
{

    /**
     * 执行任务
     * @param \easySwoole\Queue\Job\Job $Job
     * @param mixed $data 任务参数
     */
    public function fire(Job $Job, $data)
    {
        // 执行一些任务处理逻辑

        $Job->delete();    // 任务完成后需要指定删除
        $Job->failed();    // 本次的任务处理失败 退回队列
        $Job->release();   // 拿到不能处理的任务 退回队列

    }

    /**
     * 任务达到最大重试次数
     * @param mixed $data 任务参数
     */
    public function failed($data)
    {
        // 任务到达最大重试次数后执行本方法
        // 可用于发送通知或日志记录等收尾工作
    }
}

将任务投递到队列

在业务逻辑中像下面这样进行投递

function deliver()
{
    /**
     * 投递普通任务
     * @param string $job 任务处理类的完全名称(包含全命名空间)
     * @param mixed $data 任务的自定义数据
     * @param string $queue 任务队列的名称
     */
    Queue::push(someJobs::class, 'someTaskData', 'QueueName');

    /**
     * 投递延时任务
     * @param int $delay 任务延时秒数
     * @param string $job 任务处理类的完全名称(包含全命名空间)
     * @param mixed $data 任务的自定义数据
     * @param string $queue 任务队列的名称
     */
    Queue::later(30, someJobs::class, 'someTaskData', 'QueueName');
}

监听任务队列

EventframeInitializeonWorkerStart事件中添加如下代码启动Worker进行队列监听

use easySwoole\Queue\Listener;
use Core\Component\ShareMemory;
use Core\Swoole\AsyncTaskManager;
use Core\Swoole\Timer;
function frameInitialize()
{
    ShareMemory::getInstance()->clear(); // 运行环境清理
}

其中Listenerlisten方法可以接受三个参数,按顺序分别是

  • QueueName : 不传则监听所有任务 传入名称则监听指定队列 用逗号分隔名称监听多个队列
  • maxTries : 每个任务的最大尝试次数
  • delay : 如果任务失败间隔几秒可以被再次获取
function onWorkerStart(\swoole_server $server, $workerId)
{
    // 获得最大TaskWorker数量
    $TaskWorkerNum = Config::getInstance()->getConf('SERVER.CONFIG.task_worker_num');
    if ($workerId == 0) {
        // 启动定时器每3秒投递一个Listener
        Timer::loop(3, function () use ($TaskWorkerNum) {
            $share = ShareMemory::getInstance();
            // 请勿使得所有Worker都在繁忙状态 危险操作
            if ($share->get('TASK_RUNNING_NUM') < $TaskWorkerNum - 1) {
                AsyncTaskManager::getInstance()->add(function () use ($share) {
                    // Worker计数器自增
                    $share->startTransaction();
                    $share->set('TASK_RUNNING_NUM', $share->get(WorkConsts::TASK_RUNNING_NUM) + 1);
                    $share->commit();
                    // 启动一个任务监听
                    $listener = new Listener;
                    $listener->listen('QueueName,OtherName', 3, 5);
                    return true;
                }, -1, function () use ($share) {
                    // Worker计数器自减
                    $share->startTransaction();
                    $share->set('TASK_RUNNING_NUM', $share->get(WorkConsts::TASK_RUNNING_NUM) - 1);
                    $share->commit();
                });
            }
        });
    }
}