lzw7758 / php_redis_mq
PHP Redis消息队列完整实现方案,支持可靠队列、重试机制、死信队列、多进程消费等功能
1.0.3
2025-08-07 07:52 UTC
Requires
- php: >=7.4
- ext-json: *
- ext-redis: *
- topthink/framework: ^6.0.0
- topthink/think-view: ^1.0
Requires (Dev)
- phpstan/phpstan: ^1.0
- phpunit/phpunit: ^9.0
This package is not auto-updated.
Last update: 2025-08-08 06:13:54 UTC
README
一个功能完整的PHP Redis消息队列实现,支持可靠队列、重试机制、死信队列、多进程消费等功能。
特性
- ✅ 可靠队列: 使用BRPOPLPUSH确保消息不丢失
- ✅ 重试机制: 指数退避重试策略
- ✅ 死信队列: 处理无法处理的消息
- ✅ 进程管理: 心跳检测和优雅关闭
- ✅ 监控告警: 完整的队列状态监控
- ✅ 多进程支持: 支持多进程并发消费
- ✅ 连接管理: 自动重连和连接池
- ✅ 错误处理: 完善的异常处理机制
- ✅ 优先级队列: 支持高、中、低优先级
- ✅ 延迟队列: 支持延迟消息处理
- ✅ 批量操作: 支持批量推送消息
- ✅ Web管理界面: 提供可视化的队列管理界面
安装
composer require Lzw7758/php_redis_mq
快速开始
基本使用
<?php
use Lzw7758\PhpRedisMq\QueueManager;
// 初始化队列管理器
$queueManager = new QueueManager([
'host' => '127.0.0.1',
'port' => 6379,
'password' => null,
'database' => 0,
'max_retry_count' => 3,
]);
// 推送消息
$messageId = $queueManager->push('email_queue', [
'to' => 'user@example.com',
'subject' => '测试邮件',
'body' => '这是一封测试邮件'
]);
// 消费消息
$queueManager->consume('email_queue', function($message, $messageData) {
echo "处理消息: " . $messageData['id'] . "\n";
// 处理消息逻辑
$result = sendEmail($message['to'], $message['subject'], $message['body']);
return $result; // 返回true表示处理成功,false表示处理失败
});
高级功能
优先级队列
// 高优先级消息
$queueManager->push('email_queue', $message, 'high');
// 低优先级消息
$queueManager->push('email_queue', $message, 'low');
延迟队列
// 延迟60秒处理
$queueManager->pushDelayed('email_queue', $message, 60);
批量推送
$messages = [
['to' => 'user1@example.com', 'subject' => '邮件1'],
['to' => 'user2@example.com', 'subject' => '邮件2'],
['to' => 'user3@example.com', 'subject' => '邮件3'],
];
$messageIds = $queueManager->pushBatch('email_queue', $messages);
多进程消费
// 启动4个进程并发消费
$queueManager->consumeMulti('email_queue', function($message, $messageData) {
// 消息处理逻辑
return true;
}, [], 4);
常量定义
项目提供了完整的常量定义,用于统一配置管理和错误处理:
使用常量
use Lzw7758\PhpRedisMq\Constants;
// 使用预定义的优先级
$messageId = $queueManager->push('email_queue', $message, Constants::PRIORITY_HIGH);
// 使用预定义的消息状态
$messageData[Constants::MESSAGE_FIELD_STATUS] = Constants::MESSAGE_STATUS_PENDING;
// 使用预定义的错误码
if ($error) {
throw new Exception(Constants::getErrorMessage(Constants::ERROR_CODE_MESSAGE_PUSH_FAILED));
}
// 获取默认配置
$config = Constants::getDefaultConfig();
$consumerOptions = Constants::getDefaultConsumerOptions();
主要常量分类
- 队列类型常量: 定义不同类型的队列前缀
- 优先级常量: 定义消息优先级级别
- 消息状态常量: 定义消息生命周期状态
- 工作者状态常量: 定义工作者运行状态
- 重试策略常量: 定义消息重试策略
- 错误码常量: 定义系统错误码
- 配置常量: 定义默认配置选项
详细使用说明请参考 常量使用文档。
消息处理器系统
项目提供了完整的消息处理器系统,支持灵活、可扩展的消息处理机制:
使用消息处理器
use Lzw7758\PhpRedisMq\HandlerManager;
use Lzw7758\PhpRedisMq\Handlers\EmailMessageHandler;
use Lzw7758\PhpRedisMq\Constants;
// 创建处理器管理器
$handlerManager = new HandlerManager();
// 创建并注册邮件处理器
$emailHandler = new EmailMessageHandler([
'email' => [
'default_from' => 'noreply@example.com',
'smtp_host' => 'localhost',
]
]);
$handlerManager->registerHandler($emailHandler);
// 处理消息
$message = [
Constants::MESSAGE_FIELD_TYPE => 'email',
'to' => 'user@example.com',
'subject' => '测试邮件',
'body' => '邮件内容'
];
$results = $handlerManager->handleMessage($message, $messageData);
主要特性
- 灵活的消息处理: 支持多种消息类型和处理器
- 完整的生命周期管理: 初始化、处理、清理
- 强大的错误处理: 重试策略和异常处理
- 丰富的监控功能: 统计信息和健康检查
- 可扩展的架构: 易于创建自定义处理器
详细使用说明请参考 消息处理器文档。
Web管理界面
项目提供了完整的Web管理界面,支持可视化的队列监控和管理:
快速集成
安装包
composer require Lzw7758/php_redis_mq
配置路由
// 在 config/route.php 中添加 Route::any('queue-manage/:action?', function ($action = 'index') { $handler = new \Lzw7758\PhpRedisMq\Web\ThinkPhpWebHandler(); $_GET['action'] = $action; return $handler->handle(); });
访问管理界面
http://your-domain.com/queue-manage/
主要特性
- ✅ 零配置: 无需编写控制器代码
- ✅ 一行路由: 一行配置搞定所有功能
- ✅ 完整功能: 支持所有队列管理操作
- ✅ 可扩展: 支持自定义配置和中间件
- ✅ ThinkPHP原生: 完全兼容ThinkPHP框架
详细使用说明请参考 ThinkPHP集成指南。
配置选项
$config = [
// Redis连接配置
'host' => '127.0.0.1',
'port' => 6379,
'password' => null,
'database' => 0,
// 连接超时配置
'connection_timeout' => 5,
'read_timeout' => 5,
'write_timeout' => 5,
// 重试配置
'retry_attempts' => 3,
'retry_delay' => 1000000, // 1秒
'max_retry_count' => 3,
// 处理配置
'max_processing_time' => 300,
'heartbeat_interval' => 30,
];
监控和管理
获取队列统计信息
$stats = $queueManager->getQueueStats('email_queue');
echo "主队列长度: " . $stats['main_queue_length'] . "\n";
echo "处理中队列长度: " . $stats['processing_length'] . "\n";
echo "死信队列长度: " . $stats['dead_queue_length'] . "\n";
获取性能指标
$metrics = $queueManager->getPerformanceMetrics('email_queue');
echo "成功率: " . $metrics['success_rate'] . "%\n";
echo "失败率: " . $metrics['failure_rate'] . "%\n";
echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n";
清理过期数据
// 清理24小时前的数据
$result = $queueManager->cleanupExpiredData('email_queue', 86400);
echo "清理了 " . $result['dead_messages'] . " 条死信消息\n";
处理重试队列
$result = $queueManager->processRetryQueues('email_queue');
echo "处理了 " . $result['processed'] . " 条重试消息\n";
完整示例
生产者示例
<?php
use Lzw7758\PhpRedisMq\QueueManager;
$queueManager = new QueueManager([
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
]);
// 发送普通消息
$messageId = $queueManager->push('email_queue', [
'to' => 'user@example.com',
'subject' => '欢迎邮件',
'body' => '欢迎使用我们的服务!'
]);
// 发送高优先级消息
$messageId = $queueManager->push('email_queue', [
'to' => 'admin@example.com',
'subject' => '紧急通知',
'body' => '系统出现异常,请立即处理!'
], 'high');
// 发送延迟消息
$messageId = $queueManager->pushDelayed('email_queue', [
'to' => 'user@example.com',
'subject' => '提醒邮件',
'body' => '您的订单即将到期,请及时处理。'
], 3600); // 1小时后发送
echo "消息发送成功,ID: {$messageId}\n";
消费者示例
<?php
use Lzw7758\PhpRedisMq\QueueManager;
$queueManager = new QueueManager([
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
]);
// 定义消息处理函数
$messageHandler = function($message, $messageData) {
echo "处理消息: " . $messageData['id'] . "\n";
echo "优先级: " . $messageData['priority'] . "\n";
echo "重试次数: " . $messageData['attempts'] . "\n";
try {
// 模拟邮件发送
$result = sendEmail($message['to'], $message['subject'], $message['body']);
if ($result) {
echo "邮件发送成功: {$message['to']}\n";
return true; // 处理成功
} else {
echo "邮件发送失败: {$message['to']}\n";
return false; // 处理失败,将进入重试队列
}
} catch (Exception $e) {
echo "处理异常: " . $e->getMessage() . "\n";
return false; // 处理失败
}
};
// 启动消费者
$queueManager->consume('email_queue', $messageHandler, [
'timeout' => 5,
'max_memory' => 128 * 1024 * 1024, // 128MB
'max_execution_time' => 3600,
]);
多进程消费者示例
<?php
use Lzw7758\PhpRedisMq\QueueManager;
$queueManager = new QueueManager([
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
]);
// 启动4个进程并发消费
$queueManager->consumeMulti('email_queue', function($message, $messageData) {
echo "进程 " . getmypid() . " 处理消息: " . $messageData['id'] . "\n";
// 处理消息逻辑
$result = sendEmail($message['to'], $message['subject'], $message['body']);
return $result;
}, [
'timeout' => 5,
'max_memory' => 128 * 1024 * 1024,
'max_execution_time' => 3600,
], 4);
监控脚本示例
<?php
use Lzw7758\PhpRedisMq\QueueManager;
$queueManager = new QueueManager([
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
]);
// 获取队列统计信息
$stats = $queueManager->getQueueStats('email_queue');
echo "=== 队列统计信息 ===\n";
echo "主队列长度: " . $stats['main_queue_length'] . "\n";
echo "高优先级队列长度: " . $stats['high_queue_length'] . "\n";
echo "低优先级队列长度: " . $stats['low_queue_length'] . "\n";
echo "处理中队列长度: " . $stats['processing_length'] . "\n";
echo "延迟队列长度: " . $stats['delayed_queue_length'] . "\n";
echo "死信队列长度: " . $stats['dead_queue_length'] . "\n";
echo "总长度: " . $stats['total_length'] . "\n";
// 获取性能指标
$metrics = $queueManager->getPerformanceMetrics('email_queue');
echo "\n=== 性能指标 ===\n";
echo "总推送数: " . $metrics['total_pushed'] . "\n";
echo "总处理数: " . $metrics['total_processed'] . "\n";
echo "总失败数: " . $metrics['total_failed'] . "\n";
echo "成功率: " . $metrics['success_rate'] . "%\n";
echo "失败率: " . $metrics['failure_rate'] . "%\n";
echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n";
// 获取活跃工作者
$workers = $queueManager->getActiveWorkers('email_queue');
echo "\n=== 活跃工作者 ===\n";
foreach ($workers as $worker) {
echo "工作者ID: " . $worker['worker_id'] . "\n";
echo "最后心跳: " . date('Y-m-d H:i:s', $worker['last_heartbeat']) . "\n";
echo "是否活跃: " . ($worker['is_active'] ? '是' : '否') . "\n";
echo "---\n";
}
部署建议
系统配置
# 增加文件描述符限制
ulimit -n 65536
# 调整内核参数
echo 'net.core.somaxconn = 65535' >> /etc/sysctl.conf
sysctl -p
Redis配置优化
# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
timeout 300
tcp-keepalive 60
进程管理脚本
#!/bin/bash
# start_consumers.sh
QUEUE_NAME="email_queue"
WORKER_COUNT=4
SCRIPT_PATH="/path/to/consumer.php"
for i in $(seq 1 $WORKER_COUNT); do
php $SCRIPT_PATH --queue=$QUEUE_NAME --worker-id=worker_$i &
done
wait
注意事项
- Redis扩展: 确保PHP安装了Redis扩展
- PCNTL扩展: 多进程功能需要PCNTL扩展支持
- 内存限制: 根据实际情况调整内存限制
- 连接池: 建议在生产环境中使用连接池
- 监控告警: 建议设置队列积压告警
- 数据备份: 定期备份Redis数据
许可证
MIT License
贡献
欢迎提交Issue和Pull Request!
更新日志
v1.0.0
- 初始版本发布
- 支持基本的消息队列功能
- 支持重试机制和死信队列
- 支持多进程消费
- 支持优先级队列和延迟队列
- 提供完整的监控和管理功能