fangchaogang / phpmq
PHP MQ
v1.0.1
2021-10-27 12:32 UTC
Requires
- php: 7.*
- ext-curl: *
- ext-json: *
- enqueue/amqp-lib: 0.10.1
- pda/pheanstalk: ^4.0
Requires (Dev)
- phpunit/phpunit: 6.*
README
PHP队列集合(rabbitmq、redis、beanstalk)
安装
composer require fangchaogang/phpmq "v1.*"
使用rabbitmq
use phpmq\drivers\amqp_interop\Queue; $config = [ "host"=>"127.0.0.1", "port"=>5672, "user" => "root", "password" => "root", "vhost"=>"/" ]; $queue = new Queue($config); //---发消息 $job = new \phpmq\tests\TestJob(); $job->data = ['delay' => '5',]; //直接发 $queue->push($job); //延时发 $queue->delay(1)->push($job); //遇到错误N秒重试发 $queue->ttr(10)->push($job); //带routingKey发 $queue->setRoutingKey('modify')->push($job); //其他参考源码 //---监听 //直接监听 $queue->listen(); //带routingKey监听 $queue->regRoutingKeyCallback('modify', function ($messageData) { var_dump('this is modify routingKey', $messageData); })->listen();
使用redis
use phpmq\drivers\redis\Queue; $config = [ 'host' => '127.0.0.1', 'port' => 6379 ]; $queue = new Queue($config); //---发消息 $job = new \phpmq\tests\TestJob(); $job->data = ['delay' => '5',]; //直接发 $queue->push($job); //延时发 $queue->delay(1)->push($job); //遇到错误N秒重试发 $queue->ttr(10)->push($job); //---监听 //直接监听 $queue->listen();
使用beanstalk
use phpmq\drivers\beanstalk\Queue; $config = [ 'host'=>'150.158.185.89', ]; $queue = new Queue($config); //---发消息 $job = new \phpmq\tests\TestJob(); $job->data = ['delay' => '5',]; //直接发 $queue->push($job); //延时发 $queue->delay(1)->push($job); //遇到错误N秒重试发 $queue->ttr(10)->push($job); //---监听 //直接监听 $queue->listen();