qingmutec / monitor
swoole process manager for rabbitMQ
v2.1.12
2022-09-20 06:48 UTC
Requires
- php: >=7.2
- easyswoole/swoole-ide-helper: ^1.3
- php-amqplib/php-amqplib: 3.1.2
README
介绍
MQ监听项目 本项目遵循MIT开源项目协定,使用MIT协议的开源软件作者只保留版权,而对使用者无任何其它限制。
软件架构
软件架构说明
安装教程
composer地址:https://packagist.org/packages/qingmutec/monitor?query=qingmutec
composer命令: swoole process manager for rabbitMQ
其它说明:
- php版本>=7.2。
- swoole版本>v4.5.3 (swoole版本至少也要v4.5.4,本项目用于生产环境的swoole版本=4.8.9)
使用说明
- 配置文件自行在项目中或下方拷贝一份,放在任意位置,在调用时传入配置数组(必须)。
- 本项目一共分成两部分,分别为多进程消费和生产者。
- 多进程消费使用cli模式启动,可以随意整合到框架自带的Command命令或自行调用。后台任务可以使用nohup命令。
- 生产端注意在配置文件中的queue_run_right_now参数使用就好,为true时将立即执行不会经过队列,主要用来调试。
- 注意env文件中的true和false等配置参数转入config时的值类型转换。false和"false"是不一样的。
- 后台运行:nohup command & | nohup ./command >test.log& | nohup ./command >>test.log&
说明:命令执行的结果保存到test.log中 ">"表示覆盖原文件内容(文件的日期也会自动更新),">>"表示追加内容(会另起一行,文件的日期也会自动更新)
- 当前只支持processManager模式使用多进程,在此进程模式下消息必须是消息消费确认模式,所以"no_ack"配置必须=false。否则将不能启动项目。
- 当前项目已经默认将"php-amqplib"添加为依赖所以可以不用单独为其设置依赖
- v1.3中根目录会有相应的.sh文件可以直接使用。
- 在框架中可使用框架自带的Command命令去调用\qmmonitor\command中相应的方法完成对接。
- 如new \qmmonitor\comman(env('app.app_name'));需要传入当前项目名称会被当作当前进程标识。
- 初始状态=free,只要有消息进来activity和done之间切换,如果接收到退出信息时状态会变成stopped。
- 内部添加了对messageId的支持,在对接阿里时rabbitmq时必须使用此属性
- 接收到退出信息、重试超限执行回调前会强制输出告警提示。
- JobArguments类中会携带一些必要参数转发到业务实现子类Job中,基类是:AbstractJob
- 配置按队列维度级别可设置timeout参数(单位秒,默认值=0),意味着在消费消息时阻塞多少秒,超出将重新连接。
接入示例
使用框架自带的Command单独创建一个命令类,再加入以下:
* @var \qmmonitor\command\Command
*/
private $command = null;
/**
* 配置信息
* @var array
*/
private $config = [];
public function __construct()
{
$this->command = new Command(env('app.app_name'));//传入当前项目名称,,会被当作进程标识
$this->config = config('queue');
}
/**
* 测试执行函数
* @return array
*/
public function run()
{
//启动
$this->command->start($this->config);
}
/**
* 重启队列
*/
public function restart()
{
$this->command->restart($this->config);
}
/**
* 结束队列
*/
public function stop()
{
$this->command->stop($this->config);
}
/**
* 结束队列
*/
public function reload()
{
$this->command->reload($this->config);
}
/**
* 获取队列运行状态
*/
public function list()
{
$list = $this->command->list();
$outputStr = '';
foreach ($list as $item) {
$outputStr .= $item . "\r\n";
}
exit(Color::blue($outputStr));
}
生产者代码如下:
public function put()
{
try {
$queueComponent = new QueueComponent();
$queueComponent->setExchange('direct_goods_exchange')
->setQueue('direct_goods_test_queue')
->setProducerConfirm(true)
->setRouteKey('goods_test');
//或者使用RabbitMqQueueArguments对象设置属性后传入第二个参数中
$queueComponent->put('我是一个测试消息', null, $this->config);
} catch (\Throwable $exception) {
var_dump($exception->getMessage());
}
}