zanphp/nsq-client

There is no license information available for the latest version (dev-master) of this package.

dev-master 2017-08-23 14:08 UTC

This package is not auto-updated.

Last update: 2024-11-10 03:56:41 UTC


README

API

<?php
class NSQ
{
    /**
     * 订阅
     * @param string $topic
     * @param string $channel
     * @param MsgHandler|callable $msgHandler
     * @param int $maxInFlight
     * @return \Generator yield return Consumer
     * @throws NsqException
     */
    public static function subscribe($topic, $channel, $msgHandler, $maxInFlight = -1);
    
    /**
     * 取消订阅
     * @param string $topic
     * @param string $channel
     * @return bool
     */
    public static function unSubscribe($topic, $channel);
    
    /**
     * 发布
     * @param string $topic
     * @param string[] ...$messages
     * @return \Generator yield bool
     * @throws NsqException
     */
    public static function publish($topic, ...$messages);
    
    /**
     * 统计信息
     * @return array
     */    
    public static function stat();
}
<?php
class Message
{
    /**
     * @return string
     */
    public function getId();
    
    /**
     * @return string
     */
    public function getBody();

    /**
     * @return string
     */
    public function getTimestamp();
    
    /**
     * @return int
     */
    public function getAttempts();
    
    /**
     * 关闭自动回复 
     * DisableAutoResponse disables the automatic response that
     * would normally be sent when a MsgHandler:;handleMessage
     * returns (FIN/REQ based on the value returned).
     * @return void
     */
    public function disableAutoResponse();
    
    /**
     * IsAutoResponseDisabled indicates whether or not this message
     * will be responded to automatically
     * @return bool
     */
    public function isAutoResponse();
    
    /**
     * HasResponded indicates whether or not this message has been responded to
     * @return bool
     */
    public function hasResponsed();
    
    /**
     * 完成该消息
     * Finish sends a FIN command to the nsqd which
     * sent this message
     */
    public function finish();
    
    /**
     * 更新服务端消息超时时间
     * Touch sends a TOUCH command to the nsqd which
     * sent this message
     */
    public function touch();
    
    /**
     * 重试该消息
     * Requeue sends a REQ command to the nsqd which
     * sent this message, using the supplied delay.
     *
     * A delay of -1 will automatically calculate
     * based on the number of attempts and the
     * configured default_requeue_delay
     * @param int $delay ms
     * @param bool $backoff
     */
    public function requeue($delay, $backoff = false);

}

Config

首先要添加nsq节点配置

config/${env}/nsq.php

/**
 * 说明:
 * 1. 只有lookup项必填, 其他全部选填
 * 2. 所有时间配置 单位: ms
 */
return [
    // ["必填"]lookup 节点地址
    "lookup" => [
        "http://xxx.yyy.zzz:4161"
    ]
];

配置 WorkerStart, 参考

./init/WorkerStart/.config.php

<?php
use ZanPHP\NSQ\InitializeNSQ;

return [
    InitializeNSQ::class,
];

Example

Publish:

<?php
function taskPub()
{
    $topic = "zan_mqworker_test";

    $oneMsg = "hello";
    $multiMsgs = [
        "hello",
        "hi",
    ];

    /* @var Producer $producer */
    $ok = (yield NSQ::publish($topic, $oneMsg));
    $ok = (yield NSQ::publish($topic, "hello", "hi"));
    $ok = (yield NSQ::publish($topic, ...$multiMsgs));
}

Task::execute(taskPub());

Subscribe:

<?php
// auto response + msgHandlerCallback
$task1 = function() {
    $topic = "zan_mqworker_test";
    $ch = "ch1";
    /* @var Consumer $consumer */
    $consumer = (yield NSQ::subscribe($topic, $ch, function(Message $msg, Consumer $consumer) {
        echo $msg->getId(), "\n";
        yield taskSleep(1000);
    }));
    swoole_timer_after(3000, function() use($consumer) {
        $consumer->stop();
    });
};
Task::execute($task1());

// auto response + TestMsgHandlerImpl
$task2 = function() {
    $topic = "zan_mqworker_test";
    $ch = "ch1";
    $msgHandler = new TestMsgHandler();
    yield NSQ::subscribe($topic, $ch, $msgHandler);
};
Task::execute($task2());


$task2 = function() {
    $topic = "zan_mqworker_test";
    $ch = "ch1";
    yield NSQ::subscribe($topic, $ch, function(Message $msg) {
        // $msg->finish();
        // $msg->touch();
        // $msg->requeue($delay, $isBackoff);
        // throw new \Exception()
    });;
};

All Config

/**
 * 说明:
 * 1. 只有lookup项必填, 其他全部选填
 * 2. 所有时间配置 单位: ms
 */
return [
    // ["必填"]lookup 节点地址
    "lookup" => [
        "http://xxx.yyy.zzz:4161"
    ],

    // ["建议填写"] 需要publish的topic列表, 预先配置, 会在workerStart时候建立好连接
    "topic" => [
        "zan_mqworker_test",
    ],



    // ====================================== 以下选择性配置 ====================================
    // ======================================  identity  ======================================
    "identity" => [
        // Identifiers sent to nsqd representing this client
        "client_id" => gethostname(),

        "hostname" => gethostname(),

        "feature_negotiation" => true, // is enable negotiation

        // Duration of time between heartbeats. This must be less than ReadTimeout
        "heartbeat_interval" => 30 * 1000, // default: 30000ms

        // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
        "output_buffer_size" => 16384, // 16kb

        // Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
        // WARNING: configuring clients with an extremely low
        // (< 25ms) output_buffer_timeout has a significant effect
        // on nsqd CPU usage (particularly with > 50 clients connected).
        "output_buffer_timeout" => -1, // default 250ms

        // "tls_v1" => false,
        "snappy" => false,
        "deflate" => false,
        "deflate_level" => 1,
        "sample_rate" => 0,

        "user_agent" => "zan-nsq-client/v0.1",

        // 服务端消息超时时间
        // The server-side message timeout for messages delivered to this client
        "msg_timeout" => 60000, // ms
    ],



    // ======================================  Lookup  ======================================
    // nsqd连接延迟关闭时间
    "delaying_close_time" => 5 * 1000,

    // 连接nsqd超时时间
    "nsqd_connect_timeout" => 3 * 1000,

    // lookup连接超时时间
    "nsqlookupd_connect_timeout" => 3 * 000,

    // 通过lookupd更新nsqd节点周期
    // Duration between polling lookupd for new producers, and fractional jitter to add to
    // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
    // restart at the same time
    "lookupd_poll_interval" => 60 * 1000,
    "lookupd_poll_jitter" => 0.3, // 0~1



    // =====================================  Producer  =====================================
    // 需要publish的topic列表, 预先配置, 会在workerStart时候建立好连接
    /*
    "topic" => [
        "zan_mqworker_test",
    ],
    */

    // publish临时连接生命周期, 要大于消息处理时长
    "disposable_connection_lifecycle" => 60 * 1000,

    // 发布超时时间
    "publish_timeout" => 3 * 1000,



    // =====================================  Consumer  =====================================

    // 是否开启消息自动回复
    // MsgHandler $result !== false 时, 自动发送FIN命令完成任务
    "message_auto_response" => true,

    // pipe_count: 设置当前消费者实例(多个NSQD连接)最大允许的in-flight消息数量, 每个nsqd连接均分
    // Maximum number of messages to allow in flight (concurrency knob)
    "max_in_flight" => 10, //2500,

    // 每个topic的最大nsqd连接数, 最小值为lookup节点查询当前nsqd数量
    // max(count($nsqdList), $this->maxConnectionNum)
    "max_connection_per_topic" => 50,

    // consumer消息最大重试次数
    "max_attempts" => 5,

    /**
     * 消息requeue backoff设置
     * $delay = Backoff::calculate($msg->getAttempts(), $c["min"], $c["max"], $c["factor"], $c["jitter"]);
     */
    "message_backoff" => [
        "min" => 2 * 1000, // backoff 起始 时间
        // Maximum amount of time to backoff when processing fails
        // 0 == no backoff
        "max" => 1000 * 60 * 10, // backoff 时间 上限
        "factor" => 2,
        "jitter" => 0.3 // 0~1
    ],

    // 在nsqd连接间重新分配max-in-flight的时间间隔
    "rdy_redistribute_interval" => 5 * 1000,

    // 重新分配rdy的闲置等待时间阈值
    // Duration to wait for a message from a producer when in a state where RDY
    // counts are re-distributed (ie. max_in_flight < num_producers)
    "low_rdy_idle_timeout" => 10 * 1000,


    // =====================================  General  =====================================
    // prod机器 /proc/sys/net/core/rmem_max = /proc/sys/net/core/wmem_max = 327679
    "socket_buffer_size" => 327679,
    "packet_size_limit" => 327679,

    // auth 不支持
    // "auto_secret" => "",
];