devweyes / queue
queue for swoft
v1.1
2020-01-10 09:33 UTC
Requires
- php: >7.1
- ext-json: *
- ext-mbstring: *
- ext-redis: *
- swoft/bean: ~2.0.0
- swoft/redis: ~2.0.0
- swoft/serialize: ~2.0.0
- swoft/stdlib: ~2.0.0
This package is not auto-updated.
Last update: 2025-03-02 09:00:01 UTC
README
1. 介绍
用户进程/进程池与queue的完美结合,注解切面版queue,支持 redis,rabbitmq(等官方发基础包 开发中)
2. 使用
composer
composer require devweyes/queue
主要配置
<?php return [ Queue::MANAGER => [ 'class' => QueueManager::class, 'driver' => bean(Queue::DRIVER), 'serverIdPrefix' => 'swoft_ws_server_cluster_' ], Queue::DRIVER => [ 'class' => RedisQueue::class, 'redis' => bean('redis.pool'), 'serializer' => bean(Queue::SERIALIZER), 'prefix' => 'swoft_queue_', 'default' => 'default', 'waite' => 10, 'retry' => 3 ], Queue::SERIALIZER => [ 'class' => PhpSerializer::class ] ];
用户进程更多消费者
新增进程数量配置
<?php use Jcsp\Queue\Helper\Tool; ... 'wsServer' => [ 'class' => \Swoft\WebSocket\Server\WebSocketServer::class, ... //可配置多个消息消费,视业务量而定 'process' => array_merge( Tool::moreProcess('recvMessageProcess', bean(\Jcsp\WsCluster\Process\RecvMessageProcess::class), 3), [ //自定义进程 ] ) ]
数据手动生产
<?php use Jcsp\Queue\Queue; Queue::bind('queue')->push('this is message');
redis驱动queue消费
- redis内存数据库,并不可靠但性能及高,数据量小于10K出队入队速度显著,非常适用于实时异步短消息传输。否则,请使用rabbitmq或其他作为驱动
- 用户进程需继承
Jcsp\Queue\Contract\UserProcess
- 进程池需实现
Jcsp\Queue\Contract\ProcessInterface
- 进程需包含三个方法
run()
,receive()
,fallback()
,分别实现入口
,消费
,错误处理
逻辑 run
方法内无需再实现while(true)
的业务,甚至无需任何代码run()
方法内严禁使用exit()
return
等,$this->queue
用于自定义队列名以覆盖注解receive
内return Result::ACK
为正确消费,其他方法或发生异常均视为消费失败
<?php declare(strict_types=1); namespace App\Process; use Jcsp\Queue\Annotation\Mapping\Pull; use Jcsp\Queue\Result; use Swoft\Bean\Annotation\Mapping\Bean; use Swoft\Bean\Annotation\Mapping\Inject; use Swoft\Bean\BeanFactory; use Swoft\Log\Helper\CLog; use Swoft\Process\Process; use Jcsp\Queue\Contract\UserProcess; /** * Class MonitorProcess * * @since 2.0 * * @Bean() */ class RecvMessageProcess extends UserProcess { /** * @param Process $process * @Pull("queue") */ public function run(Process $process): void { //add queue $this->queue = 'new_queue'; //waite } /** * customer * @param $message * @return string */ public function receive($message): string { return Result::ACK; } /** * when error callback * @param $message * @return string */ public function fallback(\Throwable $throwable, $message): void { // vdump('error', $throwable->getMessage(), 'message',$message); } }
rabbitmq驱动queue消费
In development