mingyuanyun / yii2-iqueue
该消息组件提供统一的消息队列客户端编程模型,支持消息加密,并提供可追踪的消息日志(与ilog结合),性能数据和消息队列的监控功能,同时允许其它消息队列无缝适配,允许自定义加解密。目前已支持rabbitMQ,阿里MNS
Requires
- php: >=5.6.3
- mingyuanyun/yii2-ilog: >=1.4.8
- php-amqplib/php-amqplib: >=2.10
- symfony/process: ^3.4|^4.0
- v1.2.15
- v1.2.14
- v1.2.13
- v1.2.12
- v1.2.11
- v1.2.10
- v1.2.8
- v1.2.7
- v1.2.6
- v1.2.5
- v1.2.4
- v1.2.3
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.211124
- v1.1.210720
- dev-master / 1.0.x-dev
- v1.0.210402
- v1.0.210312
- v1.0.200519
- v1.0.191202
- v1.0.190712
- v1.0.190624
- v1.0.190603
- v1.0.190529
- v1.0.190524
- v1.0.190513
- 1.0
- dev-feat-xlog-id
- dev-feat-mns-delay-seconds
- dev-feature-msg-transform
- dev-feature-jms-format
- dev-sp-dev
This package is not auto-updated.
Last update: 2024-10-30 11:30:06 UTC
README
提供统一的消息队列客户端编程模型,支持消息加密,并提供可追踪的消息日志(与ilog结合),性能数据和消息队列的监控功能,同时允许其它消息队列无缝适配,允许自定义加解密。目前已支持rabbitMQ,阿里MNS。
安装方法
composer require mingyuanyun/yii2-iqueue
配置
RabbitMQ
在配置文件中components中添加iQueue组件基础配置
'iQueue' => [
'class' => 'iQueue\MessageQueue',
'type' => 'rabbitmq',
'config' => [
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'admin',
'password' => '123',
'vhost' => '/',
'prefix' => YII_ENV . '-gcxt-',
'allowQueues' => ['smscode','sync-room','*vanke*'],
'heartbeat' => 70,
'read_write_timeout' => 141,
'yii_command_dir'=>dirname(dirname(__DIR__)),
'send_timeout'=>5,
'send_retries'=>3
],
],
iQueue其他配置(按需添加)
'iQueue' => [
...基础配置...
'cryptInterface' => '\iQueue\security\AES256crypt',
'transformInterface' => 'MessageTransformer',
'monitor' => 'http://127.0.0.1:15672',
'warning' => 0
]
属性 | 含义 | 说明 |
---|---|---|
type | 消息队列类型 | 必须 |
host | 主机名 | 必须 |
port | 端口 | 必须 |
user | 用户名 | 必须 |
password | 密码 | 必须 |
vhost | 虚拟主机 | 必须 |
prefix | 队列前缀 | 必须 |
allowQueues | 队列白名单 | 必须, 不在该白名单中的队列不允许使用,支持*通配符 |
cryptInterface | 加密类 | 可选,不需要加密时不需要该项或空,如要自定义加密,实现接口\iQueue\interfaces\CryptInterface |
transformInterface | 消息转换类 | 可选,不需要消息转换时不需要该项或空,如要自定义消息转换,实现接口\iQueue\interfaces\TransformInterface |
monitor | 监控接口地址 | 可选,需要监控时须提供,一般为地址:http(s)://host:15672 |
warning | 队列预警消息条数阀值 | 可选,用于接入监控,当队列中的待消费消息条数达到设定的值时触发预警,为0时表示不预警,具体参考下面的接入预警,支持*通配符 |
heartbeat | 消费端心跳检测 | 选填,默认60秒 |
read_write_timeout | 读写超时描述 | 选填,默认130秒,这个值必须比heartbeat*2大 |
yii_command_dir | yii文件所处位置 | 选填,默认值:dirname(\Yii::$app->basePath),当发消息需要设置超时时间时需要配置这个地址 |
send_timeout | 发送消息超时时间 | 选填,默认为0 |
send_retries | 发送消息重试次数 | 选填,默认为3次,最大10次 |
阿里MNS
在配置文件中components中添加iQueue组件基础配置
'iQueue' => [
'class' => 'iQueue\MessageQueue',
'type' => 'alimns',
'config' => [
'accessId' => 'accessId',
'accessKey' => 'accessKey',
'endPoint' => 'https://1116833840389958.mns.cn-hangzhou.aliyuncs.com/',
'prefix' => YII_ENV . '_test_',
'allowQueues' => ['smscode','sync-room','*vanke*'],
'send_timeout'=>35,
'send_retries'=>3
],
],
iQueue其他配置(按需添加)
'iQueue' => [
...基础配置...
'cryptInterface' => '\iQueue\security\AES256crypt',
'transformInterface' => 'MessageTransformer',
'warning' => 0
]
属性 | 含义 | 说明 |
---|---|---|
type | 消息队列类型 | 必须 |
accessId | 帐号id | 必须 |
accessKey | 帐号key | 必须 |
endPoint | 接入点 | 必须 |
prefix | 队列前缀 | 必须 |
allowQueues | 队列白名单 | 必须, 不在该白名单中的队列不允许使用,支持*通配符 |
cryptInterface | 加密类 | 可选,不需要加密时不需要该项或空,如要自定义加密,实现接口\iQueue\interfaces\CryptInterface |
transformInterface | 消息转换类 | 可选,不需要消息转换时不需要该项或空,如要自定义消息转换,实现接口\iQueue\interfaces\TransformInterface |
warning | 队列预警消息条数阀值 | 可选,用于接入监控,当队列中的待消费消息条数达到设定的值时触发预警,为0时表示不预警,具体参考下面的接入预警,支持*通配符 |
send_retries | 发送消息重试次数 | 选填,默认为3次,最大10次 |
send_timeout | 发送消息超时时间 | 选填,默认为35秒(忽略建立连接时间),加上重试次数n,最大超时时间为35*n |
发送消息实例
public function actionSendString()
{
// 生产消息:不指定消息的处理action和延时的秒数
$message = 'helloworld';
\yii::$app->iQueue->createClient('demo')->sendMsg($message, function($msgBody) {
$result = [
'messageId' => $msgBody->getMessageId(),
'message' => $msgBody->getMessage()
];
var_dump($result);
});
// 生产消息:指定消息的处理action或延时的秒数
$message = 'helloworld';
$queueMsg = new IQueueMessage($message);
$queueMsg->setAction('receive-new/test');
$queueMsg->setDelaySeconds(5);
\yii::$app->iQueue->createClient('demo1')->sendMsg($queueMsg, function($msgBody) {
$result = [
'messageId' => $msgBody->getMessageId(),
'message' => $msgBody->getMessage()
];
var_dump($result);
});
}
public function actionSendObject()
{
// 生产消息:不指定消息的处理action和延时的秒数
$message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
\yii::$app->iQueue->createClient('demo')->sendMsg($message, function($msgBody) {
$result = [
'message' => $msgBody->getMessage(),
'messageId' => $msgBody->getMessageId(),
];
var_dump($result);
});
// 生产消息:指定消息的处理action或延时的秒数
$message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
$message = new IQueueMessage($message);
$message->setAction('receive-new/test');
$message->setDelaySeconds(10);
\yii::$app->iQueue->createClient('demo1')->sendMsg($message, function($msgBody) {
$result = [
'message' => $msgBody->getMessage(),
'messageId' => $msgBody->getMessageId(),
];
var_dump($result);
});
}
消费端实现-推荐(指定action处理队列消息,参见demo文件夹下面的ReceiveController)
定义一个consoleController继承 \iQueue\controllers\ShellConsumeController,基类主要代码如下:
class ShellConsumeController extends \yii\console\Controller
{
// 如果生产消息时没有指定action,需要在子类的Controller中指定$queueAction,否则消息无法消费
protected $queueAction;
/**
* 接收消息并调用业务处理
* @param array $messageBody
*/
protected function successed(\iQueue\MessageBody $messageBody)
{
// shell脚本执行
.......
}
/**
* 定义yii路径,可根据需要重写
* @return string
*/
protected function getYii()
{
return dirname(dirname(dirname(dirname(dirname(__FILE__))))) . "/yii";
}
}
子类的代码示例如下:
class ReceiveController extends \iQueue\controllers\ShellConsumeController
{
protected $queueAction = 'receive/test';
// protected function successed(\iQueue\MessageBody $messageBody)
// {
// }
}
基类的successed方法已经实现了shell执行脚本,如果有特殊要求可重写该方法
然后配置守护进程执行 php yii receive/consume 队列名A 即:对队列名A中的消息使用actionXXX进行消费处理
消费端实现(原单进程,参照demo文件夹下的SmsController,注意该模式下业务代码更新后必须重启进程,否则无法更新;另外对于由于始终在一个进程下运行,使用方要对全局变量谨慎,否则可能存在占用大量内存的情况)
定义一个consoleController继承 \iQueue\controllers\ConsumeController 实现方法successed即可
class SmsController extends \iQueue\controllers\ConsumeController
{
protected function successed(\iQueue\MessageBody $messageBody)
{
$result = [
'messageId' => $msgBody->getMessageId(),
'message' => $msgBody->getMessage()
];
var_dump($result);
}
}
然后配置守护进程执行 php yii sms/consume 队列名A 即:对队列名A中的消息使用SMS进行业务处理消费
发送消息增加超时时间
发消息方法增加超时设置
$message = ['name' => '张三', 'age' => 23, 'teacher' => ['name' => '李静', 'age' => 44]];
$message = new IQueueMessage($message);
$message->setAction('/receive-new/test');
$message->setDelaySeconds(10);
\yii::$app->iQueue->createClient('demo1')->sendMsg($message, function($msgBody) {
$result = [
'message' => $msgBody->getMessage(),
'messageId' => $msgBody->getMessageId(),
];
}, null, 5);// 消息发送超时5秒抛异常结束
注意: 当使用 rabbitmq 渠道时需完成以下两项配置:
1、在命令行应用配置增加modules配置:
'iqueue' => [
'class' => 'iQueue\Module',
]
2、配置yii文件路径
'iQueue' => [
'class' => 'iQueue\MessageQueue',
'type' => 'rabbitmq',
'config' => [
...基础配置...
'yii_command_dir'=>dirname(dirname(__DIR__))
],
],
属性 | 含义 | 说明 |
---|---|---|
yii_command_dir | yii文件所处位置 | 选填,默认值:dirname(\Yii::$app->basePath),当发消息需要设置超时时间时需要配置这个地址 |
高级功能
注意事项
1、队列与消费端是强绑定关系,意味着一个队列只能存放一种业务消息,调用方必须保证所入队列正确,故为了方便记忆,建议消费端controller定义与队列名相同(去除分隔符)
2、使用RabbitMQ的延时功能时,需要在服务器安装并启用rabbitmq_delayed_message_exchange插件,请参照https://confluence.mysre.cn/pages/viewpage.action?pageId=10926446