herosphp/crontab

crontab adapter herosphp Framework

v1.0.0 2022-09-19 00:20 UTC

This package is auto-updated.

Last update: 2024-12-22 05:52:04 UTC


README

定时任务调度

install

    composer install herosphp/crontab

usage

config/process.config.php

 return [
    //仅能1个进程
    'crontab' => [
        'enable' => true,
        'handler' => CrontabWorker::class
    ],

    //大量任务的时候,通过投递到异步任务完成
    'async_worker' => [
        'enable' => true,
        'listen' => 'tcp://127.0.0.1:8182',
        'handler' => AsyncTaskWorker::class,
        'count' => 1
    ],
];

CrontabWorker

<?php

declare(strict_types=1);
/**
 * This file is part of Heros-Worker.
 *
 * @contact  chenzf@pvc123.com
 */

namespace process;

use Exception;
use herosphp\plugin\crontab\Crontab;
use herosphp\utils\Lock;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Worker;

/**
 * 采用异步通知消息来完成定时任务
 * 主要解决是如果定时任务定义间隙过短、任务执行过久,导致部分任务跳过。
 * 参考[http://doc.workerman.net/faq/async-task.html].
 */
class CrontabWorker
{
    // 定时任务列表 memo:定时任务的备注,该属性为可选属性,没有任何逻辑上的意义,仅供开发人员查阅帮助对该定时任务的理解。
    protected static array $cronList = [
       [
           'rule' => '* * * * * *',  //支持秒
           'task' => [AsyncTask::class, 'run'],  //处理类和执行方法
           'memo' => 'say Hello'  //备注可选
       ]
    ];

    /**
     * @param Worker $worker
     * @return void
     */
    public function onWorkerStart(Worker $worker): void
    {
        foreach (static::$cronList ?? [] as $cron) {
            new Crontab($cron['rule'], static function () use ($cron) {
                static::delivery($cron['task'][0], $cron['task'][1], $cron['memo']);
            });
        }
    }

    /**
     * 投递到异步进程. 一个定时任务执行比较久,间隔设置时间比较短,加锁。一个任务一个时刻仅有一个运行.
     *
     * @throws Exception
     */
    private static function delivery(string $clazz, string $method, string $memo): void
    {
        $lock = Lock::get("{$clazz}{$method}");
        if ($lock->tryLock()) {
            $taskConnection = new AsyncTcpConnection('tcp://127.0.0.1:8182');
            $taskConnection->send(json_encode(['clazz' => $clazz, 'method' => $method]));
            $taskConnection->onMessage = function (AsyncTcpConnection $asyncTcpConnection, $taskResult) use ($lock) {
                $asyncTcpConnection->close();
                $lock->unlock();
            };
            $taskConnection->connect();
        }
    }
}

AsyncTaskWorker

<?php
declare(strict_types=1);
namespace process;

use Workerman\Connection\TcpConnection;

class AsyncTaskWorker
{
    public function onMessage(TcpConnection $connection, string $data): void
    {
        $class = json_decode($data, true);
        if (isset($class['clazz'], $class['method']) && class_exists($class['clazz']) && method_exists($class['clazz'], $class['method'])) {
            call_user_func([new $class['clazz'](), $class['method']]);
        }
        $connection->send('ok');
    }
}