bang / queue
依赖workerman环境和redis环境的任务队列插件
Installs: 13
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 0
pkg:composer/bang/queue
Requires
- php: >=8.1
- topthink/framework: ^8.0
- workerman/redis-queue: ^1.2
This package is auto-updated.
Last update: 2025-10-10 21:17:17 UTC
README
描述
基于 workerman + thinkphp8 + redis,实现的一个任务队列
安装
composer require bang/queue
配置
配置文件位于
config/bang_queue.php
配置说明
仅支持Redis驱动,没有Redis请自行安装
[
'default' => 'default',//默认连接名
'count' => 8,//进程数
'isDynamic' => true,//是否进行动态监听。慎用,有一定的性能消耗。而且redis请做好持久化
'processPort' => 9500,//isDynamic为true。需要用一个端口来监听进程
'loggerChannel' => 'file',//记录日志的通道名称。具体请看thinkphp8手册的日志处理
//通用配置
'config' => [
'path' => 'workerman_log/queue',//存储日志的目录
'pidFileName' => 'workerman.pid',//日志文件
'stdoutFile' => 'stdout.pid',//stdoutFile文件
'logFile' => 'log.log',//日志文件
],
'options' => [
'default' => [
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 2,
'prefix' => '',
'max_attempts' => 5, // 消费失败后,重试次数
'retry_seconds' => 10, // 重试间隔,单位秒
]
],
//这里可以定义 队列名 与 消费队列 的映射,优先级最高,动态监听也无法去修改
'listenList' => [
//示例格式为
//'queue1' => \app\queue\test\Consume::class
]
];
配置解释
default, 对应options下的连接配置
count,开启的消费者的进程数,按照自己服务器配置来衡量
isDynamic,是否需要动态监听。懒人必备,这样不必去配置listenList了。有一定的性能消耗
processPort,开启动态监听时,需要启动一个端口来监听。请确保你设置的端口没被占用。默认是9500
loggerChannel,日志通道。具体请看thinkphp8手册的日志处理。留空则获取它默认的配置通道
config,是workerman的一个日志、pid、stdoutFile的一些配置。一般来说不需要做过多的修改
pidFileName,pid文件名path,这些配置存储的目录。相对于项目的根目录stdoutFile,stdoutFile文件名logFile,日志文件名
options,连接列表,里面可以配置各种连接,不过这些必须都是Redis的配置
host,Redis链接port,端口password,密码。如没设置,留空即可select,select 的 database。可以有效避免和别的业务冲突prefix,前缀,一般留空即可max_attempts,消费失败后,重试次数retry_seconds,重试间隔,单位秒。
重试说明
重试次数由 `max_attempts` 控制,重试间隔由 `retry_seconds` 和 `max_attempts` 共同控制。 比如`max_attempts`为5,`retry_seconds`为10。 第1次重试间隔为1*10秒; 第2次重试时间间隔为2*10秒; 第3次重试时间间隔为3*10秒; . . . 以此类推
创建任务类
任务类 必须
implementsbang\queue\Consumer
以下写一个例子
<?php declare (strict_types=1); namespace app\queue\test; use bang\queue\Consumer; class Consume implements Consumer { /** * 队列名称 * @var string */ public static string $queue = 'queue1'; /** * 使用的链接 * @var string */ public static string $connection = 'default'; /** * 消费逻辑 * @param $data * @return void * @throws \Throwable */ public function consume($data) { throw new \Error('抛出异常就会重新执行队列,直到重试次数'); } /** * 失败回调逻辑 * @param $data * @return void * @throws \Throwable */ public function onConsumeFailure(\Throwable $e, $package) { //这里是每一次执行失败时会调用,你可以写任何的邮件、短信等等通知 //$package 包含了 attempts 已经重试的次数。data,生产者发布的数据。error,是错误信息。 } }
发布任务
第一个,
bang\queue\Queue::send($queueName , $data, $delay = 0)第二个,
bang\queue\Queue::sendDynamic($class , $data, $delay = 0)
第一个命令说明
$queueName,队列名,优先获取 配置中的listenList的映射$data,需要传输到消费者中的参数$delay,意思在n秒后执行,传0或不传,就是立即执行
第一个命令说明
$queueName,具体的消费者类路径,例子:可以传app\queue\test\Consume或者app\queue\test\Consume::class。用这种方法,消费者类最好定义好static $queue的队列名$data,需要传输到消费者中的参数$delay,意思在n秒后执行,传0或不传,就是立即执行
监听任务并执行
&> php think bang_queue:work start
如果需要进程常驻,则可以运行 &> php think bang_queue:work start -d true
两种,具体的可选参数可以输入命令加 --help 查看
注意事项
为了在长时间中,mysql有断开的风险。建议在
database.php配置文件中break_reconnect设置成true。也就是支持断线重连功能。
有问题请联系 邮箱:
1002335644@qq.com