kode / fibers
高性能 Fiber 线程池与协程调度器
Requires
- php: ^8.1
- guzzlehttp/psr7: ^2.8
- kode/aop: ^2.0
- kode/attributes: ^1.0
- kode/console: ^2.1
- kode/context: ^2.1
- kode/facade: ^2.0
- kode/http-client: ^2.1
Requires (Dev)
- phpunit/phpunit: ^12.5
Suggests
- ext-curl: 用于 HTTP 客户端 cURL 驱动
- ext-swoole: 用于 Swoole 协程支持
- ext-swow: 用于 Swow 协程支持
README
面向 PHP 8.1+ 的高性能 Fiber 纤程客户端,兼容主流框架并提供可降级、可诊断、可扩展的并发执行能力。
✅ 特性概览
- ✅ PHP 8.1+ 原生 Fiber 支持
- ⚠️ PHP <8.4 析构函数中禁止切换 Fiber 的自动降级处理
- 🧩 一键启用协程模式(非侵入式)
- 🔁 高性能 Fiber 池 + 自动回收机制
- 💬 Fiber 间通信:Channel、Queue、Event Bus
- 🛰️ 集成常见操作支持:MySQL、PgSQL、Redis、HTTP Client、文件 IO
- ⏱️ 超时控制、异常捕获、资源监控
- 🔄 任务重试机制
- 🖥️ CPU 核心感知 + 动态线程池配置
- 🔌 多框架适配:Laravel / Symfony / Yii3 / ThinkPHP8 / Plain PHP
- 🛠️ 命令行工具生成配置 & 注册服务
- 📝 原生 PHP 8.1 Attributes + PHPDoc 实现 IDE 完整识别
- 🚫 禁用函数检测 + 运行环境诊断
📖 项目背景
kode/fibers 旨在为 Laravel、Symfony、Yii3、ThinkPHP8 及自建框架提供统一的纤程运行时能力,减少业务接入并发模型时的改造成本。
项目重点解决三类问题:PHP 版本差异(尤其是 PHP<8.4 的析构限制)、生产场景下的并发治理(池化、超时、重试、通信)、以及跨框架可移植性(统一配置与 CLI 初始化)。
⚙️ PHP 8.5 兼容与便捷 API
新增便捷入口以降低接入成本并兼容未来 PHP 8.5 运行时能力:
use Kode\Fibers\Fibers; $result = Fibers::go(fn() => 'hello'); $ctxResult = Fibers::withContext( ['trace_id' => 'trace-001'], fn() => \Kode\Context\Context::get('trace_id') ); $batch = Fibers::batch([1, 2, 3], fn(int $item) => $item * 2, 2); $features = Fibers::runtimeFeatures();
新增健壮架构 API
- 上下文并发透传:
Fibers::concurrentWithContext() - 健壮单任务:
Fibers::resilientRun() - 远程调度分发:
Fibers::scheduleDistributedRemote() - 运行时桥接:
Fibers::runtimeBridgeInfo()、Fibers::runOnBridge() - 可视化监控:
Fibers::profile()、Fibers::profilerDashboard() - ORM 适配层:
Fibers::eloquent()、Fibers::fixtures()
对应文档:
📦 安装
composer require Kode/fibers
框架快速集成(可选)
使用内置命令工具初始化框架配置:
# 自动检测框架类型并生成配置 php vendor/bin/fibers init # 指定框架类型 php vendor/bin/fibers init --framework=laravel
各框架特定命令:
| 框架 | 命令 | 说明 |
|---|---|---|
| Laravel | php artisan vendor:publish --tag=fibers-config |
生成 config/fibers.php |
| Symfony | bin/console fibers:install |
创建 config/packages/fibers.yaml |
| Yii3 | php yii fibers/setup |
初始化模块配置 |
| ThinkPHP8 | php think fibers:config |
生成 config/fibers.php |
| 其他/原生 | vendor/bin/fibers init |
交互式创建配置文件 |
环境要求检查
安装完成后,可以运行诊断命令检查环境兼容性:
# 运行环境诊断
php vendor/bin/fibers diagnose
诊断结果将显示PHP版本、禁用函数、必要扩展等信息,并给出优化建议。
🧱 架构设计原则
本包采用 “轻量内核 + 插件扩展” 设计:
Kode\Fibers\Core\FiberPool // 主纤程池 Kode\Fibers\Channel\Channel // 通信通道 Kode\Fibers\Task\TaskRunner // 任务执行器 Kode\Fibers\Support\CpuInfo // CPU 核心探测 Kode\Fibers\Contracts\Runnable // 可运行接口
所有组件均实现 PSR 标准,支持 DI 容器注入。
🧪 快速开始
1. 基础使用:一键启动纤程任务
使用门面或辅助函数快速启动纤程:
use Kode\Fibers\Facades\Fiber; // 使用门面 $result = Fiber::run(fn() => { // 模拟异步操作 usleep(100000); return 'Hello from Fiber!'; }); // 或使用辅助函数 $result = fiber_run(fn() => { usleep(100000); return 'Hello from Fiber!'; }); echo $result; // 输出: Hello from Fiber!
2. 使用纤程池(推荐生产环境)
纤程池支持资源复用、超时控制和错误重试:
use Kode\Fibers\Core\FiberPool; use Kode\Fibers\Support\CpuInfo; // 创建纤程池,自动检测CPU核心数 $pool = new FiberPool([ 'size' => CpuInfo::getRecommendedPoolSize(4), // CPU核心数 * 4 'max_exec_time' => 30, // 单任务最长执行时间(秒) 'gc_interval' => 100, // 每执行100次触发GC 'max_retries' => 3, // 自动重试次数 'retry_delay' => 0.5 // 重试间隔(秒) ]); // 并行执行多个任务 $results = $pool->concurrent([ fn() => file_get_contents('http://api.a.com'), fn() => file_get_contents('http://api.b.com'), fn() => \RedisClient::get('key') ], 10); // 总超时10秒 print_r($results); // 使用事件回调监控任务状态 $pool = new FiberPool([ 'onTaskStart' => fn($task) => logger()->info('Task started'), 'onTaskComplete' => fn($task, $result) => logger()->info('Task completed'), 'onTaskFail' => fn($task, $error) => logger()->error('Task failed', ['error' => $error->getMessage()]) ]);
🧰 核心功能详解
✅ 1. PHP 版本兼容性处理(含析构限制规避)
⚠️ PHP 8.4 之前:不允许在
__destruct()中调用Fiber::suspend()
我们通过静态分析和运行时检测自动处理此问题,确保在不同PHP版本中都能正常运行:
// 内部实现示例(用户无需关心) if (PHP_VERSION_ID < 80400) { // 启用延迟析构任务队列,避免直接在析构函数中suspend Fibers::enableSafeDestructMode(); }
自动降级策略:
| 条件 | 行为 |
|---|---|
| PHP < 8.1 | 抛出异常,不支持 |
| PHP >= 8.1 && < 8.4 | 启用延迟析构任务队列,安全处理析构中的suspend操作 |
| PHP >= 8.4 | 正常允许在析构函数中使用suspend操作 |
可通过配置关闭严格模式:
// config/fibers.php return [ 'strict_destruct_check' => false, // 开发调试时可关闭 ];
✅ 2. 纤程池(Fiber Pool)高级用法
获取 CPU 数量(用于动态配置)
use Kode\Fibers\Support\CpuInfo; $cpuCount = CpuInfo::get(); // 获取系统CPU核心数 $recommendedSize = CpuInfo::getRecommendedPoolSize(4); // CPU核心数 × 4
自定义池配置示例
use Kode\Fibers\Core\FiberPool; use Kode\Fibers\Support\CpuInfo; $pool = new FiberPool([ 'size' => CpuInfo::getRecommendedPoolSize(3), // CPU × 3 'name' => 'http-worker', 'max_exec_time' => 30, // 单任务超时时间 'max_retries' => 3, // 失败重试次数 'retry_delay' => 0.5, // 重试间隔(秒) 'gc_interval' => 100, // GC触发间隔 'concurrent_limit' => 500, // 最大并发任务数 'onCreate' => fn($id) => logger()->info("Fiber #$id created"), 'onDestroy' => fn($id) => logger()->info("Fiber #$id destroyed"), 'onTaskStart' => fn($taskId) => logger()->debug("Task $taskId started"), 'onTaskComplete' => fn($taskId, $result) => logger()->debug("Task $taskId completed"), 'onTaskFail' => fn($taskId, $error) => logger()->error("Task $taskId failed: {$error->getMessage()}") ]); // 执行单个任务 $singleResult = $pool->run(fn() => file_get_contents('https://api.example.com')); // 并行执行多个任务 $results = $pool->concurrent([ fn() => file_get_contents('https://api.a.com'), fn() => file_get_contents('https://api.b.com'), fn() => file_get_contents('https://api.c.com') ], 10); // 总超时10秒 // 获取池状态信息 $stats = $pool->getStats(); print_r($stats); /* 输出示例: Array( [active_fibers] => 12 [total_tasks] => 156 [completed_tasks] => 144 [failed_tasks] => 3 [average_execution_time] => 0.125 )*/ // 关闭池并释放资源 $pool->shutdown(); // 优雅关闭(等待当前任务完成) $pool->shutdown(true);
支持的操作类型
| 类型 | 是否支持 | 示例 |
|---|---|---|
| MySQL | ✅ | PDO::query() in fiber |
| PgSQL | ✅ | \PDO 或 \pg_connect() |
| Redis | ✅ | \Redis, \Predis\Client |
| HTTP | ✅ | file_get_contents, curl_exec, Guzzle |
| 文件 IO | ✅ | fopen, fwrite(需异步驱动) |
| Queue | ✅ | Channel / MessageQueue |
| Sleep | ✅ | usleep() 被拦截为非阻塞 |
💡 提示:建议配合异步 I/O 扩展如
swow或swoole使用以获得最佳性能。
✅ 3. 多框架适配方案
Kode/fibers提供了统一的API和自动检测机制,可以无缝集成到各种PHP框架中:
统一配置结构 (config/fibers.php)
return [ // 默认纤程池配置 'default_pool' => [ 'size' => env('FIBER_POOL_SIZE', CpuInfo::getRecommendedPoolSize(4)), 'timeout' => 30, 'max_retries' => 3, 'retry_delay' => 0.5, 'context' => ['user_id' => null] ], // 通信通道配置 'channels' => [ 'orders' => ['buffer_size' => 100], 'logs' => ['buffer_size' => 50] ], // 功能开关 'features' => [ 'auto_suspend_io' => true, 'enable_monitoring' => true, 'strict_destruct_check' => env('APP_ENV') === 'production' ], // 环境检测配置 'environment' => [ 'check_disabled_functions' => true, 'required_extensions' => ['curl', 'mbstring'], 'warn_only' => env('APP_ENV') !== 'production' ], 'framework' => [ 'name' => env('APP_FRAMEWORK', 'laravel'), 'service_provider' => true, 'provider_class' => env('FIBER_PROVIDER_CLASS', 'Kode\\Fibers\\Providers\\LaravelServiceProvider'), ], ];
框架集成示例
Laravel 集成:
// 在 app/Providers/AppServiceProvider.php 中注册 public function register() { $this->app->singleton(FiberPool::class, function () { return new FiberPool(config('fibers.default_pool')); }); } // 使用 public function index(FiberPool $pool) { $results = $pool->concurrent([...]); return response()->json($results); }
Symfony 集成:
# config/packages/fibers.yaml kode_fibers: default_pool: size: 64 timeout: 30
// 在控制器中使用 public function index(FiberPool $pool) { $results = $pool->concurrent([...]); return $this->json($results); }
Yii3 框架集成:
// 在 config/common.php 中配置 return [ 'components' => [ 'fibers' => [ 'class' => \Kode\Fibers\Core\FiberManager::class, 'poolConfig' => $params['fibers.default_pool'] ] ] ];
ThinkPHP8 框架集成:
// 在 config/service.php 中注册 return [ 'fibers' => \Kode\Fibers\Providers\ThinkPHPService::class ];
原生 PHP 项目:
// 创建配置文件 $config = require_once 'config/fibers.php'; // 初始化纤程池 $pool = new FiberPool($config['default_pool']); // 使用纤程池 $results = $pool->concurrent([...]);
✅ 4. PHP 8.1 原生注解 + IDE 可识别设计
Kode/fibers充分利用PHP 8.1的原生注解功能,提供更好的IDE支持和类型安全:
使用 Attribute 实现元数据标记
use Kode\Fibers\Attributes\FiberSafe; use Kode\Fibers\Attributes\Timeout; use Kode\Fibers\Attributes\ChannelListener; #[FiberSafe] // 表示该方法可在纤程中安全调用 class ApiService { #[Timeout(10)] // 设置10秒超时 public function fetchUser(int $id): array { return json_decode(file_get_contents("https://api.com/users/$id"), true); } #[ChannelListener('order.created')] // 监听通道事件 public function onOrderCreated(array $data): void { // 处理订单创建事件 } }
PHPDoc 辅助 IDE 提示
/** * @method static mixed run(callable $task, float $timeout = null) * @method static FiberPool pool(array $options = []) * @method static Channel channel(string $name, int $buffer = 0) * @method static array concurrent(array $tasks, float $timeout = null) */ class Fiber {}
✅ 在 PhpStorm / VSCode + Intelephense 中均可获得完整补全!
自动类型推断与验证
Kode/fibers通过PHP 8.1的类型系统和注解,可以在开发阶段捕获潜在问题:
#[FiberSafe] function processUserData(User $user): array { // 函数逻辑... return ['id' => $user->id, 'name' => $user->name]; } // IDE会自动提示类型错误 Fiber::run(fn() => processUserData(null)); // 提示:参数1应为User类型
✅ 5. 通信机制:Channel 与 Event Bus
Kode/fibers提供了强大的纤程间通信机制,包括Channel(类似Go Channel)和Event Bus(发布/订阅模式):
创建通信通道(类似 Go Channel)
use Kode\Fibers\Channel\Channel; // 创建带缓冲区的通道(缓冲区大小为10) $ch = Channel::make('download-results', 10); // 或者使用辅助函数 $ch = fiber_channel('download-results', 10); // 生产者:向通道发送数据 Fiber::run(function () use ($ch) { foreach ([1, 2, 3] as $i) { $ch->push("Data $i"); usleep(100000); // 模拟异步操作 } $ch->close(); // 关闭通道,防止内存泄漏 }); // 消费者:从通道接收数据 while ($msg = $ch->pop(1)) { // 超时1秒 echo $msg . "\n"; } // 非阻塞模式 if ($ch->canPush()) { $ch->push('non-blocking data'); } // 带超时的非阻塞模式 if ($ch->tryPush('data', 0.5)) { // 0.5秒超时 echo 'Data pushed successfully'; }
发布/订阅模型(Event Bus)
use Kode\Fibers\Event\EventBus; use Kode\Fibers\Event\Event; // 定义事件类 class PaymentSuccessEvent extends Event { public function __construct(public array $paymentData) { parent::__construct('payment.success', $paymentData); } } // 注册事件监听器 EventBus::on('payment.success', function (Event $event) { $paymentData = $event->getData(); // 处理支付成功事件 notifyAdmin($paymentData); }); // 注册一次性监听器(只触发一次) EventBus::once('user.login', fn($event) => logFirstLogin($event->getData())); // 触发事件 EventBus::fire(new PaymentSuccessEvent([ 'order_id' => 123, 'amount' => 99.99, 'user_id' => 456 ])); // 移除事件监听器 EventBus::off('payment.success'); // 带有优先级的事件监听器 EventBus::on('system.shutdown', fn() => saveCriticalData(), 100); // 高优先级 EventBus::on('system.shutdown', fn() => cleanTempFiles(), 50); // 中优先级 EventBus::on('system.shutdown', fn() => logShutdown(), 10); // 低优先级
✅ 6. 禁用函数检测与环境诊断
Kode/fibers提供了全面的环境检测功能,可以识别潜在的兼容性问题:
检测黑名单函数
use Kode\Fibers\Support\Environment; // 运行全面的环境诊断 $issues = Environment::diagnose(); // 打印诊断结果 foreach ($issues as $issue) { echo "⚠️ {$issue['type']}: {$issue['message']}" . PHP_EOL; if (isset($issue['recommendation'])) { echo "💡 建议: {$issue['recommendation']}" . PHP_EOL; } } // 检查特定功能是否可用 if (Environment::hasDisabledFunctions(['exec', 'shell_exec'])) { // 提供替代方案 } // 检查必要的扩展是否安装 if (!Environment::hasRequiredExtensions(['curl', 'mbstring'])) { // 提示用户安装扩展 } // 检查是否可以在析构函数中安全使用Fiber if (Environment::supportsDestructInFiber()) { // PHP 8.4+ 行为 } else { // PHP < 8.4 兼容行为 }
常见禁用函数影响
| 函数 | 影响 | 建议 |
|---|---|---|
pcntl_* |
多进程冲突 | 关闭或隔离使用 |
set_time_limit |
可能中断 suspend | 使用 @ini_set 局部关闭 |
exec, shell_exec |
阻塞调用 | 替换为异步执行器 |
sleep, usleep |
阻塞主线程 | 已被 Fiber 内部重写为非阻塞 |
exit, die |
终止整个进程 | 避免使用,改用抛出异常 |
header, session_start |
可能破坏上下文 | 在主纤程中使用,或使用上下文管理器 |
✅ 我们会在初始化时尝试模拟这些函数的安全替代品。
📘 使用场景指南
🧩 何时使用 Fiber::run()?(一键协程)
适用于小规模并发请求,无需长期维护状态:
// 场景:并行获取多个 API 数据 $data = [ Fiber::run(fn() => httpGet('/users')), Fiber::run(fn() => httpGet('/posts')), Fiber::run(fn() => cacheGet('stats')) ]; // 场景:非阻塞读取文件 $content = Fiber::run(fn() => { $handle = fopen('large-file.txt', 'r'); $content = fread($handle, 1024 * 1024); fclose($handle); return $content; }); // 场景:异步等待外部服务响应 $response = Fiber::run(function() { $client = new HttpClient(); return $client->get('https://api.example.com'); }, 5); // 5秒超时
✅ 优点:零配置、无副作用
❌ 缺点:无法复用、无资源管控
🏗️ 何时使用 FiberPool?(生产推荐)
适用于高并发服务,如微服务网关、批量任务处理器:
// 场景1:API网关并行请求聚合 $pool = new FiberPool(['size' => 128]); $userRequests = array_map(fn($userId) => fn() => getUserData($userId), $userIds); $userData = $pool->concurrent($userRequests, 10); // 场景2:批量数据处理 $pool = new FiberPool([ 'size' => CpuInfo::getRecommendedPoolSize(4), 'max_exec_time' => 60, 'max_retries' => 3 ]); $results = $pool->concurrent(array_map(function($item) { return function() use ($item) { // 处理单个项目 return processItem($item); }; }, $batchItems)); // 场景3:定时任务执行器 $pool = new FiberPool(['name' => 'scheduler']); foreach ($tasks as $task) { $pool->run(function() use ($task) { $delay = $task->getDelay(); usleep($delay * 1000000); // 非阻塞睡眠 return $task->execute(); }); }
✅ 支持:
- 资源复用
- 错误重试
- 超时熔断
- 监控埋点
- 并发控制
🛠️ CLI 命令列表
Kode/fibers提供了一系列命令行工具,方便开发和管理:
# 初始化配置文件 php vendor/bin/fibers init # 指定框架类型初始化 php vendor/bin/fibers init --framework=laravel # 运行环境诊断 php vendor/bin/fibers diagnose # 查看当前运行的 Fiber ID 和状态 php vendor/bin/fibers status # 清理僵尸纤程 php vendor/bin/fibers cleanup # 性能压测(测试最大吞吐) php vendor/bin/fibers benchmark --concurrency=1000 # 查看帮助信息 php vendor/bin/fibers --help # 查看特定命令帮助 php vendor/bin/fibers diagnose --help
常用命令选项
# 初始化命令选项 --framework=FRAMEWORK 指定框架类型 (laravel, symfony, yii3, thinkphp8, native) --config=PATH 指定配置文件路径 --quiet, -q 静默模式 # 诊断命令选项 --format=FORMAT 输出格式 (text, json, markdown) --strict 严格模式,遇到问题立即退出 --detailed 详细输出 # 基准测试命令选项 --concurrency=N 并发数量 --duration=SECONDS 测试持续时间 --task=TYPE 测试任务类型 (io, cpu, mixed)
🔄 自动降级机制
kode/fibers 支持智能降级,根据已安装的依赖包自动选择最佳实现:
安装方式
# 最小安装(仅核心功能,使用原生实现) composer require kode/fibers # 完整安装(推荐,使用增强包实现) composer require kode/fibers kode/http-client guzzlehttp/psr7 kode/console kode/facade
功能对比
| 功能 | 最小安装 | 完整安装 |
|---|---|---|
| 核心 Fiber API | ✅ 完整支持 | ✅ 完整支持 |
| HTTP 客户端 | 原生 cURL | kode/http-client + PSR-7 |
| 命令行工具 | 基础输出 | kode/console 完整功能 |
| Facade 模式 | 基础静态代理 | kode/facade 完整功能 |
| 上下文传递 | ✅ 完整支持 | ✅ 完整支持 |
| 连接池 | ✅ 完整支持 | ✅ 完整支持 |
使用示例
use Kode\Fibers\HttpClient\HttpClient; // 自动选择最佳驱动 $client = new HttpClient(); $response = $client->get('https://api.example.com'); // 检查当前使用的驱动 if ($client->isUsingNativeDriver()) { echo "使用原生 cURL 驱动(无需额外依赖)"; } else { echo "使用 kode/http-client 包驱动(功能更完整)"; }
🧩 功能实现状态
✅ 已实现功能
- 上下文传递机制:使用
kode/context实现纤程上下文变量传递,支持跨设备 - 分布式 Fiber 调度:
DistributedScheduler支持跨机器任务调度 - 性能监控面板:
FiberProfiler和WebUI可视化监控(已合并) - 生态系统集成:
RuntimeBridge支持 Swoole/OpenSwoole/Swow/Workerman 桥接 - ORM 适配层:
EloquentAdapter和FixturesAdapter支持 Eloquent、Doctrine 等 - 断路器模式:
CircuitBreaker实现自动熔断和恢复机制 - 负载均衡:
RoundRobinBalancer智能任务分发算法 - 热重载支持:
HotReloader实现不中断服务更新代码 - 可视化管理界面:
WebUI提供 Web UI 管理纤程池和任务 - 连接池支持:
ConnectionPool支持 PDO、Redis 连接池管理 - 协程调试器:
FiberDebugger支持断点、日志、状态监控 - PHP 8.5 特性支持:
Php85Features自动适配新特性 - 多框架支持:Laravel、Lumen、Symfony、Hyperf、Webman、Yii3、ThinkPHP8
详细开发计划见 路线图文档。
📚 参考资料
- PHP Fibers RFC - PHP 官方纤程规范
- PHP 8.1 Fibers 文档 - 官方 API 文档
- Swoole Coroutine Docs - Swoole 协程文档
- Swow Fiber Guide - Swow 框架纤程指南
- Go Channels in PHP - PHP 中的 Go 通道实现
- RevoltPHP Event Loop - 事件循环实现参考
🙌 贡献者
欢迎提交 PR!请确保:
- ✅ 单元测试覆盖率 ≥ 90%
- ✅ 符合 PSR-12 编码规范
- ✅ 更新文档与 CHANGELOG
Maintained by Byte Team - Kode PHP Lab
🌐 https://github.com/Kode-php/fibers