volcengine/ve-rocketmq-php-sdk

PHP SDK for volcengine RocketMQ

0.1.1 2023-08-02 06:33 UTC

This package is auto-updated.

Last update: 2024-05-29 10:26:01 UTC


README

火山引擎消息队列 RocketMQ版PHP SDK 是基于 HTTP-Proxy 的RocketMQ客户端。 该SDK可通过RocketMQ实例的Http Proxy 接入点连接实例,实现消息的生产与消费。

安装

composer require volcengine/ve-rocketmq-php-sdk

快速开始

创建客户端。

初始化一个RocketMQ客户端需要准备好火山引擎RocketMQ实例的HTTP Proxy接入点、accessKey和secretKey。

use RMQ\Client;

//  HTTP Proxy 接入点
$endpoint = ""; 
// 密钥
$accessKey = ""; 
$secretKey = "";

// 实例化客户端
$client = new Client($endpoint, $accessKey, $secretKey);

生产

创建生产者

调用client实例的 createProducer() 方法即可创建一个生产者实例。

// 创建一个生产者
$producer = $client->createProducer();

创建消息

一条消息拥有很多属性topic、body、tag、key等,可以使用 Message 实例化一个消息的信息对象并可在这个对象上设置这些属性。

use RMQ\Message;

// 目标topic
$topic ="topic_name"; 
// 消息的内容
$messageContent = "content." 

// 实例化一个消息
$msg = new Message($topic, $messageContent);
// 设置消息的tag值
$msg->setTag("tag_a");
// 设置ShardingKey
$msg->setShardingKey("my_key");
// 设置自定义属性
$msg->putProperty("property_name", "test");

生产消息

调用producer实例的 publishMessage() 方法就能发布一条消息。在发布消息前还需要调用 open() 方法在服务端开启一个生产者实例, 在不需要发送消息时可以调用 close() 方法销毁。

$producer->open();

$msg = new Message("topic_name", "hello!");
$messageInfo = $producer->publishMessage($msg);
$producer->close();

var_dump(messageInfo);

消费

创建消费者

调用client实例的 createConsumer() 方法即可创建一个消费者实例, 创建消费者时必须指定消费者的GroupID。

$groupID = ""; // 消费组ID
 
$consumer = $client->createConsumer($groupID, [
  // 每次调用consumeMessage最多拉取12条消息
  "max_message_number" => 12,
  // 在消息达到max_message_number之前,请求在服务端挂起的最大等待时长(单位ms)
  "max_wait_time"      => 3000
]);

消费消息

调用消费者的 consumeMessage() 能拉取一批消息。在拉取消息并被使用后,需要调用 ackMessages() 对消息的消费状态进行确认,未被确认或确认消费失败的消息都会被重复消费。

use RMQ\Model\MessageInfo;

$consumer = $client->createConsumer($groupID);
// 订阅topic_a 全部消息1
$consumer->subscribe("topic_a");
// 订阅topic_b tag为A的消息
$consumer->subscribe("topic_b", "A");

$consumer->open();

// 拉取消息
$messages = $consumer->consumeMessage();

$acksHandles = [];
foreach ($messages as $msg) {
    $body = $msg->body;
    echo "message bode: $body \n";
    array_push($acksHandles, $msg->msgHandle);
}
// 确认消息的消费情况
// ackMessages第一个参数是确认消费成功的消息的msgHandle
// ackMessages第二个参数是确认消费失败的消息的msgHandle
$consumer->ackMessages($acksHandles, []);

$consumer->close();

进阶指引

持续生产消息

服务端会对每一个客户端创建一个生产者实例,在客户端生产频率较低时,可能会出现服务端生产者实例被释放导致生产消息失败的情况。

$producer->open();
// 在open后等待60秒
sleep(60);
// 下面的方法调用会失败,因为服务端的生产者实例已超时被销毁掉
$producer->publishMessage($msg);

所以在持续生产消息时需要捕获这类异常并重新调用 open() 方法重新在服务端开启一个生产者实例,SDK 提供了一个专门用来捕获该类错误的Exception在RMQ\Exception\MQTokenTimeoutException 。如下demo,对部分消息等待一个很长的时间,这些消息发送时就会捕获到超时错误。

use RMQ\Exception\MQTokenTimeoutException;

for ($i = 0; $i < 10; $i++) {
  if ($i % 2 == 0) {
    sleep(60 * 10); // 偶数消息等待10分钟
  }
  
  $message = new Message("topic_name", "hello!");
  try {
    // 发送消息
    $producer->publishMessage($message);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $producer->open();
    // 对消息重发
    $producer->publishMessage($message);
  } catch (RuntimeException $e) {
    // 其他错误情况
    echo $e . "\n";
  }
}

持续消费消息

持续消息实际上就是一个轮询不断拉取消息,如果每次拉取消息的间隔过长也可能出现超时的情况,所以也需要捕获超时错误并重新调用 open() 方法。

$consumer->open();

while (true) {
  try {
    // 拉取消息
    $messages    = $consumer->consumeMessage();
    $acksHandles = [];
    foreach ($messages as $msg) {
        $body = $msg->body;
        echo "message bode: $body \n";
        array_push($acksHandles, $msg->msgHandle);
    }
    // 确认消费状态
    $consumer->ackMessages($acksHandles, []);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $consumer->open();
  } catch (RuntimeException $e) {
    // 其他错误 
    echo $e;
  }
}

延时消息

延时是消息的一个属性,可通过Message类的putProperty()方法来设置消息的延时属性。

定时投递消息

通过指定的具体的毫秒时间戳定时投递消息。

use RMQ\Message;

$msg = new Message("topic_name", "content.");
// 消息投递的具体毫秒时间戳(当前时间延迟30秒)
$postTime = time() * 1000 + 30000; 
// 将延时属性设置到Property中
$msg->putProperty("__STARTDELIVERTIME", "$currentTimeStamp"); 

延时等级消息

Message 类有 setDelayLevel() 方法可设置消息的延时属性。可设置1-18等级.

$msg2 = new Message("topic_name", "content");

$msg2->setDelayLevel(5)

setDelayLevel() 方法背后实际上使用的还是putProperty()方法。使用setDelayLevel() 实际效果和下面一致。

$level = 5

$msg2 = new Message("topic_name", "content");

$msg2->putProperty("__DelayTimeLevel", "$level");