lzw7758/php_redis_mq

PHP Redis消息队列完整实现方案,支持可靠队列、重试机制、死信队列、多进程消费等功能

1.0.3 2025-08-07 07:52 UTC

This package is not auto-updated.

Last update: 2025-08-08 06:13:54 UTC


README

一个功能完整的PHP Redis消息队列实现,支持可靠队列、重试机制、死信队列、多进程消费等功能。

特性

  • 可靠队列: 使用BRPOPLPUSH确保消息不丢失
  • 重试机制: 指数退避重试策略
  • 死信队列: 处理无法处理的消息
  • 进程管理: 心跳检测和优雅关闭
  • 监控告警: 完整的队列状态监控
  • 多进程支持: 支持多进程并发消费
  • 连接管理: 自动重连和连接池
  • 错误处理: 完善的异常处理机制
  • 优先级队列: 支持高、中、低优先级
  • 延迟队列: 支持延迟消息处理
  • 批量操作: 支持批量推送消息
  • Web管理界面: 提供可视化的队列管理界面

安装

composer require Lzw7758/php_redis_mq

快速开始

基本使用

<?php

use Lzw7758\PhpRedisMq\QueueManager;

// 初始化队列管理器
$queueManager = new QueueManager([
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    'max_retry_count' => 3,
]);

// 推送消息
$messageId = $queueManager->push('email_queue', [
    'to' => 'user@example.com',
    'subject' => '测试邮件',
    'body' => '这是一封测试邮件'
]);

// 消费消息
$queueManager->consume('email_queue', function($message, $messageData) {
    echo "处理消息: " . $messageData['id'] . "\n";
    
    // 处理消息逻辑
    $result = sendEmail($message['to'], $message['subject'], $message['body']);
    
    return $result; // 返回true表示处理成功,false表示处理失败
});

高级功能

优先级队列

// 高优先级消息
$queueManager->push('email_queue', $message, 'high');

// 低优先级消息
$queueManager->push('email_queue', $message, 'low');

延迟队列

// 延迟60秒处理
$queueManager->pushDelayed('email_queue', $message, 60);

批量推送

$messages = [
    ['to' => 'user1@example.com', 'subject' => '邮件1'],
    ['to' => 'user2@example.com', 'subject' => '邮件2'],
    ['to' => 'user3@example.com', 'subject' => '邮件3'],
];

$messageIds = $queueManager->pushBatch('email_queue', $messages);

多进程消费

// 启动4个进程并发消费
$queueManager->consumeMulti('email_queue', function($message, $messageData) {
    // 消息处理逻辑
    return true;
}, [], 4);

常量定义

项目提供了完整的常量定义,用于统一配置管理和错误处理:

使用常量

use Lzw7758\PhpRedisMq\Constants;

// 使用预定义的优先级
$messageId = $queueManager->push('email_queue', $message, Constants::PRIORITY_HIGH);

// 使用预定义的消息状态
$messageData[Constants::MESSAGE_FIELD_STATUS] = Constants::MESSAGE_STATUS_PENDING;

// 使用预定义的错误码
if ($error) {
    throw new Exception(Constants::getErrorMessage(Constants::ERROR_CODE_MESSAGE_PUSH_FAILED));
}

// 获取默认配置
$config = Constants::getDefaultConfig();
$consumerOptions = Constants::getDefaultConsumerOptions();

主要常量分类

  • 队列类型常量: 定义不同类型的队列前缀
  • 优先级常量: 定义消息优先级级别
  • 消息状态常量: 定义消息生命周期状态
  • 工作者状态常量: 定义工作者运行状态
  • 重试策略常量: 定义消息重试策略
  • 错误码常量: 定义系统错误码
  • 配置常量: 定义默认配置选项

详细使用说明请参考 常量使用文档

消息处理器系统

项目提供了完整的消息处理器系统,支持灵活、可扩展的消息处理机制:

使用消息处理器

use Lzw7758\PhpRedisMq\HandlerManager;
use Lzw7758\PhpRedisMq\Handlers\EmailMessageHandler;
use Lzw7758\PhpRedisMq\Constants;

// 创建处理器管理器
$handlerManager = new HandlerManager();

// 创建并注册邮件处理器
$emailHandler = new EmailMessageHandler([
    'email' => [
        'default_from' => 'noreply@example.com',
        'smtp_host' => 'localhost',
    ]
]);

$handlerManager->registerHandler($emailHandler);

// 处理消息
$message = [
    Constants::MESSAGE_FIELD_TYPE => 'email',
    'to' => 'user@example.com',
    'subject' => '测试邮件',
    'body' => '邮件内容'
];

$results = $handlerManager->handleMessage($message, $messageData);

主要特性

  • 灵活的消息处理: 支持多种消息类型和处理器
  • 完整的生命周期管理: 初始化、处理、清理
  • 强大的错误处理: 重试策略和异常处理
  • 丰富的监控功能: 统计信息和健康检查
  • 可扩展的架构: 易于创建自定义处理器

详细使用说明请参考 消息处理器文档

Web管理界面

项目提供了完整的Web管理界面,支持可视化的队列监控和管理:

快速集成

  1. 安装包

    composer require Lzw7758/php_redis_mq
    
  2. 配置路由

    // 在 config/route.php 中添加
    Route::any('queue-manage/:action?', function ($action = 'index') {
        $handler = new \Lzw7758\PhpRedisMq\Web\ThinkPhpWebHandler();
        $_GET['action'] = $action;
        return $handler->handle();
    });
    
  3. 访问管理界面

    http://your-domain.com/queue-manage/
    

主要特性

  • 零配置: 无需编写控制器代码
  • 一行路由: 一行配置搞定所有功能
  • 完整功能: 支持所有队列管理操作
  • 可扩展: 支持自定义配置和中间件
  • ThinkPHP原生: 完全兼容ThinkPHP框架

详细使用说明请参考 ThinkPHP集成指南

配置选项

$config = [
    // Redis连接配置
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    
    // 连接超时配置
    'connection_timeout' => 5,
    'read_timeout' => 5,
    'write_timeout' => 5,
    
    // 重试配置
    'retry_attempts' => 3,
    'retry_delay' => 1000000, // 1秒
    'max_retry_count' => 3,
    
    // 处理配置
    'max_processing_time' => 300,
    'heartbeat_interval' => 30,
];

监控和管理

获取队列统计信息

$stats = $queueManager->getQueueStats('email_queue');
echo "主队列长度: " . $stats['main_queue_length'] . "\n";
echo "处理中队列长度: " . $stats['processing_length'] . "\n";
echo "死信队列长度: " . $stats['dead_queue_length'] . "\n";

获取性能指标

$metrics = $queueManager->getPerformanceMetrics('email_queue');
echo "成功率: " . $metrics['success_rate'] . "%\n";
echo "失败率: " . $metrics['failure_rate'] . "%\n";
echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n";

清理过期数据

// 清理24小时前的数据
$result = $queueManager->cleanupExpiredData('email_queue', 86400);
echo "清理了 " . $result['dead_messages'] . " 条死信消息\n";

处理重试队列

$result = $queueManager->processRetryQueues('email_queue');
echo "处理了 " . $result['processed'] . " 条重试消息\n";

完整示例

生产者示例

<?php

use Lzw7758\PhpRedisMq\QueueManager;

$queueManager = new QueueManager([
    'host' => '127.0.0.1',
    'port' => 6379,
    'database' => 0,
]);

// 发送普通消息
$messageId = $queueManager->push('email_queue', [
    'to' => 'user@example.com',
    'subject' => '欢迎邮件',
    'body' => '欢迎使用我们的服务!'
]);

// 发送高优先级消息
$messageId = $queueManager->push('email_queue', [
    'to' => 'admin@example.com',
    'subject' => '紧急通知',
    'body' => '系统出现异常,请立即处理!'
], 'high');

// 发送延迟消息
$messageId = $queueManager->pushDelayed('email_queue', [
    'to' => 'user@example.com',
    'subject' => '提醒邮件',
    'body' => '您的订单即将到期,请及时处理。'
], 3600); // 1小时后发送

echo "消息发送成功,ID: {$messageId}\n";

消费者示例

<?php

use Lzw7758\PhpRedisMq\QueueManager;

$queueManager = new QueueManager([
    'host' => '127.0.0.1',
    'port' => 6379,
    'database' => 0,
]);

// 定义消息处理函数
$messageHandler = function($message, $messageData) {
    echo "处理消息: " . $messageData['id'] . "\n";
    echo "优先级: " . $messageData['priority'] . "\n";
    echo "重试次数: " . $messageData['attempts'] . "\n";
    
    try {
        // 模拟邮件发送
        $result = sendEmail($message['to'], $message['subject'], $message['body']);
        
        if ($result) {
            echo "邮件发送成功: {$message['to']}\n";
            return true; // 处理成功
        } else {
            echo "邮件发送失败: {$message['to']}\n";
            return false; // 处理失败,将进入重试队列
        }
    } catch (Exception $e) {
        echo "处理异常: " . $e->getMessage() . "\n";
        return false; // 处理失败
    }
};

// 启动消费者
$queueManager->consume('email_queue', $messageHandler, [
    'timeout' => 5,
    'max_memory' => 128 * 1024 * 1024, // 128MB
    'max_execution_time' => 3600,
]);

多进程消费者示例

<?php

use Lzw7758\PhpRedisMq\QueueManager;

$queueManager = new QueueManager([
    'host' => '127.0.0.1',
    'port' => 6379,
    'database' => 0,
]);

// 启动4个进程并发消费
$queueManager->consumeMulti('email_queue', function($message, $messageData) {
    echo "进程 " . getmypid() . " 处理消息: " . $messageData['id'] . "\n";
    
    // 处理消息逻辑
    $result = sendEmail($message['to'], $message['subject'], $message['body']);
    
    return $result;
}, [
    'timeout' => 5,
    'max_memory' => 128 * 1024 * 1024,
    'max_execution_time' => 3600,
], 4);

监控脚本示例

<?php

use Lzw7758\PhpRedisMq\QueueManager;

$queueManager = new QueueManager([
    'host' => '127.0.0.1',
    'port' => 6379,
    'database' => 0,
]);

// 获取队列统计信息
$stats = $queueManager->getQueueStats('email_queue');
echo "=== 队列统计信息 ===\n";
echo "主队列长度: " . $stats['main_queue_length'] . "\n";
echo "高优先级队列长度: " . $stats['high_queue_length'] . "\n";
echo "低优先级队列长度: " . $stats['low_queue_length'] . "\n";
echo "处理中队列长度: " . $stats['processing_length'] . "\n";
echo "延迟队列长度: " . $stats['delayed_queue_length'] . "\n";
echo "死信队列长度: " . $stats['dead_queue_length'] . "\n";
echo "总长度: " . $stats['total_length'] . "\n";

// 获取性能指标
$metrics = $queueManager->getPerformanceMetrics('email_queue');
echo "\n=== 性能指标 ===\n";
echo "总推送数: " . $metrics['total_pushed'] . "\n";
echo "总处理数: " . $metrics['total_processed'] . "\n";
echo "总失败数: " . $metrics['total_failed'] . "\n";
echo "成功率: " . $metrics['success_rate'] . "%\n";
echo "失败率: " . $metrics['failure_rate'] . "%\n";
echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n";

// 获取活跃工作者
$workers = $queueManager->getActiveWorkers('email_queue');
echo "\n=== 活跃工作者 ===\n";
foreach ($workers as $worker) {
    echo "工作者ID: " . $worker['worker_id'] . "\n";
    echo "最后心跳: " . date('Y-m-d H:i:s', $worker['last_heartbeat']) . "\n";
    echo "是否活跃: " . ($worker['is_active'] ? '是' : '否') . "\n";
    echo "---\n";
}

部署建议

系统配置

# 增加文件描述符限制
ulimit -n 65536

# 调整内核参数
echo 'net.core.somaxconn = 65535' >> /etc/sysctl.conf
sysctl -p

Redis配置优化

# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
timeout 300
tcp-keepalive 60

进程管理脚本

#!/bin/bash
# start_consumers.sh

QUEUE_NAME="email_queue"
WORKER_COUNT=4
SCRIPT_PATH="/path/to/consumer.php"

for i in $(seq 1 $WORKER_COUNT); do
    php $SCRIPT_PATH --queue=$QUEUE_NAME --worker-id=worker_$i &
done

wait

注意事项

  1. Redis扩展: 确保PHP安装了Redis扩展
  2. PCNTL扩展: 多进程功能需要PCNTL扩展支持
  3. 内存限制: 根据实际情况调整内存限制
  4. 连接池: 建议在生产环境中使用连接池
  5. 监控告警: 建议设置队列积压告警
  6. 数据备份: 定期备份Redis数据

许可证

MIT License

贡献

欢迎提交Issue和Pull Request!

更新日志

v1.0.0

  • 初始版本发布
  • 支持基本的消息队列功能
  • 支持重试机制和死信队列
  • 支持多进程消费
  • 支持优先级队列和延迟队列
  • 提供完整的监控和管理功能