busyphp / queue
基于ThinkPHP官方改进的一个消息队列服务,它支持消息队列的一些基本特性,发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制,多队列,内存限制 ,启动,停止,守护等
v3.0.0
2023-06-10 08:54 UTC
Requires
- php: >=8.0.0
- ext-json: *
- busyphp/busyphp: ^7.0
- nesbot/carbon: ^2.16
- symfony/process: ^6.0
Requires (Dev)
- symfony/var-dumper: ^4.2
README
安装方式
composer require busyphp/queue
安装完成后可以通过后台管理 > 开发模式 > 插件管理进行
安装/卸载/管理
命令行
cd
到到项目根目录下执行
queue:work
命令
该命令将启动一个 work 进程来处理消息队列
php think queue:work
queue:listen
命令
listen命令所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后,listen命令所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程
php think queue:listen
queue:failed
列出所有失败的任务
php think queue:failed
queue:flush
刷新所有失败的任务
php think queue:flush
queue:forget
强制执行一条失败的任务
php think queue:forget id 1(失败任务ID)
queue:retry
将一批失败的任务进行重试
php think queue:forget id 1,2,3
queue:restart
重启进程
php think queue:restart
配置 config/busy-queue.php
<?php return [ // 默认驱动 'default' => 'sync', // 队列驱动配置 'connections' => [ // 同步驱动 'sync' => [ 'type' => 'sync', ], // 数据库驱动 'database' => [ 'type' => 'database', 'queue' => 'default', ], // redis驱动 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], // 更多驱动... ], // 任务失败 'failed' => [ 'type' => 'database', ], ];
创建任务类
<?php use BusyPHP\queue\contract\JobFailedInterface; use BusyPHP\queue\contract\JobInterface; use BusyPHP\queue\Job; class TestJob implements JobInterface, JobFailedInterface { /** * 执行任务 * @param Job $job 任务对象 * @param mixed $data 发布任务时自定义的数据 */ public function fire(Job $job, $data) : void { //....这里执行具体的任务 if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 } //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法 $job->delete(); // 也可以重新发布这个任务 $job->release($delay); //$delay为延迟时间 } /** * 执行任务达到最大重试次数后失败 * @param mixed $data 发布任务时自定义的数据 * @param Throwable $e 异常 */ public function failed($data, Throwable $e) : void { // ...任务达到最大重试次数后,失败了 } }
发布任务
<?php // 发布一条任务到队列中 \BusyPHP\queue\facade\Queue::push($job, $data); // 发布一条延迟执行的任务到队列中 \BusyPHP\queue\facade\Queue::later(10, $job, $data); // 向 database 队列连接器中发布一条任务 \BusyPHP\queue\facade\Queue::connection('database')->push($job, $data); // 向 redis 队列连接器中发布一条任务 \BusyPHP\queue\facade\Queue::connection('redis')->push($job, $data);