alicfeng / aliyun_rocket_mq
基于阿里云官方SDK增强组件,更加优雅的应用姿势、更加灵活的动态配置,让应用层服务组件更加标准规范
1.0.3
2021-07-28 04:36 UTC
Requires
- php: ^7.0|^8.0
- ext-json: *
- aliyunmq/mq-http-sdk: ^1.0.3
- friendsofphp/php-cs-fixer: ^2.18
Requires (Dev)
- mockery/mockery: ^1.4.2
- nunomaduro/collision: ^5.0
- phpunit/phpunit: ^9.3.3
README
阿里云RocketMQ增强组件
基于阿里云官方SDK增强组件
更加优雅的应用姿势、更加灵活的动态配置,让应用层服务组件更加标准规范
特点
-
支持消息幂等性消费,防止消息重复消费
-
关系协议约定,仅关心消息消费逻辑即可
-
专注于配置化定义与注册,更加灵活简洁
安装
composer require alicfeng/aliyun_rocket_mq -vvv
# 安装完毕必须执行如下脚本 解决官网代码缺陷
vendor/alicfeng/aliyun_rocket_mq/bin/fix_official_pkg.sh
配置
$config = [ 'client' => [ 'endpoint' => env('MQ_ROCKET_CLIENT_ENDPOINT'), 'access_key' => env('MQ_ROCKET_CLIENT_ACCESS_KEY'), 'secret_key' => env('MQ_ROCKET_CLIENT_SECRET_KEY'), ], 'consumer' => [ 'handler_base_namespace' => env('MQ_ROCKET_CONSUMER_HANDLER_BASE_NAMESPACE'), 'topic' => env('MQ_ROCKET_CONSUMER_TOPIC'), 'message_tags' => [ ], 'group_id' => env('MQ_ROCKET_CONSUMER_GROUP_ID'), 'instance_id' => env('MQ_ROCKET_CONSUMER_INSTANCE_ID'), ], 'cache' => [ 'enable' => env('MQ_ROCKET_CACHE_ENABLE', true), 'host' => env('REDIS_HOST', '127.0.0.1'), 'password' => env('REDIS_PASSWORD', null), 'port' => env('REDIS_PORT', '6379'), 'database' => env('REDIS_DB', '0'), ] ];
使用
简单使用
use MQ\Model\TopicMessage; use Samego\RocketMQ\Consumer; use Samego\RocketMQ\Enum\MessageTagEnum; use Samego\RocketMQ\Enum\TopicEnum; use Samego\RocketMQ\Event\MessageEvent; use Samego\RocketMQ\Producer; $message = new TopicMessage(['name' => 'hello world']); $message->putProperty('timestamp', time()); $message->setMessageTag(MessageTagEnum::TRAINING_SERVICE_TRAINING_CONTROLLER); $message->setMessageKey('uuid'); // 普通消息发送 Producer::normal($config['client'])->publish('MQ_xxx', TopicEnum::DEMO_SERVICE, $message); // 普通消息订阅 Consumer::normal($config['client'], new MessageEvent($config['consumer'], $config['cache']))->subscribe();
模切约定
- 消费时、每一个服务订阅一个主题,即一个进程仅支持一个主题(
topic
)监听 - 消费时、每一个订阅进程支持多个消息标签(
MessageTag
)监听 - 消费时、每一个消息标签需要实现对于应的标签处理事件类,具体约定示例如下:
// 消费配置 'consumer' => [ // 定义承载消费处理基类命名空间 'handler_base_namespace' => 'App\\Queue\\Handler', 'topic' => env('MQ_ROCKET_CONSUMER_TOPIC'), 'message_tags' => [ 'Demo' ], 'group_id' => env('MQ_ROCKET_CONSUMER_GROUP_ID'), 'instance_id' => env('MQ_ROCKET_CONSUMER_INSTANCE_ID'), ]; // 定义好了 consumer.handler_base_namespace 与 consumer.message_tags.* 则需要定义Demo消费处理类 DemoHandler // 同时此类需要继承 Samego\RocketMQ\Contract\QueueServiceHandlerInterface 接口 namespace App\Queue\Handler; use MQ\Model\Message; use Samego\RocketMQ\Contract\QueueServiceHandlerInterface; use Samego\RocketMQ\Helper\StdLogHelper; class DemoHandler implements QueueServiceHandlerInterface { public function handler(Message $message): bool { return true; } public function failure(Message $message): void { } }
注意
假设以镜像交付时,务必再安装依赖完毕时执行修复脚本,如下为示例
# 安装完毕必须执行如下脚本 解决官网代码缺陷 RUN vendor/alicfeng/aliyun_rocket_mq/bin/fix_official_pkg.sh
或者在持续集成中执行
# 安装完毕必须执行如下脚本 解决官网代码缺陷 - name: 安装依赖 image: registry-vpc.cn-shenzhen.aliyuncs.com/library/application:1.0.0 commands: - composer config -g - COMPOSER_MEMORY_LIMIT=-1 composer install --optimize-autoloader -vvv - vendor/alicfeng/aliyun_rocket_mq/bin/fix_official_pkg.sh