xd/queue-consumers-manager

1.2 2016-06-21 06:49 UTC

This package is auto-updated.

Last update: 2024-09-04 19:58:52 UTC


README

一个简单的队列消费者进程管理轮子,主要为了解决平滑重启、平滑关闭、进程自动平滑重启等问题

原理

本想用信号量来控制进程,实测此方案会出现业务闪断(在收到信号量时,php会断掉当前正在阻塞的语句,可能会导致调用api或执行sql等阻塞操作的中断),因此本项目采用了文件的方式来控制进程。进程自重启是为了防止进程内存泄露的问题,您可以配置一个进程最多消费多少次队列消息后重启,进程会记录处理消费数量,达到最大值便会重新拉起一个进程并退出当前进程,由于新拉起的进程是直接使用的exec函数,且直接执行php命令,因此需要开放exec函数且php已存在于环境变量中

依赖

php >= 5.4 (理论上是>=5.3即可,我没有此环境且版本太老就没验证)
posix 扩展 此扩展不是必须的,只是在记录运行日志时会详细到某一个pid

安装

composer地址:https://packagist.org/packages/xd/queue-consumers-manager

权限

由于采用了文件方式来控制进程,所以需要给目录data读写权限

示例

  1. 创建一个消费者

    class myConsumer extends \Xd\QueueConsumersManager\Consumer
    {
        //开始运行,当队列开始后会调用此方法
        public function run()
        {
            //模拟获取,消费队列消息
            while(true) {
                sleep(2);//模拟处理业务的耗时处理
                \Xd\QueueConsumersManager\Manager::noticeFetchedQueueMsg();
            }
        }
    
        /**
         * 关闭consumer
         */
        public function shutdown()
        {
            //关闭连接等处理
        }
    }
    
    $myConsumer = new myConsumer();
    
    $consumerManager = \Xd\QueueConsumersManager\Manager::getInstance($myConsumer);
    $consumerManager->receivedMax = 1000;//设置每个consumer最多请求多少次消息后重启,默认为1000
    $consumerManager->run();
    
    
  2. 启动
    执行对应vendor包中的start.sh, 第三个参数为消费者脚本路径(目前只支持绝对路径),第四个参数为开启的进程个数,运行后中途可以随意再增加,运行日志的名称为consumer脚本的文件名+.log,位于/var/log目录下

    sh vendor/xd/queue-consumers-manager/bins/start.sh /www/queue/test.php 10
    

    输出

     consumer运行日志文件:/var/log/test.php.log
     正在启动第1个进程...
     正在启动第2个进程...
     正在启动第3个进程...
     正在启动第4个进程...
     正在启动第5个进程...
     正在启动第6个进程...
     正在启动第7个进程...
     正在启动第8个进程...
     正在启动第9个进程...
     正在启动第10个进程...
     启动完成
    
  3. 查看运行状态
    执行对应vendor包中的status.sh, 第三个参数为消费者脚本路径(目前只支持绝对路径)

    sh vendor/xd/queue-consumers-manager/bins/status.sh /www/queue/test.php
    

    输出

     当前共有 10个 consumer运行,如果下面展示的consumer行数在多次查看状态下一直少于此数量,则有可能存在consumer阻塞死了
     状态说明:第一列:开始时间,第二列:运行时间,第三列:请求队列数次,第四列:使用内存,第五列:内存峰值
     正在等待进程反馈状态,获取到结果后会自动依次展示在下文,请稍候...
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:19 | 2分钟   | 87次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:21 | 2分钟   | 86次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:13 | 3分钟   | 90次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:17 | 2分钟   | 88次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:15 | 2分钟   | 89次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:22 | 2分钟   | 86次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:20 | 2分钟   | 87次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:14 | 3分钟   | 90次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:16 | 2分钟   | 89次            | 853.9 KB | 865.9 KB |
    
     --------------------------------------------------------------------------
     | 2016-05-30 10:45:18 | 2分钟   | 88次            | 853.9 KB | 865.9 KB |
    
  4. 关闭
    执行对应vendor包中的shutdown.sh, 第三个参数为消费者脚本路径(目前只支持绝对路径)

    sh vendor/xd/queue-consumers-manager/bins/shutdown.sh /www/queue/test.php
    

    输出

    还有10个进程等待关闭,请稍候...
    还有5个进程等待关闭,请稍候...
    已完成关闭!