roiwk / rabbitmq
rabbitmq async(workerman) and sync Client, Producer, Consumer
v2.0.0
2025-01-09 09:27 UTC
Requires
- php: ^8.0
- bunny/bunny: ^0.5.5
- illuminate/collections: ^8.0|^9.0|^10.0
- psr/log: ^1.0||^2.0||^3.0
- workerman/rabbitmq: ^2.0
Requires (Dev)
- monolog/monolog: ^2.0|^3.0
- revolt/event-loop: ^1.0
- workerman/workerman: ^5.0
README
rabbitmq async(workerman) and sync PHP Client, Producers, Consumers
rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。
Dependencies 依赖
Version Tag | Dependencies |
---|---|
^v1.0 | php >=8.0 workerman >= 4.0 |
^v2.0 | php >=8.1 workerman >= 5.0 |
Install 安装
composer require roiwk/rabbitmq
Usage 使用
Config Demo
// 配置格式 $config = [ 'host' => '127.0.0.1', 'port' => 5672, 'vhost' => '/', 'mechanism' => 'AMQPLAIN', 'user' => 'username', 'password' => 'password', 'timeout' => 10, 'heartbeat' => 60, 'heartbeat_callback' => function(){}, 'error_callback' => null, ];
Publisher Demo
// 同步发布者 sync Publisher Roiwk\Rabbitmq\Producer::connect($config)->publishSync('Hello World!', '', '', 'hello'); // 异步发布者 async Publisher(workerman) use Workerman\Worker; $worker = new Worker(); $worker->onWorkerStart = function() use($config) { Roiwk\Rabbitmq\Producer::connect($config)->publishAsync('Hello World!', '', '', 'hello'); }; Worker::runAll();
Consumer Demo
// 同步消费者 sync Consumer use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; // style 1: $client = new Roiwk\Rabbitmq\Client($config, null, '', '', 'hello'); $client->syncProcess(function(Message $message, Channel $channel, AbstractClient $client){ echo " [x] Received ", $message->content, "\n"; $channel->ack(); }); // style 2: $consumer = new class ($config) extends AbstractConsumer { protected bool $async = false; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $consumer->onWorkerStart(null);
// 异步消费者 async Consumer(workerman) use Bunny\Channel; use Bunny\Message; use Workerman\Worker; use Bunny\AbstractClient; use Roiwk\Rabbitmq\AbstractConsumer; $worker = new Worker(); $consumer = new class ($config) extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }; $worker->onWorkerStart = [$consumer, 'onWorkerStart']; Worker::runAll();
Advanced 高级用法
webman中自定义进程--消费者
1.process.php
'hello-rabbitmq' => [ 'handler' => app\queue\rabbitmq\Hello::class, 'count' => 1, 'constructor' => [ 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]
2.app\queue\rabbitmq\Hello.php
namespace app\queue\rabbitmq; use Roiwk\Rabbitmq\AbstractConsumer; use Roiwk\Rabbitmq\Producer; use Bunny\Channel; use Bunny\Message; use Bunny\AbstractClient; class Hello extends AbstractConsumer { protected bool $async = true; protected string $queue = 'hello'; protected array $consume = [ 'noAck' => true, ]; public function consume(Message $message, Channel $channel, AbstractClient $client) { echo " [x] Received ", $message->content, "\n"; } }
webman中自定义进程--分组消费者
类似webman-queue插件, 分组将消费者放同一个文件夹下, 使用同一个worker, 多个进程数处理
1.process.php
'hello-rabbitmq' => [ 'handler' => Roiwk\Rabbitmq\GroupConsumers::class, 'count' => 2, 'constructor' => [ 'consumer_dir' => app_path().'/queue/rabbimq', 'rabbitmqConfig' => $config, //'logger' => Log::channel('hello'), ], ]
2.在 app_path().'/queue/rabbimq'
目录下创建php文件, 继承Roiwk\Rabbitmq\AbstractConsumer
即可, 同上app\queue\rabbitmq\Hello.php
Tips
- !!! 此库异步仅支持在workamn环境中, 同步环境都支持. 别的环境,如需异步, 请使用bunny的客户端