jeruier/swozr-taskr

task processing service component

v1.21 2020-03-31 02:02 UTC

This package is auto-updated.

Last update: 2024-04-29 04:27:09 UTC


README

php任务服务组件

   _____                             ______           __
  / ___/      ______  ____  _____   /_  __/___ ______/ /_______
  \__ \ | /| / / __ \/_  / / ___/    / / / __ `/ ___/ //_/ ___/
 ___/ / |/ |/ / /_/ / / /_/ /       / / / /_/ (__  ) ,< / /
/____/|__/|__/\____/ /___/_/       /_/  \__,_/____/_/|_/_/    

Php Version Swoole Version

简介

在一些业务场景需要用户异步任务或秒级的定时耗时任务,在一些非swoole的php框架中未支持,在因需要共用一些业务框架逻辑或 不想再维护另一个单独部署的swoole框架的场景下,该任务服务组件可以解决这类问题。

功能特色

  • 异步任务
  • 延迟任务
  • 秒级定时任务
  • RabbmitMQ任务

Install

  • composer command
composer require jeruier/swozr-taskr

Taskr Server Actions

  • status() Taskr服务运行状态
  • reload() 重新加载工作进程
  • restart(bool $onlyTaskWorker = false) 重新启动Taskr服务 @param $onlyTaskWorker 是否只重启work进程
  • start() 启动Taskr服务
  • stop() 停止当前正在运行的Taskr服务

配置服务

配置参数皆为可选,设有默认值

在可以配置应用组件的框架下配置taskr组件,你可以看到taskr数组里面包含了taskr的基本信息

    'task' => [
        'class' => \Swozr\Taskr\Server\TaskrEngine::class,
        'host' => '0.0.0.0',
        'port' => 9501,
        'MQProcessMum' => 3,
        'setting' => [
            'task_worker_num' => 12,
            ...
        ],
        'listener' => [
            ...
        ],
        'exceptionHandler' => [
            ...
        ],
        'crontabs' => [
            ...
        ],
        'rabbmitMqs' => [
            ...
        ],
    ],

例如Yii框架可以使用 Yii:$app->task->start(); 来启动服务

类配置参数启动

    $config = [
        'host' => '0.0.0.0',
        'port' => 9501,
        'MQProcessMum' => 3,
        'setting' => [
             'task_worker_num' => 12,
            ...
        ],
        'listener' => [
            ...
        ],
        'exceptionHandler' => [
            ...
        ],
        'crontabs' => [
            ...
        ],
        'rabbmitMqs' => [
            ...
        ],
   ];

使用(new \Swozr\Taskr\Server\TaskrEngine($config))->start() 来启动服务

可配置项:

以下配置项皆为可选配置(非必须)

  • host 服务地址,默认值 0.0.0.0
  • port 端口,默认值 9501
  • MQProcessMum 配置rabbmitMq任务运行处理进程数,默认值1(在配置了rabbmitMqs时生效,且设置值需要小于setting的task_worker_num,配置的这几个任务经常将全权处理rabbmitMq任务,配置的task_worker_num减MQProcessMum的多余任务才会执行其他类型的任务)
  • type 指定Socket的类型,支持TCP、UDP、TCP6、UDP6、UnixSocket Stream/Dgram 等 Swoole Server 构造函数 第四个参数
  • pidName 启动后进程的名称,默认值swozr-taskr
  • pidFile pid存放路径,默认值/tmp/swozr.pid
  • logFile 指定swoole错误日志文件。在swoole运行期发生的异常信息会记录到这个文件中。默认会打印到屏幕,默认值/tmp/swoole.log
  • debug 是否开启debug,默认值true
  • on 配置监听的事件
  • setting 参考 Swoole Server 配置选项
  • listener 注册事件、设置对应事件的处理监听,事件触发组件调用,在任务里面使用
  • exceptionHandler 自定义异常处理类
  • crontabs 需要处理的定时任务集
  • rabbmitMqs 配置rabbmitMq任务

listener配置

listener为键为事件名称值为事件处理或事件处理数组

事件名称
  • Swoole事件:Swoole 文档中的每个事件,在 Swoft 里面均可监听,并且可以存在多个监听器。(完整事件列表请参阅 SwooleEvent.php 文件)
  • Swoft事件:基于 Swoole 的回调处理扩展了一些可用 Server 事件,提供更加精细的操作空间(完整事件列表请参阅 ServerEvent.php 文件)
事件处理定义

Event $event 事件

  • 闭包方式
   'listener' => [
       ServerEvent::BEFORE_ADDED_EVENT => function(Event $event){
           //触发事件后处理
           $target = $event->getTarget(); //事件触发所传递
           $param = $event->getParam('key'); //获取指定参数
           $params = $event->getParams();  //获取所有参数
       },
       ...
   ],
  • 类public方法或类静态方法方式

事件处理为无需参数可实例化可以调用的方法或可以调用的静态方法,注入的参数为$event

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => [
                'EventHandlerController@action',
                'EventHandlerController@staticAction',
                ...
           ],
           ...
       ], 
  • 可使用函数方法调用的类

该类需定义魔术方法 __invoke()里处理事件逻辑

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => 'EventHandlerController',
           ...
       ], 
  • 事件处理处理类

该类需要继承 EventHandlerInterface

       'listener' => [
           ServerEvent::BEFORE_ADDED_EVENT => 'EventHandlerController',
           ...
       ], 

exceptionHandler配置

exceptionHandler配置为键为指定异常值为异常处理类的数组

指定异常

可定义的异常

异常处理类

需要实现ExceptionHandlerInterface接口

配置自定义异常处理类

   'exceptionHandler' => [
       'SwozrException' => 'SwozrExceptionHandler',
       'ServerException' => 'ServerExceptionHandler',
       ...
   ]

crontabs配置

项目有定时业务需求的时候定义crontabs数组,crontabs数组为键为任务格式值为继承BaseTask的类,必须定义静态变量$cron( Crontab 表达式,支持到秒) Cron格式说明

*    *    *    *    *    *
-    -    -    -    -    -
|    |    |    |    |    |
|    |    |    |    |    +----- day of week (0 - 6) (Sunday=0)
|    |    |    |    +----- month (1 - 12)
|    |    |    +------- day of month (1 - 31)
|    |    +--------- hour (0 - 23)
|    +----------- min (0 - 59)
+------------- sec (0-59)

示例:

  • * * * * * * 表示每秒执行一次。
  • 0 * * * * * 表示每分钟的第0秒执行一次,即每分钟执行一次。
  • 0 0 * * * * 表示每小时的0分0秒执行一次,即每小时执行一次。
  • 0/10 * * * * * 表示每分钟的第0秒开始每10秒执行一次。
  • 10-20 * * * * * 表示每分钟的第10-20秒执行一次。
  • 10,20,30 * * * * * 表示每分钟的第10,20,30秒各执行一次。

配置定时任务

    'crontabs' => [
        CrontabTaskHandle::class,
        ...
    ]

配置rabbmitMqs任务

需要装php扩展:amqp

配置项

* 为必须配置项,可以配置多项(以多维数组的形式)

  • calss 执行rabbmitMq任务的类,必须继承 BaseTask 且实现TaskConsume接口
  • *host rabbmitMq地址
  • port rabbmitMq端口,默认值5672
  • username rabbmitMq用户名,默认值guest
  • password rabbmitMq密码,默认值guest
  • *exchange_name rabbmitMq交换机名称
  • *queue_name rabbmitMq队列名称
  • routing_key rabbmitMq路由名称

配置一项rabbmit任务

    'rabbmitMqs' => [
        'class' => MqTaskHandleOne::class,
        'host' => '192.168.99.100',
        'port' => 5672
        'username' => 'guest',
        'password' => 'guest',
        'exchange_name' => 'exchange1',
        'queue_name' => 'queue1',
        'routing_key' => 'routing1',
    ],

配置多项rabbmit任务

    'rabbmitMqs' => [
        [        
            'class' => MqTaskHandleOne::class,
             'host' => '192.168.99.100',
             'port' => 5672
             'username' => 'guest',
             'password' => 'guest',
             'exchange_name' => 'exchange1',
             'queue_name' => 'queue1',
             'routing_key' => 'routing1',
         ],
         [        
             'class' => MqTaskHandleTwo::class,
              'host' => '192.168.99.100',
              'port' => 5672
              'username' => 'guest',
              'password' => 'guest',
              'exchange_name' => 'exchange1',
              'queue_name' => 'queue2',
              'routing_key' => 'routing2',
          ],
          ...
    ],

声明任务

定义一个任务类(必须继承BaseTask 且实现TaskConsume接口,除定时任务和rabbmitMq任务的其他任务还要实现 TaskNotice接口)

use Swozr\Taskr\Server\Base\BaseTask;
use Swozr\Taskr\Server\Contract\TaskConsume;
use Swozr\Taskr\Server\Contract\TaskNotice;

class TaskHandleTest extends BaseTask implements TaskNotice,TaskConsume
{
    /**
     * 任务投递失败
     * @param array $data 投递的数据
     * @param int $delay 延迟执行毫秒数
     * @param string $failMsg 任务发布失败原因
     * @return mixed
     */
    public static function pushFailure(array $data, int $delay, string $failMsg)
    {
        // TODO: Implement pushFailure() method.
    }

    /**
     *任务已投递
     * @return mixed
     */
    public function pushed()
    {
        // TODO: Implement pushed() method.
    }

    /**
     * 消费任务
     * @return mixed
     */
    public function consume(): string
    {
        // TODO: Implement consume() method.
    }

    /**
     * 任务完成
     * @return mixed
     */
    public function finished()
    {
        // TODO: Implement finished() method.
    }
}

当配置为定时任务时需要继承静态属性进行配置$cron

    public static $cron = '0/3 * * * * *';

可调用的类方法

  • getData() 获取投递的数据
  • getTaskType() 获取投递任务类型
    • BaseTask::TYPE_ASYNC 异步任务(默认投递类型)
    • BaseTask::TYPE_DELAY 延迟任务
    • BaseTask::TYPE_CRONTAB 定时任务
  • getDelay() 获取延迟时间(当投递类型为延迟任务时)
  • getTaskId() 获取投递任务成功swoole返回的task_id
  • getTaskSign() 任务服务的标识,和taskid合成唯一任务
  • getTaskName() 获取任务名称

可实现的方法

  • setTaskName() 可设置自定义任务名称,(非必须)默认值类ShortName

需要实现的TaskNotice接口

  • pushFailure(array $data, int $delay, string $failMsg) 服务投递过程投递失败时触发
    • $data 投递的数据
    • $delay 延迟时间
    • $failMsg 投递失败原因
  • pushed() 任务投递成功时触发
  • finished() 任务消费完成触发

需要实现的实现TaskConsume接口

  • consume() 任务消费逻辑处理

任务投递

TaskTest为声明的任务类,默认投递地址0.0.0.0,端口9501

TaskTest::publish(array $data, ...$varParams)

    $data = [
        'param1' => 'val1',
        'param2' => 'val2',
    ];
    //使用默认地址0.0.0.0端口为9501
    TaskTest::publish($data);  //异步任务
    TaskTest::publish($data, 5000);  //延迟任务
    
    //自定义Taskr client投递任务(自定义地址、端口)
    $taskrClientObj = TaskrClient::getInstance([
        'host' => '0.0.0.0',
        'port' => 9501,
        'timeout' => 1
     ]);
     TaskTest::publish($data, $taskrClientObj);  //异步任务
     TaskTest::publish($data, $taskrClientObj, 5000);  //延迟任务
  • $data 需要投递的数据
  • $varParams 为可选参数
    • int $delay 如需投递延迟任务时传入(单位:ms)
    • TaskrClient $taskrClientObj 如需自定义投递地址及端口需定义发送客户端(地址端口与Taskr配置的服务地址端口需相同)

Taskr Client 发布任务的客户端

配置客户端

  • 方式一
    $taskrClientObj = TaskrClient::getInstance([
        'host' => '0.0.0.0',
        'port' => 9501,
        'timeout' => 1
     ]);
  • 方式二
    $taskrClientObj = TaskrClient::getInstance();
    $taskrClientObj->setHost('0.0.0.0');
    $taskrClientObj->setPort(9501);
    $taskrClientObj->setTimeout(1);

testing

Yii2框架使用示例

其他php框架使用方法类似

使用Taskr服务

应用组件的方式配置

return [
    // ...
    'components' => [
        // ...
        'taskr' => [
            [
                'class' => \Swozr\Taskr\Server\TaskrEngine::class,
                'host' => '0.0.0.0',
                'port' => '9501',
                'pidName' => 'swozr-taskr',
                'MQProcessMum' => 1,
                'debug' => true,
                'setting' => [
                    'worker_num' => 1,
                    'task_worker_num' => 2,
                    'daemonize' => false
                ],
                'exceptionHandler' => [
                    \Swozr\Taskr\Server\Exception\TaskException::class => \SwozrTest\Taskr\Server\ExceptionHandler\TaskExceptionHandler::class
                ],
                'crontabs' => [
                    \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class
                ],
                'listener' => [
                    \Swozr\Taskr\Server\Event\ServerEvent::TASK_PUSHED => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class,
                    \Swozr\Taskr\Server\Event\ServerEvent::TASK_CONSUME => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class
                ],
                'rabbmitMqs' => [
                    'class' => \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class,
                    'host' => '192.168.99.100',
                    'exchange_name' => 'a',
                    'queue_name' => 'a',
                    'routing_key' => 'c',
                ],
            ]
        ],
    ],
    // ...
];

之后你就可以通过语句 Yii::$app->taskr 来使用taskr服务了。

创建的控制台命令

  • 配置应用组件的方式
namespace app\commands;

use yii\console\Controller;

class TaskrController extends Controller
{
    private $taskr;
    
    public function init()
    {
        parent::init(); // TODO: Change the autogenerated stub
        $this->taskr = Yii::$app->taskr;
    }
    
    /**
     * start taskr server
     */
    public function actionStart()
    {
        $this->taskr->start();
    }
    
    /**
     * stop taskr server
     */
    public function actionStop()
    {
        $this->taskr->stop();
    }
    
    /**
     * taskr server status
     */
    public function actionStatus()
    {
        $this->taskr->status();
    }
  
    /**
     * @param $type 当为-o则只reload task worker进程,默认重启所有
     * reload taskr server
     */    
    public function actionReload($type = '')
    {
        $onlyTaskWorker = '-o' == $type ? true : false;
        $this->taskr->reload();
    }  
    
    /**
     * restart taskr server
     */
    public function actionRestart()
    {
        $this->taskr->restart();
    }  
}
  • 直接类调用的方式
namespace app\commands;

use yii\console\Controller;
use Swozr\Taskr\Server\TaskrEngine;

class TaskrController extends Controller
{
    private $taskr;
    
    public function init()
    {
        parent::init(); // TODO: Change the autogenerated stub
        $this->taskr = new TaskrEngine([
            'host' => '0.0.0.0',
            'port' => '9501',
            'pidName' => 'swozr-taskr',
            'MQProcessMum' => 1,
            'debug' => true,
            'setting' => [
                'worker_num' => 1,
                'task_worker_num' => 2,
                'daemonize' => false
            ],
            'exceptionHandler' => [
                \Swozr\Taskr\Server\Exception\TaskException::class => \SwozrTest\Taskr\Server\ExceptionHandler\TaskExceptionHandler::class
            ],
            'crontabs' => [
                \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class
            ],
            'listener' => [
                \Swozr\Taskr\Server\Event\ServerEvent::TASK_PUSHED => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class,
                \Swozr\Taskr\Server\Event\ServerEvent::TASK_CONSUME => \SwozrTest\Taskr\Server\Listener\TestHandleListener::class
            ],
            'rabbmitMqs' => [
                'class' => \SwozrTest\Taskr\Server\Tasks\TaskHandleTest::class,
                'host' => '192.168.99.100',
                'exchange_name' => 'a',
                'queue_name' => 'a',
                'routing_key' => 'c',
            ],
        ]);
    }
    
    /**
     * start taskr server
     */
    public function actionStart()
    {
        $this->taskr->start();
    }
    
    /**
     * stop taskr server
     */
    public function actionStop()
    {
        $this->taskr->stop();
    }
    
    /**
     * taskr server status
     */
    public function actionStatus()
    {
        $this->taskr->status();
    }
  
    /**
     * @param $type 当为-o则只reload task worker进程,默认重启所有
     * reload taskr server
     */    
    public function actionReload($type = '')
    {
        $onlyTaskWorker = '-o' == $type ? true : false;
        $this->taskr->reload();
    }  
    
    /**
     * restart taskr server
     */
    public function actionRestart()
    {
        $this->taskr->restart();
    }  
}

运行该命令

  • 开启taskr服务
    php yii taskr/start
  • 停止taskr服务
    php yii taskr/stop
  • 查看taskr服务状态
    php yii taskr/status
  • reload taskr服务
    php yii taskr/reload
    
    //只reload task worker进程
    php yii taskr/reload -o
  • 重启taskr服务
    php yii taskr/restart