itholiday / rabbitmq-pool
easyswoole rabbitmq pool
v1.0.0
2020-06-24 07:22 UTC
Requires
- easyswoole/component: ^2.0
- easyswoole/pool: ^1.0
- easyswoole/spl: ^1.1
- php-amqplib/php-amqplib: ^2.11
Requires (Dev)
This package is auto-updated.
Last update: 2025-03-24 18:41:43 UTC
README
easyswoole rabbitmq pool
第一步-配置:
// Settings for one RabbitMQ connection pool (read via Config::getInstance()->getConf('RABBITMQ_1')).
'RABBITMQ_1' => [
    'host'   => '192.168.199.217',
    'port'   => 5672,
    'user'   => 'jddtest',
    'passwd' => 'jddtest',
    'vhost'  => '/',
],
// Settings for one channel pool (read via Config::getInstance()->getConf('RABBITMQ_CHANNEL_1')).
'RABBITMQ_CHANNEL_1' => [
    'exchange' => 'router',
    'queue'    => 'msgs',
    'routekey' => '',
    // Presumably must equal the name the connection pool is registered under
    // with RabbitMQ::getInstance()->register() — confirm against the library.
    'connectionPoolName' => 'rabbitmq-pool-1',
],
第二步-在 EasySwooleEvent.php 的 initialize 方法中注册:
class EasySwooleEvent implements Event
{
    /**
     * Registers the RabbitMQ connection pool and the RabbitMQ channel pool
     * and tunes their pool limits.
     */
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');

        // Connection pool, registered as 'rabbitmq-pool-1' — the same name the
        // RABBITMQ_CHANNEL_1 config lists under 'connectionPoolName'.
        $rabbitmqPoolConfig = RabbitMQ::getInstance()->register(
            'rabbitmq-pool-1',
            new RabbitMQConfig(Config::getInstance()->getConf('RABBITMQ_1'))
        );
        $rabbitmqPoolConfig->setMinObjectNum(1);
        // Must be the setter: the previous getMaxObjectNum(2) call only read the
        // current value and discarded it, so the limit of 2 was never applied.
        $rabbitmqPoolConfig->setMaxObjectNum(2);
        $rabbitmqPoolConfig->setIntervalCheckTime(30);
        $rabbitmqPoolConfig->setMaxIdleTime(30000000);

        // Channel pool, registered as 'rabbitmq-channel-pool'.
        $rabbitmqChannelPoolConfig = RabbitMQChannel::getInstance()->register(
            'rabbitmq-channel-pool',
            new RabbitMQChannelConfig(Config::getInstance()->getConf('RABBITMQ_CHANNEL_1'))
        );
        $rabbitmqChannelPoolConfig->setMaxIdleTime(3000000);
    }

    // Any other methods required by the Event interface are omitted from this example.
}
第三步-使用:
/**
 * Demo action: publishes the request's `msg` parameter to RabbitMQ inside a coroutine.
 *
 * Request parameters read: `exchange` (optional, falls back to 'router') and `msg`.
 * Publishing is attempted at most three times. The JSON response simply echoes the
 * request parameters; it does not report whether publishing succeeded.
 */
public function rabbitmq()
{
    $params = $this->request()->getRequestParam();

    go(function () use ($params) {
        // Borrow a channel straight from the channel pool. After a connection
        // restart the pool can fail with
        // "EasySwoole\RabbitMQPool\RabbitMQChannelPool pool is empty",
        // so the publish is retried a few times.
        for ($times = 3; $times > 0; $times--) {
            $res = $this->channelInvoke($params);
            if ($res === true) {
                break;
            }
            var_dump($res, $times);
        }

        // Alternative: borrow a connection from the connection pool and open a
        // channel on it (pool name must match the one registered in initialize()).
        // \EasySwoole\RabbitMQPool\RabbitMQ::invoke('rabbitmq-pool-1', function (AMQPStreamConnection $rabbitmq) use ($params) {
        //     $channel = $rabbitmq->channel();
        //     $exchange = $params['exchange'];
        //     $queue = $params['queue'];
        //     $channel->queue_declare($queue, false, true, false, false);
        //     $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
        //     $channel->queue_bind($queue, $exchange);
        //     $message = new AMQPMessage($params['msg'], ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        //     $channel->basic_publish($message, $exchange);
        //     $channel->close();
        // });
    });

    $this->writeJson(200, $params);
}

/**
 * Publishes one persistent text/plain message through a channel taken from the
 * 'rabbitmq-channel-pool' pool.
 *
 * @param array $params Request parameters; `exchange` (optional, default 'router')
 *                      and `msg` (optional, default '') are read.
 *
 * @return mixed Whatever RabbitMQChannel::invoke() returns for the callback
 *               (true after a successful publish, false when publishing threw),
 *               or false when invoke() itself threw an \Exception.
 */
private function channelInvoke($params)
{
    try {
        return RabbitMQChannel::invoke('rabbitmq-channel-pool', function (AMQPChannel $channel) use ($params) {
            // 'router' is the fallback exchange when the request does not name one.
            $exchange = $params['exchange'] ?? 'router';
            $messageBody = $params['msg'] ?? '';

            $message = new AMQPMessage($messageBody, [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]);

            // If the underlying connection has been closed, the channel pool does
            // not recycle its channels along with it, so publishing can throw here.
            try {
                $channel->basic_publish($message, $exchange);
                return true;
            } catch (\Exception $exception) {
                var_dump($exception->getMessage());
                return false;
            }
        });
    } catch (\Exception $exception) {
        var_dump($exception->getMessage());
        return false;
    }
}