losingbattle / rocketmq-http
基于阿里云的rocketmq-http客户端及服务端封装
Installs: 1 090
Dependents: 0
Suggesters: 0
Security: 0
Stars: 8
Watchers: 1
Forks: 1
Open Issues: 0
pkg:composer/losingbattle/rocketmq-http
Requires
- php: >=8.1
- hyperf/codec: ~3.1.0
- hyperf/collection: ~3.1.0
- hyperf/contract: ~3.1.0
- hyperf/coroutine: ~3.1.0
- hyperf/di: ~3.1.0
- hyperf/guzzle: ~3.1.0
- hyperf/process: ~3.1.0
- hyperf/support: ~3.1.0
- psr/container: ^1.0|^2.0
- psr/event-dispatcher: ^1.0
- psr/log: ^1.0|^2.0|^3.0
Suggests
- hyperf/di: Required to use annotations.
- hyperf/event: Declare queue and start consumers automatically.
This package is auto-updated.
Last update: 2025-10-23 21:53:58 UTC
README
##安装
composer require Losingbattle/rocketmq-http
- hyperf框架直接引用即可,使用姿势与官方rabbitmq基本一致
- 其余框架生产者依赖guzzlehttp,psr/container,实现即可,消费者使用了协程消费依赖swoole example
- 阿里云的rocketmq-http本身存在一些问题,当gid+topic+instance过长时消费消息将会报错,所以只能自身在创建时把控(Code: NotSupport Message: the length of GID(CID) and TOPIC is too long, total length(include instance) should not longer than 119, please change another topic or another cid RequestId : 605402BE384531236C9E1205 HostId)
- 普通消息相关已在线上稳定运行一年多
默认配置
配置 | 类型 | 默认值 | 备注 |
---|---|---|---|
host | string | Host | |
access_key_id | string | ||
access_key_secret | string | ||
instance_id | string | 实例id | |
concurrent.limit | int | 0 | 同时消费的数量 |
hyperf中使用一下命令初始化即可
php bin/hyperf.php vendor:publish losingbattle/rocketmq-http
投递消息
在 DemoProducer 文件中,我们可以修改 @Producer
注解对应的字段来替换对应的 topic
和 tag
。
其中 MessageBody
就是最终投递到消息队列中的数据。
示例如下。
使用
@Producer
注解时需use Losingbattle\RocketMqHttp\Annotation\Producer;
命名空间;
<?php namespace App\Rocketmq\Producer; use Losingbattle\RocketMqHttp\Annotation\Producer; use Losingbattle\RocketMqHttp\Message\ProducerMessage; #[Producer(topic: "order_center_normal_topic", tag: "order_submit")] class TestMessage extends ProducerMessage { public function setBizId($bizId) { $this->setMessageBody('bizId', $bizId); return $this; } }
<?php use Losingbattle\RocketMqHttp\Producer; use App\Rocketmq\Producer\TestMessage; use Hyperf\Utils\ApplicationContext; $producer = $this->container->get(Producer::class); $testMessage = new TestMessage(); $testMessage->setOrderSn("1")->setType(1)->setUserId(1)->setBizId(1); $m = $producer->produce($testMessage,1);
消费消息
<?php declare(strict_types=1); namespace App\Rocketmq\Consumer; use Losingbattle\RocketMqHttp\Annotation\Consumer; use Losingbattle\RocketMqHttp\Message\ConsumerMessage; use Losingbattle\RocketMqHttp\Result; #[Consumer(topic: "order_center_normal_topic", groupId: "GID_order_center_status_change", numOfMessages: 16, waitSeconds: 30)] class OrderCenterConsumer extends ConsumerMessage { public function __construct() { $this->registerRoute('tag_default_share-order_C_TERMINAL_updateDraftOrderStatus', [$this, 'updateOrderStatus']); $this->registerRoute('tag_test', [$this, 'test']); } public function isEnable(): bool { return false; } public function updateOrderStatus($x) { return Result::ACK; } public function test($x) { return Result::ACK; } public function consumeMessage($consumeMessageResponse) { //没有指定tag则默认使用consumeMessage return Result::ACK; } }
日志
dependencies.php中添加(建议只在本地调试时使用,本质上是不停的http轮询,影响日志查看)
return [ RocketMqHttpLoggerFactory::class,//(example文件中,重写loggerfaoctory) ];
listener.php 中添加消费listener
return [ Losingbattle\RocketMqHttp\Listener\ConsumeListener::class, ];
demo
延时消息生产与普通消息相比只是在注解上多个一个delayTtl(秒)的属性
延时消息与普通基本在消费形式上没有太大区别,只有在阿里云控制台有区分