cheng_util/mqserver

v1.0.5 2022-12-05 08:28 UTC

This package is auto-updated.

Last update: 2024-04-07 16:59:54 UTC


README

composer require cheng_util/mqserver

配置

$config = [
        'main_ip' => '192.168.10.5',
        'rbmq' => [
            'host' => '192.168.10.5',
            'port' => 5672,
            'user' => 'guest',
            'pass' => 'guest',
            'vhost' => '/',
            'debug' => true,
        ],
        'coroutine' => true,    //使用swoole协程处理mq接收到的数据
        'swoole_udp' => [   //swoole upd的配置
            'port' => 9502,
            'host' => '127.0.0.1'
        ]
    ];

启动消费服务

/**
 * 默认消费队列,纯rbmq处理
 */
 
require_once __DIR__.'/../../../vendor/autoload.php';
ConsumeServer::getInstance()->setConfig($config);
ConsumeServer::getInstance()->startDefaultConsume(function($message){   //使用闭包 config的coroutine必须为false
            $message->ack();
            var_dump(json_decode($message->body, true));
        });
        

/**
 * 延时队列,纯rbmq处理
 */
 
require_once __DIR__.'/../../../vendor/autoload.php';
ConsumeServer::getInstance()->setConfig($config);
        ConsumeServer::getInstance()->startDelayConsume(function($message){   //使用闭包 config的coroutine必须为false
            $message->ack();
            var_dump($message->body);
        });
        

发送mq消息

require_once __DIR__.'/../../../vendor/autoload.php';
ProducerServer::getInstance()->setConfig($config);

$time = time();
//发送普通消息 第二个参数是延时,0则为普通数据
$res = ProducerServer::getInstance()->pushMessage([
    'type' => 'default_message',
    'msg' => '普通内容',
    'time_stamp' => $time,
    'time_date' => date('Y-m-d H:i:s', $time)
], 3, true);

var_dump($res);


//协程发消息
ProducerServer::getInstance()->pushMessageCoroutine([
                'type' => 'deafult_message',
                'msg' => '普通内容',
                'time_stamp' => $time,
                'time_date' => date('Y-m-d H:i:s', $time)
            ]);

单独发送udp消息,不经过mq

// 单独发udp消息
SwooleUdpClientService::getInstance()->setConfig($config);
$body = SwooleUdpClientService::getInstance()->buildBody('ccc', ['asdf' => date('Y-m-d H:i:s')]);
SwooleUdpClientService::getInstance()->CoroutineSend($body);

demo说明:

1、普通消费

DemoDefaultConsumeServer.php 启动普通消费队列

DemoDelayConsumeServer 启动延时队列

2、swoole的协程消费

先启动对应的mq服务,普通或者延时,然后启动DemoSwooleUdpServer.php 这个udp协程处理