leoxml/hyperf-rocketmq

hyperf rocketmq sdk

v1.0.1 2024-11-29 07:21 UTC

This package is auto-updated.

Last update: 2024-12-29 07:34:47 UTC


README

基于阿里云 rocketmq SDK 封装,实现了协程、连接池、消息可靠投递 等功能

该库来源于: https://gitee.com/easy_code/hyperf-rocketmq

1、安装

composer require leoxml/hyperf-rocketmq

2、配置

发布配置

php bin/hyperf.php vendor:publish leoxml/hyperf-rocketmq

配置说明

return [
    'default' => [ // 分组名,基于 host、port、scheme 进行区分
        'host' => env('ROCKETMQ_HTTP_HOST'),
        'access_key' => env('ROCKETMQ_HTTP_ACCESS_KEY_ID'),
        'secret_key' => env('ROCKET_MQ_HTTP_ACCESS_KEY_SECRET'),
        'instance_id' => env('ROCKET_MQ_HTTP_INSTANCE_ID'),
        'pool' => [
            'min_connections' => 50,
            'max_connections' => 300,
            'connect_timeout' => 3.0,
            'wait_timeout' => 30.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
];

3、创建相关数据表

如果不需要记录日志 或 消息不需要可靠投递,可以忽略这步

php bin/hyperf.php migrate --path=migrations/rocketmq

表说明:

rocketmq_status_log:消息生产状态表

rocketmq_produce_status_log:生成消息状态

rocketmq_consume_log:消费日志

4、投递消息

Producer注解参数

4.1 定义生产者相关信息

在 DemoProducer 文件中,我们可以修改 @Producer 注解对应的字段来替换对应的 poolNametopicmessageTag。就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。

使用 @Producer 注解时需 use Leoxml\RocketMQ\Annotation\Producer; 命名空间;

<?php
declare(strict_types=1);

namespace App\Test\Queue\Producer;

use Leoxml\RocketMQ\Annotation\Producer;
use Leoxml\RocketMQ\Message\ProducerMessage;

#[Producer(topic:"Topic_03_test", messageTag:"tMsgKey")]
class DemoProducer extends ProducerMessage
{
    public function __construct(array $data)
    {
        // 设置消息内容
        $this->setPayload($data);
        // 自定义messageKey(不定义,会自动生成)
        $this->setMessageKey('xxxxx');
    }
}

4.2 普通投递方式

通过Leoxml\RocketMQ\Producer实例,即可投递消息。

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Producer\DemoProducer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Leoxml\RocketMQ\Producer;

#[Controller]
class IndexController extends AbstractController
{
    #[Inject(Producer::class)]
    protected Producer $producer;

    #[RequestMapping("index")]
    public function index()
    {
        $message = new DemoProducer(['a' => 1, 'b' =>2]);
        $this->producer->produce($message);
        return $this->response->json([]);
    }
}

4.3 消息可靠投递方式

目前,消息投递时Rocketmq返回成功响应,就视为投递成功(暂不考虑Rocketmq缓存丢失的问题)。

实现原理:先将需要投递的消息入库处理,然后再进行发送操作

  1. 执行以下命令,生成相关的数据表

    php bin/hyperf.php migrate --path=migrations/rocketmq
  2. 使用示例

    $demoProducer = new BarProducer(['test' => 12345, 'name' => '张三1231']);
    
    Db::beginTransaction();
    try{
        // todo 业务逻辑
    
        // 记录消息状态
        $demoProducer->saveMessageStatus();
        Db::commit();
    } catch(\Throwable $ex){
        Db::rollBack();
    }
    
    // 推送消息
    $this->producer->produce($demoProducer);
  3. 投递失败的消息,可以通过守护进程监听 mq_status_log 数据表status不等于3的消息,进行重新投递(后面实现)

5、消息消费

Consumer注解属性说明

在 DemoConsumer文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 topicgroupIdmessageTag

使用 @Consumer 注解时需 use Leoxml\RocketMQ\Annotation\Consumer; 命名空间;

use Leoxml\RocketMQ\Annotation\Consumer;
use Leoxml\RocketMQ\Library\Model\Message as RocketMQMessage;
use Leoxml\RocketMQ\Message\ConsumerMessage;
use Leoxml\RocketMQ\Result;

#[Consumer(topic: "Topic_03_test", groupId: "test_test", messageTag: "tMsgKey||tMsgKey_bar")]
class DemoCounser extends ConsumerMessage
{
    public function consumeMessage(RocketMQMessage $rocketMQMessage): string
    {
        $msgTag = $rocketMQMessage->getMessageTag(); // 消息标签
        $msgKey = $rocketMQMessage->getMessageKey(); // 消息唯一标识
        $msgBody = $rocketMQMessage->toArray(); // 消息体
        $msgId = $rocketMQMessage->getMessageId();

        var_dump('消费端接收到消息:', $msgBody);

        return Result::ACK;
    }
}

6、事件说明

下面事件都在 Leoxml\RocketMQ\Event 命名空间下