coding-linheng / hyperf-rocketmq
hyperf 的 aurocketmq 组件
v1.0.2
2022-10-06 06:11 UTC
Requires
- php: >=8
- ext-swoole: >=4.6
- coding-linheng/mq-http-php-sdk: ^1.0.4
Requires (Dev)
- hyperf/config: ^3.0
- hyperf/di: ^3.0
- hyperf/utils: ^3.0
- phpunit/phpunit: ^9.5
This package is auto-updated.
Last update: 2024-11-06 10:43:43 UTC
README
基于阿里云的rocketmq-php-sdk 修改的 hyperf2.2 的 rocket 组件
任何问题请加QQ群提问:873213948
安装 - install
$ composer require coding-linheng/hyperf-rocketmq
发布配置 vendor:publish
php bin/hyperf.php vendor:publish coding-linheng/hyperf-rocketmq
使用
以下是伪代码,仅供参考
<?php declare(strict_types=1); namespace App\Controller; use Codelin\HyperfRocketmq\Client; use Hyperf\HttpServer\Annotation\AutoController; use MQ\Exception\MessageNotExistException; use MQ\Model\Message; use MQ\Model\TopicMessage; use MQ\MQConsumer; use MQ\MQProducer; use Psr\Container\ContainerInterface; #[AutoController] class IndexController extends AbstractController { protected MQProducer $producer; protected MQConsumer $consumer; public function __construct(ContainerInterface $container) { $this->producer = $container->get(Client::class)->getProducer(); $this->consumer = $container->get(Client::class)->getConsumer(); } public function testProducer() { try { for($i = 1; $i <= 8; $i++) { $publishMessage = new TopicMessage( "test"// 消息内容 ); // 设置属性 $publishMessage->putProperty("a", $i); // 设置分区顺序KEY $publishMessage->setShardingKey($i % 2); // 设置tag $publishMessage->setMessageTag('hyperf-test'); $result = $this->producer->publishMessage($publishMessage); print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n"; } } catch (\Exception $e) { print_r($e->getMessage() . "\n"); } } public function testConsumer() { while(True) { try { // 长轮询消费消息 // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回 $messages = $this->consumer->consumeMessage( 5, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (\Exception $e) { if ($e instanceof MessageNotExistException) { // 没有消息可以消费,接着轮询 printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId()); continue; } print_r($e->getMessage() . "\n"); sleep(3); continue; } var_dump($this->consumer->getMessageTag()); var_dump($messages); print "consume finish, messages:\n"; // 处理业务逻辑 $receiptHandles = array(); /** @var Message $message */ foreach($messages as $message) { $receiptHandles[] = $message->getReceiptHandle(); printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n", $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(), $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(), $message->getMessageKey()); print_r($message->getProperties()); } // $message->getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 print_r($receiptHandles); try { $this->consumer->ackMessage($receiptHandles); } catch (\Exception $e) { if ($e instanceof AckMessageException) { // 某些消息的句柄可能超时了会导致确认不成功 printf("Ack Error, RequestId:%s\n", $e->getRequestId()); foreach ($e->getAckMessageErrorItems() as $errorItem) { printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode()); } } } print "ack finish\n"; } } public function testPackage() { var_dump($this->consumer); var_dump($this->producer); } }