mingyuanyun/yii2-iqueue

该消息组件提供统一的消息队列客户端编程模型,支持消息加密,并提供可追踪的消息日志(与ilog结合),性能数据和消息队列的监控功能,同时允许其它消息队列无缝适配,允许自定义加解密。目前已支持rabbitMQ,阿里MNS

v1.2.16 2023-11-01 01:57 UTC

README

提供统一的消息队列客户端编程模型,支持消息加密,并提供可追踪的消息日志(与ilog结合),性能数据和消息队列的监控功能,同时允许其它消息队列无缝适配,允许自定义加解密。目前已支持rabbitMQ,阿里MNS。

安装方法

composer require mingyuanyun/yii2-iqueue

配置

RabbitMQ

在配置文件中components中添加iQueue组件基础配置

'iQueue' => [
    'class' => 'iQueue\MessageQueue',
    'type' => 'rabbitmq',
    'config' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'user' => 'admin',
        'password' => '123',
        'vhost' => '/',
        'prefix' => YII_ENV . '-gcxt-',
        'allowQueues' => ['smscode','sync-room','*vanke*'],
        'heartbeat' => 70,
        'read_write_timeout' => 141,
        'yii_command_dir'=>dirname(dirname(__DIR__)),
        'send_timeout'=>5,
        'send_retries'=>3
    ],
],

iQueue其他配置(按需添加)

'iQueue' => [
    ...基础配置...
    'cryptInterface' => '\iQueue\security\AES256crypt',
    'transformInterface' => 'MessageTransformer',
    'monitor' => 'http://127.0.0.1:15672',
    'warning' => 0
]
属性含义说明
type消息队列类型必须
host主机名必须
port端口必须
user用户名必须
password密码必须
vhost虚拟主机必须
prefix队列前缀必须
allowQueues队列白名单必须, 不在该白名单中的队列不允许使用,支持*通配符
cryptInterface加密类可选,不需要加密时不需要该项或空,如要自定义加密,实现接口\iQueue\interfaces\CryptInterface
transformInterface消息转换类可选,不需要消息转换时不需要该项或空,如要自定义消息转换,实现接口\iQueue\interfaces\TransformInterface
monitor监控接口地址可选,需要监控时须提供,一般为地址:http(s)://host:15672
warning队列预警消息条数阀值可选,用于接入监控,当队列中的待消费消息条数达到设定的值时触发预警,为0时表示不预警,具体参考下面的接入预警,支持*通配符
heartbeat消费端心跳检测选填,默认60秒
read_write_timeout读写超时描述选填,默认130秒,这个值必须比heartbeat*2大
yii_command_diryii文件所处位置选填,默认值:dirname(\Yii::$app->basePath),当发消息需要设置超时时间时需要配置这个地址
send_timeout发送消息超时时间选填,默认为0
send_retries发送消息重试次数选填,默认为3次,最大10次

阿里MNS

在配置文件中components中添加iQueue组件基础配置

'iQueue' => [
    'class' => 'iQueue\MessageQueue',
    'type' => 'alimns',
    'config' => [
        'accessId' => 'accessId',
        'accessKey' => 'accessKey',
        'endPoint' => 'https://1116833840389958.mns.cn-hangzhou.aliyuncs.com/',
        'prefix' => YII_ENV . '_test_',
        'allowQueues' => ['smscode','sync-room','*vanke*'],
        'send_timeout'=>35,
        'send_retries'=>3
    ],
],

iQueue其他配置(按需添加)

'iQueue' => [
    ...基础配置...
    'cryptInterface' => '\iQueue\security\AES256crypt',
    'transformInterface' => 'MessageTransformer',
    'warning' => 0
]
属性含义说明
type消息队列类型必须
accessId帐号id必须
accessKey帐号key必须
endPoint接入点必须
prefix队列前缀必须
allowQueues队列白名单必须, 不在该白名单中的队列不允许使用,支持*通配符
cryptInterface加密类可选,不需要加密时不需要该项或空,如要自定义加密,实现接口\iQueue\interfaces\CryptInterface
transformInterface消息转换类可选,不需要消息转换时不需要该项或空,如要自定义消息转换,实现接口\iQueue\interfaces\TransformInterface
warning队列预警消息条数阀值可选,用于接入监控,当队列中的待消费消息条数达到设定的值时触发预警,为0时表示不预警,具体参考下面的接入预警,支持*通配符
send_retries发送消息重试次数选填,默认为3次,最大10次
send_timeout发送消息超时时间选填,默认为35秒(忽略建立连接时间),加上重试次数n,最大超时时间为35*n

发送消息实例

public function actionSendString()
{
    // 生产消息:不指定消息的处理action和延时的秒数
    $message = 'helloworld';
    \yii::$app->iQueue->createClient('demo')->sendMsg($message, function($msgBody) {
        $result = [
            'messageId' => $msgBody->getMessageId(),
            'message' => $msgBody->getMessage()
        ];
        var_dump($result);
    });

    // 生产消息:指定消息的处理action或延时的秒数
    $message = 'helloworld';
    $queueMsg = new IQueueMessage($message);
    $queueMsg->setAction('receive-new/test');
    $queueMsg->setDelaySeconds(5);
    \yii::$app->iQueue->createClient('demo1')->sendMsg($queueMsg, function($msgBody) {
        $result = [
            'messageId' => $msgBody->getMessageId(),
            'message' => $msgBody->getMessage()
        ];
        var_dump($result);
    });
}
public function actionSendObject()
{
    // 生产消息:不指定消息的处理action和延时的秒数
    $message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
    \yii::$app->iQueue->createClient('demo')->sendMsg($message, function($msgBody) {
        $result = [
            'message' => $msgBody->getMessage(),
            'messageId' => $msgBody->getMessageId(),
        ];
        var_dump($result);
    });

    // 生产消息:指定消息的处理action或延时的秒数
    $message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
    $message = new IQueueMessage($message);
    $message->setAction('receive-new/test');
    $message->setDelaySeconds(10);
    \yii::$app->iQueue->createClient('demo1')->sendMsg($message, function($msgBody) {
        $result = [
            'message' => $msgBody->getMessage(),
            'messageId' => $msgBody->getMessageId(),
        ];
        var_dump($result);
    });
}

消费端实现-推荐(指定action处理队列消息,参见demo文件夹下面的ReceiveController)

定义一个consoleController继承 \iQueue\controllers\ShellConsumeController,基类主要代码如下:

class ShellConsumeController extends \yii\console\Controller
{
    // 如果生产消息时没有指定action,需要在子类的Controller中指定$queueAction,否则消息无法消费
    protected $queueAction;

    /**
     * 接收消息并调用业务处理
     * @param array $messageBody
     */
    protected function successed(\iQueue\MessageBody $messageBody)
    {
        // shell脚本执行
        .......
    }

    /**
     * 定义yii路径,可根据需要重写
     * @return string
     */
    protected function getYii()
    {
        return dirname(dirname(dirname(dirname(dirname(__FILE__))))) . "/yii";
    }
}

子类的代码示例如下:


class ReceiveController extends \iQueue\controllers\ShellConsumeController
{
    protected $queueAction = 'receive/test';

    // protected function successed(\iQueue\MessageBody $messageBody)
    // {
    // }
}

基类的successed方法已经实现了shell执行脚本,如果有特殊要求可重写该方法

然后配置守护进程执行 php yii receive/consume 队列名A 即:对队列名A中的消息使用actionXXX进行消费处理

消费端实现(原单进程,参照demo文件夹下的SmsController,注意该模式下业务代码更新后必须重启进程,否则无法更新;另外对于由于始终在一个进程下运行,使用方要对全局变量谨慎,否则可能存在占用大量内存的情况)

定义一个consoleController继承 \iQueue\controllers\ConsumeController 实现方法successed即可

class SmsController extends \iQueue\controllers\ConsumeController
{
    protected function successed(\iQueue\MessageBody $messageBody)
    {
        $result = [
            'messageId' => $msgBody->getMessageId(),
            'message' => $msgBody->getMessage()
        ];
        var_dump($result);
    }
}

然后配置守护进程执行 php yii sms/consume 队列名A 即:对队列名A中的消息使用SMS进行业务处理消费

发送消息增加超时时间

发消息方法增加超时设置

$message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
$message = new IQueueMessage($message);
$message->setAction('/receive-new/test');
$message->setDelaySeconds(10);
\yii::$app->iQueue->createClient('demo1')->sendMsg($message, function($msgBody) {
    $result = [
        'message' => $msgBody->getMessage(),
        'messageId' => $msgBody->getMessageId(),
    ];
}, null, 5);// 消息发送超时5秒抛异常结束

注意: 当使用 rabbitmq 渠道时需完成以下两项配置:

1、在命令行应用配置增加modules配置:

'iqueue' => [
    'class' => 'iQueue\Module',
]

2、配置yii文件路径

'iQueue' => [
    'class' => 'iQueue\MessageQueue',
    'type' => 'rabbitmq',
    'config' => [
        ...基础配置...
        'yii_command_dir'=>dirname(dirname(__DIR__))
    ],
],
属性含义说明
yii_command_diryii文件所处位置选填,默认值:dirname(\Yii::$app->basePath),当发消息需要设置超时时间时需要配置这个地址

高级功能

注意事项

1、队列与消费端是强绑定关系,意味着一个队列只能存放一种业务消息,调用方必须保证所入队列正确,故为了方便记忆,建议消费端controller定义与队列名相同(去除分隔符)

2、使用RabbitMQ的延时功能时,需要在服务器安装并启用rabbitmq_delayed_message_exchange插件,请参照https://confluence.mysre.cn/pages/viewpage.action?pageId=10926446

具体Demo可参考代码包中的Demo