mix / worker-pool
Swoole-based worker pool, coroutine pool
v3.0.1
2021-07-21 01:53 UTC
Requires
- php: >=7.0.0
- ext-swoole: >=4.4.4
Requires (Dev)
- phpunit/phpunit: ^7.0.0
README
OpenMix 出品:https://openmix.org
Mix Worker Pool
Swoole-based worker pool, coroutine pool
基于 Swoole 的工作池,协程池
Installation
composer require mix/worker-pool
单次调度
- 如果不想阻塞执行,可以使用
$pool->start()
启动
$jobQueue = new Swoole\Coroutine\Channel(200); $maxWorkers = 100; $handler = function ($data) { // do something }; $pool = new Mix\WorkerPool\WorkerPool($jobQueue, $maxWorkers, $handler); go(function () use ($jobQueue, $pool) { // 投放任务 for ($i = 0; $i < 1000; $i++) { $jobQueue->push($i); } // 停止 $pool->stop(); }); $pool->run(); // 阻塞等待
上面是采用闭包处理任务,也可以使用对象处理任务
class FooHandler implements \Mix\WorkerPool\RunInterface { public function do($data): void { // do something } } $pool = new Mix\WorkerPool\WorkerPool($jobQueue, $maxWorkers, new FooHandler());
常驻调度
适合处理 MQ 队列的异步消费
以 Redis 作为 MQ 为例:
$maxWorkers = 20; $maxQueue = 10; $jobQueue = new Swoole\Coroutine\Channel($maxQueue); $handler = function ($data) { // do something }; $pool = new Mix\WorkerPool\WorkerPool($jobQueue, $maxWorkers, $handler); $quit = new Swoole\Coroutine\Channel(); foreach ([SIGHUP, SIGINT, SIGTERM] as $signal) { Swoole\Process::signal($signal, function () use ($quit) { $quit->push(true); }); } go(function () use ($jobQueue, $pool, $quit) { // 投放任务 while (true) { if (!$quit->isEmpty()) { $pool->stop(); return; } try { $data = $redis->brPop(['test'], 1); } catch (\Throwable $ex) { // print log $pool->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最后一个键才是值 $jobQueue->push($data); } }); $pool->run(); // 阻塞等待
异常处理
闭包或者对象 do
方法中执行的代码,可能会抛出异常,必须要使用 try/catch
避免协程退出
class FooHandler implements \Mix\WorkerPool\RunInterface { public function do($data): void { try { // do something } catch (\Throwable $ex){ // print log } } }
License
Apache License Version 2.0, http://www.apache.org/licenses/