roiwk/rabbitmq

rabbitmq async(workerman) and sync Client, Producer, Consumer

v1.2.0 2023-12-19 01:29 UTC

This package is auto-updated.

Last update: 2024-04-23 07:32:23 UTC


README

rabbitmq async(workerman) and sync PHP Client, Producers, Consumers

rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。

Dependencies 依赖

php >= 8.0

Install 安装

composer require roiwk/rabbitmq

Usage 使用

All demo

Config Demo

// 配置格式
$config = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'vhost' => '/',
    'mechanism' => 'AMQPLAIN',
    'user' => 'username',
    'password' => 'password',
    'timeout' => 10,
    'heartbeat' => 60,
    'heartbeat_callback' => function(){},
    'error_callback'     => null,
];

Publisher Demo

// 同步发布者  sync Publisher
Roiwk\Rabbitmq\Producer::connect($config)->publishSync('Hello World!', '', '', 'hello');

// 异步发布者  async Publisher(workerman)

use Workerman\Worker;

$worker = new Worker();
$worker->onWorkerStart = function() use($config) {
    Roiwk\Rabbitmq\Producer::connect($config)->publishAsync('Hello World!', '', '', 'hello');
};
Worker::runAll();

Consumer Demo

// 同步消费者  sync Consumer

use Bunny\AbstractClient;
use Roiwk\Rabbitmq\AbstractConsumer;

// style 1:
$client = new Roiwk\Rabbitmq\Client($config, null, '', '', 'hello');
$client->syncProcess(function(Message $message, Channel $channel, AbstractClient $client){
    echo " [x] Received ", $message->content, "\n";
    $channel->ack();
});

// style 2:
$consumer = new class ($config) extends AbstractConsumer {
    protected bool $async = false;
    protected string $queue = 'hello';
    protected array $consume = [
        'noAck' => true,
    ];
    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
};
$consumer->onWorkerStart(null);
// 异步消费者  async Consumer(workerman)

use Workerman\Worker;
use Bunny\AbstractClient;
use Roiwk\Rabbitmq\AbstractConsumer;

$worker = new Worker();

$consumer = new class ($config) extends AbstractConsumer {

    protected bool $async = true;

    protected string $queue = 'hello';

    protected array $consume = [
        'noAck' => true,
    ];

    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
};

$worker->onWorkerStart = [$consumer, 'onWorkerStart'];
Worker::runAll();

Advanced 高级用法

webman中自定义进程--消费者

1.process.php

'hello-rabbitmq' => [
    'handler' => app\queue\rabbitmq\Hello::class,
    'count'   => 1,
    'constructor' => [
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2.app\queue\rabbitmq\Hello.php

namespace app\queue\rabbitmq;

use Roiwk\Rabbitmq\AbstractConsumer;
use Roiwk\Rabbitmq\Producer;
use Bunny\Channel;
use Bunny\Message;
use Bunny\AbstractClient;

class Hello extends AbstractConsumer
{
    protected bool $async = true;

    protected string $queue = 'hello';

    protected array $consume = [
        'noAck' => true,
    ];

    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
}

webman中自定义进程--分组消费者

类似webman-queue插件, 分组将消费者放同一个文件夹下, 使用同一个worker, 多个进程数处理
1.process.php

'hello-rabbitmq' => [
    'handler' => Roiwk\Rabbitmq\GroupConsumers::class,
    'count'   => 2,
    'constructor' => [
        'consumer_dir' => app_path().'/queue/rabbimq',
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2.在 app_path().'/queue/rabbimq' 目录下创建php文件, 继承Roiwk\Rabbitmq\AbstractConsumer即可, 同上app\queue\rabbitmq\Hello.php

Tips

  1. !!! 此库异步仅支持在workamn环境中, 同步环境都支持. 别的环境,如需异步, 请使用bunny的客户端