kode / queue
一个现代化、高性能的 PHP 队列客户端,支持多种队列驱动,专为 PHP 8.1+ 设计,完美支持 PHP 8.5 管道操作符新特性
dev-main
2026-03-01 15:00 UTC
Requires (Dev)
- phpunit/phpunit: ^9.6|^10.0
Suggests
- ext-rdkafka: Kafka 驱动所需
- pda/pheanstalk: Beanstalkd 驱动所需
- php-amqplib/php-amqplib: AMQP 驱动所需
- predis/predis: Redis 驱动所需
This package is auto-updated.
Last update: 2026-03-01 15:07:52 UTC
README
一个现代化、高性能的 PHP 队列客户端,支持多种队列驱动,专为 PHP 8.1+ 设计,完美支持 PHP 8.5 管道操作符新特性。
目录
特性
- 多驱动支持(Redis、Beanstalkd、AMQP、Kafka、Database、Sync)
- 自动驱动检测与配置
- 统一的队列操作 API
- 支持延迟入队功能
- 支持批量操作(批量入队、批量出队)
- 支持队列监控功能
- 完善的异常处理机制
- 支持事务功能
- 中间件支持(重试、超时、日志等)
- 上下文管理(Context)
- 工具类支持(QueueUtil)
- 符合 PSR 规范(PSR-1、PSR-2、PSR-4、PSR-12)
- PHP 8.1+ 兼容,完美支持 PHP 8.5 管道操作符
安装
composer require kode/queue
环境要求
| 要求 | 版本 |
|---|---|
| PHP | >= 8.1 |
| ext-json | * |
| ext-pdo | * |
驱动依赖
| 驱动 | 依赖包 | 说明 | 安装命令 |
|---|---|---|---|
| Redis | predis/predis |
Redis 驱动所需 | composer require predis/predis |
| Beanstalkd | pda/pheanstalk |
Beanstalkd 驱动所需 | composer require pda/pheanstalk |
| AMQP | php-amqplib/php-amqplib |
AMQP 驱动所需 | composer require php-amqplib/php-amqplib |
| Kafka | ext-rdkafka |
PHP 扩展,Kafka 驱动所需 | pecl install rdkafka |
| Database | ext-pdo |
内置支持 | 内置 |
| Sync | 无 | 内置,用于测试 | 内置 |
快速开始
基本使用
use Kode\Queue\Factory; // 创建队列实例 $queue = Factory::create([ 'default' => 'redis', 'connections' => [ 'redis' => [ 'host' => '127.0.0.1', 'port' => 6379, ], ], ]); // 入队 $jobId = $queue->push('SendEmail', ['user_id' => 123]); // 延迟入队(60秒后执行) $jobId = $queue->later(60, 'SendEmail', ['user_id' => 123]); // 批量入队 $jobIds = $queue->bulk(['SendEmail', 'ProcessOrder'], ['user_id' => 123]); // 出队 $job = $queue->pop(); if ($job) { // 处理任务 echo $job['job']; // 任务名称 print_r($job['data']); // 任务数据 } // 查看队列长度 $size = $queue->size(); // 查看队列统计信息 $stats = $queue->stats();
使用指定驱动
use Kode\Queue\Factory; // 使用 Sync 驱动(同步执行,用于测试) $queue = Factory::createWithDriver('sync'); // 使用 Redis 驱动 $queue = Factory::createWithDriver('redis', [ 'host' => '127.0.0.1', 'port' => 6379, 'database' => 0, 'password' => null, ]); // 使用 Database 驱动 $queue = Factory::createWithDriver('database', [ 'dsn' => 'mysql:host=127.0.0.1;dbname=queue', 'username' => 'root', 'password' => '', 'table' => 'jobs', ]); // 使用 Beanstalkd 驱动 $queue = Factory::createWithDriver('beanstalkd', [ 'host' => '127.0.0.1', 'port' => 11300, 'tube' => 'default', ]); // 使用 AMQP 驱动 $queue = Factory::createWithDriver('amqp', [ 'host' => '127.0.0.1', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'queue' => 'default', ]); // 使用 Kafka 驱动 $queue = Factory::createWithDriver('kafka', [ 'bootstrap_servers' => '127.0.0.1:9092', 'topic' => 'queue', 'group_id' => 'queue-consumer', ]);
驱动配置
完整配置示例
$config = [ 'default' => 'redis', 'connections' => [ 'redis' => [ 'driver' => 'redis', 'host' => '127.0.0.1', 'port' => 6379, 'database' => 0, 'password' => null, 'options' => [], ], 'beanstalkd' => [ 'driver' => 'beanstalkd', 'host' => '127.0.0.1', 'port' => 11300, 'tube' => 'default', ], 'amqp' => [ 'driver' => 'amqp', 'host' => '127.0.0.1', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'queue' => 'default', ], 'kafka' => [ 'driver' => 'kafka', 'bootstrap_servers' => '127.0.0.1:9092', 'topic' => 'queue', 'group_id' => 'queue-consumer', ], 'sync' => [ 'driver' => 'sync', ], 'database' => [ 'driver' => 'database', 'dsn' => 'mysql:host=127.0.0.1;dbname=queue', 'username' => 'root', 'password' => '', 'table' => 'jobs', ], ], ]; $queue = Factory::create($config);
各驱动配置说明
Redis 驱动
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | string | 127.0.0.1 | Redis 服务器地址 |
| port | int | 6379 | Redis 端口 |
| database | int | 0 | 数据库编号 |
| password | string | null | null |
| options | array | [] | Predis 选项 |
Database 驱动
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| dsn | string | - | PDO DSN 连接字符串 |
| username | string | null | 数据库用户名 |
| password | string | null | 数据库密码 |
| table | string | jobs | 任务表名 |
Beanstalkd 驱动
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | string | 127.0.0.1 | Beanstalkd 服务器地址 |
| port | int | 11300 | Beanstalkd 端口 |
| tube | string | default | 管道名称 |
AMQP 驱动
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | string | 127.0.0.1 | RabbitMQ 服务器地址 |
| port | int | 5672 | RabbitMQ 端口 |
| username | string | guest | 用户名 |
| password | string | guest | 密码 |
| vhost | string | / | 虚拟主机 |
| queue | string | default | 队列名称 |
Kafka 驱动
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| bootstrap_servers | string | 127.0.0.1:9092 | Kafka 服务器地址 |
| topic | string | queue | 主题名称 |
| group_id | string | queue-consumer | 消费者组ID |
核心功能
全局队列和局部队列
use Kode\Queue\Factory; $queue = Factory::createWithDriver('sync'); // 获取全局队列实例(单例模式) $globalQueue1 = $queue->global('orders'); $globalQueue2 = $queue->global('orders'); // $globalQueue1 和 $globalQueue2 是同一个实例 // 全局队列应用场景: // - 系统级任务队列 // - 跨模块共享的队列 // - 需要在多个地方访问的队列 // 获取局部队列实例(每次创建新实例) $localQueue1 = $queue->local('emails'); $localQueue2 = $queue->local('emails'); // $localQueue1 和 $localQueue2 是不同实例 // 局部队列应用场景: // - 临时任务队列 // - 单次操作的队列 // - 不需要跨模块共享的队列
使用事务
use Kode\Queue\Factory; $queue = Factory::createWithDriver('database'); try { $queue->beginTransaction(); // 执行多个队列操作 $jobId1 = $queue->push('Task1', ['data' => 'value1']); $jobId2 = $queue->push('Task2', ['data' => 'value2']); $queue->commit(); } catch (\Exception $e) { $queue->rollback(); // 处理异常 }
延迟任务
use Kode\Queue\Factory; $queue = Factory::createWithDriver('sync'); // 延迟 60 秒执行 $jobId = $queue->later(60, 'SendEmail', ['user_id' => 123]); // 延迟 5 分钟执行 $jobId = $queue->later(300, 'ProcessOrder', ['order_id' => 456]); // 延迟 1 小时执行 $jobId = $queue->later(3600, 'CleanupTask', ['type' => 'logs']);
批量操作
use Kode\Queue\Factory; $queue = Factory::createWithDriver('sync'); // 批量推送任务 $jobIds = $queue->bulk([ 'SendEmail', 'SendSms', 'SendPush', ], ['user_id' => 123]); // 批量推送不同数据的任务 $jobs = [ ['job' => 'SendEmail', 'data' => ['user_id' => 1]], ['job' => 'SendEmail', 'data' => ['user_id' => 2]], ['job' => 'SendEmail', 'data' => ['user_id' => 3]], ]; foreach ($jobs as $job) { $queue->push($job['job'], $job['data']); }
中间件
内置中间件
| 中间件 | 说明 | 参数 |
|---|---|---|
LogMiddleware |
记录队列操作日志 | callable $logger (可选) |
RetryMiddleware |
自动重试失败的任务 | int $maxAttempts, int $delay, float $multiplier |
RateLimitMiddleware |
限制队列操作速率 | int $capacity, float $rate |
使用中间件
use Kode\Queue\Factory; use Kode\Queue\Middleware\RetryMiddleware; use Kode\Queue\Middleware\LogMiddleware; use Kode\Queue\Middleware\RateLimitMiddleware; $queue = Factory::createWithDriver('sync'); // 添加中间件 $queue->addMiddleware(new RetryMiddleware(3, 100, 2.0)) ->addMiddleware(new LogMiddleware()) ->addMiddleware(new RateLimitMiddleware(10, 1)); // 使用队列 $jobId = $queue->push('SendEmail', ['user_id' => 123]);
自定义中间件
use Kode\Queue\Middleware\MiddlewareInterface; class CustomMiddleware implements MiddlewareInterface { public function handle(callable $next, string $method, array $parameters) { // 前置处理 echo "Before: $method\n"; // 执行下一个处理器 $result = $next($parameters); // 后置处理 echo "After: $method\n"; return $result; } } // 使用自定义中间件 $queue->addMiddleware(new CustomMiddleware());
上下文管理
Context 类用于封装和管理队列任务的上下文信息,提供便捷的方法来访问和操作任务数据。
创建上下文
use Kode\Queue\Context\Context; // 从任务数据创建上下文 $context = new Context([ 'id' => 'job_123', 'job' => 'SendEmail', 'data' => ['user_id' => 123], 'attempts' => 0, 'created_at' => time(), ]); // 从数组创建 $context = Context::fromArray($jobData); // 从 JSON 创建 $context = Context::fromJson('{"id":"job_123","job":"SendEmail",...}');
获取任务信息
// 获取任务ID $jobId = $context->getJobId(); // 'job_123' // 获取任务名称 $jobName = $context->getJob(); // 'SendEmail' // 获取任务数据 $jobData = $context->getData(); // ['user_id' => 123] // 获取重试次数 $attempts = $context->getAttempts(); // 0 // 获取创建时间 $createdAt = $context->getCreatedAt(); // 时间戳 // 获取队列名称 $queue = $context->getQueue(); // 队列名称 // 获取完整负载 $payload = $context->getPayload(); // 完整数组数据
操作上下文
// 增加重试次数 $context->incrementAttempts(); // 转换为数组 $payload = $context->toArray(); // 转换为 JSON $json = $context->toJson();
在任务处理器中使用
use Kode\Queue\Context\Context; class SendEmailHandler { public function handle(array $jobData) { $context = new Context($jobData); // 获取任务信息 $userId = $context->getData()['user_id']; $attempts = $context->getAttempts(); // 执行任务逻辑 try { $this->sendEmail($userId); } catch (\Exception $e) { // 记录重试 $context->incrementAttempts(); // 判断是否超过最大重试次数 if ($context->getAttempts() >= 3) { throw $e; } // 重新入队 return false; } return true; } }
工具类
QueueUtil 类提供队列操作中常用的工具方法。
生成任务ID
use Kode\Queue\Util\QueueUtil; // 生成唯一任务ID $jobId = QueueUtil::generateJobId(); // 输出: 67e8f12345678.12345678
创建和解析负载
use Kode\Queue\Util\QueueUtil; // 创建任务负载 $payload = QueueUtil::createPayload('SendEmail', ['user_id' => 123]); // 输出: {"job":"SendEmail","data":{"user_id":123},"id":"...","attempts":0,"created_at":1234567890} // 解析任务负载 $data = QueueUtil::parsePayload($payload); // 输出: ['job' => 'SendEmail', 'data' => ['user_id' => 123], ...]
检查任务状态
use Kode\Queue\Util\QueueUtil; // 检查任务是否就绪(用于延迟任务) $job = ['available_at' => time() - 10]; $isReady = QueueUtil::isJobReady($job); // true $job = ['available_at' => time() + 60]; $isReady = QueueUtil::isJobReady($job); // false
计算延迟
use Kode\Queue\Util\QueueUtil; // 计算延迟后的时间戳 $timestamp = QueueUtil::calculateDelay(60); // 当前时间 + 60秒
格式化队列大小
use Kode\Queue\Util\QueueUtil; // 格式化队列大小 echo QueueUtil::formatSize(500); // "500" echo QueueUtil::formatSize(1500); // "1.50K" echo QueueUtil::formatSize(1500000); // "1.50M"
验证名称
use Kode\Queue\Util\QueueUtil; // 验证队列名称 QueueUtil::validateQueueName('my-queue'); // true QueueUtil::validateQueueName('my_queue'); // true QueueUtil::validateQueueName('my.queue'); // true QueueUtil::validateQueueName('my queue'); // false (包含空格) // 验证任务名称 QueueUtil::validateJobName('SendEmail'); // true QueueUtil::validateJobName('App\Jobs\SendEmail'); // true QueueUtil::validateJobName('123SendEmail'); // false (数字开头)
指数退避
use Kode\Queue\Util\QueueUtil; // 计算指数退避延迟时间(毫秒) QueueUtil::exponentialBackoff(1, 100, 2.0); // 100ms (第1次重试) QueueUtil::exponentialBackoff(2, 100, 2.0); // 200ms (第2次重试) QueueUtil::exponentialBackoff(3, 100, 2.0); // 400ms (第3次重试) QueueUtil::exponentialBackoff(4, 100, 2.0); // 800ms (第4次重试) // 自定义参数 QueueUtil::exponentialBackoff(3, 50, 1.5); // 112ms (基础延迟50ms,乘数1.5)
PHP 8.5 管道操作符支持
PHP 8.5 引入了管道操作符 |>,本包完美支持这一新特性。
传统写法
use Kode\Queue\Factory; use Kode\Queue\Middleware\LogMiddleware; $queue = Factory::createWithDriver('sync'); $queue->addMiddleware(new LogMiddleware()); $jobId = $queue->push('SendEmail', ['user_id' => 123]);
PHP 8.5 管道操作符写法
use Kode\Queue\Factory; use Kode\Queue\Middleware\LogMiddleware; use Kode\Queue\Middleware\RetryMiddleware; // 基本管道操作 $jobId = Factory::createWithDriver('sync') |> fn($q) => $q->push('SendEmail', ['user_id' => 123]); // 管道操作与中间件 $jobId = Factory::createWithDriver('sync') |> fn($q) => $q->addMiddleware(new LogMiddleware()) |> fn($q) => $q->addMiddleware(new RetryMiddleware(3, 100, 2.0)) |> fn($q) => $q->push('SendEmail', ['user_id' => 123]); // 管道操作与全局队列 $jobId = Factory::createWithDriver('sync') |> fn($q) => $q->global('emails') |> fn($q) => $q->push('SendEmail', ['user_id' => 123]); // 管道操作与延迟任务 $jobId = Factory::createWithDriver('sync') |> fn($q) => $q->later(60, 'DelayedJob', ['data' => 'delayed']); // 管道操作与批量任务 $jobIds = Factory::createWithDriver('sync') |> fn($q) => $q->bulk(['Job1', 'Job2', 'Job3'], ['data' => 'bulk']);
兼容 PHP 8.1-8.4 的管道方法
对于 PHP 8.5 以下版本,可以使用 pipe() 方法实现类似功能:
use Kode\Queue\Factory; use Kode\Queue\Middleware\LogMiddleware; // 使用 pipe 方法 $jobId = Factory::createWithDriver('sync') ->pipe(fn($q) => $q->addMiddleware(new LogMiddleware())) ->pipe(fn($q) => $q->push('SendEmail', ['user_id' => 123])); // 链式 pipe 调用 $result = Factory::createWithDriver('sync') ->pipe(fn($q) => $q->addMiddleware(new LogMiddleware())) ->pipe(fn($q) => $q->push('Task1', ['data' => 'value1'])) ->pipe(fn($q) => $q->push('Task2', ['data' => 'value2'])) ->pipe(fn($q) => $q->size());
API 参考
QueueInterface
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
push |
$job, array $data = [], string $queue = null |
string |
推送任务到队列 |
pushRaw |
string $payload, string $queue = null, array $options = [] |
string |
推送原始负载到队列 |
later |
int $delay, $job, array $data = [], string $queue = null |
string |
延迟推送任务到队列 |
bulk |
array $jobs, array $data = [], string $queue = null |
array |
批量推送任务到队列 |
pop |
string $queue = null |
mixed |
从队列中取出下一个任务 |
size |
string $queue = null |
int |
获取队列大小 |
delete |
string $jobId, string $queue = null |
bool |
从队列中删除任务 |
release |
int $delay, string $jobId, string $queue = null |
bool |
将任务释放回队列 |
stats |
string $queue = null |
array |
获取队列统计信息 |
beginTransaction |
- | void |
开始事务 |
commit |
- | void |
提交事务 |
rollback |
- | void |
回滚事务 |
global |
string $queue = 'default' |
QueueInterface |
获取全局队列实例 |
local |
string $queue = 'default' |
QueueInterface |
获取局部队列实例 |
pipe |
callable $callback |
mixed |
管道操作 |
addMiddleware |
MiddlewareInterface $middleware |
$this |
添加中间件 |
项目结构
src/
├── Driver/ # 驱动目录
│ ├── DriverInterface.php # 驱动接口
│ ├── SyncDriver.php # 同步驱动
│ ├── DatabaseDriver.php # 数据库驱动
│ ├── RedisDriver.php # Redis 驱动
│ ├── BeanstalkdDriver.php # Beanstalkd 驱动
│ ├── AmqpDriver.php # AMQP 驱动
│ └── KafkaDriver.php # Kafka 驱动
├── Exception/ # 异常目录
│ ├── QueueException.php # 队列异常基类
│ ├── DriverException.php # 驱动异常
│ └── TransactionException.php # 事务异常
├── Middleware/ # 中间件目录
│ ├── MiddlewareInterface.php # 中间件接口
│ ├── LogMiddleware.php # 日志中间件
│ ├── RetryMiddleware.php # 重试中间件
│ └── RateLimitMiddleware.php # 限流中间件
├── Context/ # 上下文目录
│ └── Context.php # 队列操作上下文
├── Util/ # 工具目录
│ └── QueueUtil.php # 队列工具类
├── AbstractQueue.php # 抽象队列基类
├── Factory.php # 工厂类
└── QueueInterface.php # 队列接口
测试
# 运行测试 composer test # 生成测试覆盖率报告 composer test-coverage