kode/parallel

适用于 PHP 8.1+ 的高性能并行并发扩展库,支持 Runtime、Task、Future、Channel、Events、Fiber、Sync、Cluster 等完整功能,提供跨机器分布式任务执行能力

Maintainers

Package info

github.com/kodephp/parallel

pkg:composer/kode/parallel

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

1.3.1 2026-03-19 15:42 UTC

This package is auto-updated.

Last update: 2026-03-19 15:44:46 UTC


README

高性能 PHP 并行并发扩展库,基于 PHP ext-parallel 实现,为 PHP 8.1+ 提供简洁、健壮的并行编程接口。支持跨机器分布式任务执行

PHP Version License Package Version

目录

简介

kode/parallel 是适用于 PHP 8.1+ 的高性能并行并发扩展库。该库基于 PHP 官方的 ext-parallel 扩展构建,提供了更高级别的面向对象 API、完整的中文文档支持,以及 PHP 8.5 新特性的前向兼容实现。

架构模型:多线程 + 分布式

┌─────────────────────────────────────────────────────────────────┐
│                      本地并行 (多线程)                            │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  Runtime (PHP 解释器线程 #1)  ──── Channel ──── 线程 #N │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                      分布式集群 (跨机器)                          │
│                                                                 │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐    │
│  │   节点 #1    │◄────►│   节点 #2    │◄────►│   节点 #N    │    │
│  │ tcp://host1 │      │ tcp://host2 │      │ tcp://hostN │    │
│  │ :8001       │      │ :8002       │      │ :800N       │    │
│  └─────────────┘      └─────────────┘      └─────────────┘    │
│                                                                 │
│              ClusterManager (主节点调度)                          │
└─────────────────────────────────────────────────────────────────┘

核心能力

层级 能力 组件
本地并行 多线程执行 Runtime, Task, Future, Channel, Events
同步原语 互斥/信号量 Mutex, Semaphore, Cond, Barrier
协程支持 Fiber 协程 Fiber, FiberManager
跨机器 分布式集群 Node, TcpNodeTransport, ClusterManager, ClusterServer
HTTP 并行 并行请求 CurlMulti

功能特性

组件 说明
Runtime PHP 解释器线程管理,本地并行执行的基础
Task 并行任务闭包封装
Future 异步任务返回值访问
Channel Task 间双向通信,支持有/无界限通道
Events 事件循环驱动
Fiber PHP Fiber 协程封装(PHP 8.1+)
Sync 同步原语:Mutex、Semaphore、Cond、Barrier
Pipe 进程间通信管道
CurlMulti 并行 HTTP 请求封装
Node 集群节点表示(跨机器)
TcpNodeTransport TCP 节点传输层(跨机器)
ClusterManager 集群管理器,支持主节点选举、负载均衡
ClusterServer 集群服务器(任务执行器)
Util PHP 8.5 兼容工具:管道操作符、Clone With 等
Installation 自动检测 ext-parallel 并提供安装提示

系统要求

要求 说明
PHP 版本 >= 8.1
必需扩展 ext-parallel
必需包 kode/fibers, kode/context, kode/facade
可选扩展 ext-curl (用于 CurlMulti)

PHP 版本适配

PHP 版本 支持状态 特性
8.1 ✅ 完全支持 基础 Fiber, readonly 属性
8.2 ✅ 完全支持 随机字节改进
8.3 ✅ 完全支持 改进的类型系统
8.4 ✅ 完全支持 改进的性能
8.5 ✅ 最佳支持 管道操作符、Clone With、持久化 cURL

安装

1. 安装 ext-parallel 扩展(必需)

# Linux/macOS via PECL
pecl install parallel

# 或者从源码编译
git clone https://github.com/krakjoe/parallel.git
cd parallel
phpize && ./configure && make && sudo make install

php.ini 中添加:

extension=parallel.so

安装后验证:

php -r "echo extension_loaded('parallel') ? 'OK' : '请先安装 ext-parallel';"
composer run-script check

2. 安装 Composer 包

composer require kode/parallel

这将自动安装所有依赖:kode/fibers, kode/context, kode/facade

快速开始

1. 本地并行

<?php
require_once __DIR__ . '/vendor/autoload.php';

use Kode\Parallel\Runtime\Runtime;

$runtime = new Runtime();

// 并行执行 5 个任务
$futures = [];
for ($i = 0; $i < 5; $i++) {
    $futures[] = $runtime->run(
        fn($args) => $args['n'] * $args['n'],
        ['n' => $i + 1]
    );
}

foreach ($futures as $f) {
    echo "结果: " . $f->get() . "\n";
}

$runtime->close();

2. Channel 通信

<?php
use Kode\Parallel\Runtime\Runtime;
use Kode\Parallel\Channel\Channel;

$runtime = new Runtime();
$ch = Channel::make('data');

$runtime->run(fn() => $ch->send(range(1, 100)));
$runtime->run(fn() => $ch->send(range(101, 200)));

$sum = fn() => array_sum($ch->recv()) + array_sum($ch->recv());
echo "总和: " . $runtime->run($sum)->get() . "\n";

$runtime->close();

3. 分布式集群(跨机器)

<?php
// 主节点 (master.php)
use Kode\Parallel\Cluster\ClusterManager;
use Kode\Parallel\Network\Node;
use Kode\Parallel\Runtime\Runtime;

$manager = new ClusterManager(new Runtime());

$manager->setLocalNode(new Node('192.168.1.100', 9000, 'master'));
$manager->addNode(new Node('192.168.1.101', 8001, 'worker-1'));
$manager->addNode(new Node('192.168.1.102', 8001, 'worker-2'));

$result = $manager->submitTask(
    'my-task',
    fn($args) => 'Hello from ' . gethostname(),
    []
);

echo $result['result'] . "\n";
<?php
// 工作节点 (worker.php)
use Kode\Parallel\Cluster\ClusterServer;
use Kode\Parallel\Network\Node;
use Kode\Parallel\Runtime\Runtime;

$node = new Node('0.0.0.0', 8001, $argv[1] ?? 'worker');
$server = new ClusterServer($node, new Runtime());

echo "启动: {$node->getAddress()}\n";
$server->start();

4. Fiber 协程

<?php
use Kode\Parallel\Fiber\Fiber;

$fiber = new Fiber(function() {
    $value = Fiber::suspend('暂停');
    return "收到: $value";
});

echo $fiber->start() . "\n";       // 暂停
echo $fiber->resume('恢复数据') . "\n"; // 收到: 恢复数据

核心组件详解

Runtime - 运行时

$runtime = new Runtime();
$runtime = new Runtime('/path/to/bootstrap.php');

$future = $runtime->run($task, $args);
$future = $runtime->run(fn($a) => $a['x'] + $a['y'], ['x' => 1, 'y' => 2]);

$runtime->isRunning();
$runtime->getBootstrap();
$runtime->close();

Channel - 通道

// 无界限通道
$ch = Channel::make('name');

// 有界限通道(容量为10)
$ch = Channel::bounded(10);

// 发送/接收
$ch->send($data);
$data = $ch->recv();
$ch->close();
$ch->isEmpty();

Events - 事件循环

$events = new Events();
$events->attachChannel('ch1', $channel);
$events->attachFuture('f1', $future);

foreach ($events as $event) {
    // $event['source'] = 'ch1' | 'f1'
    // $event['data'] = received data
}

Sync - 同步原语

// Mutex - 互斥锁
$mutex = new Mutex();
$mutex->withLock(fn() => $shared++);

// Semaphore - 信号量(限流,最多3个并发)
$sem = new Semaphore(3);
$sem->withResource(fn() => process());

// Cond - 条件变量
$cond = new Cond();
$cond->wait($mutex);
$cond->broadcast();

// Barrier - 屏障(等待 N 个任务)
$barrier = new Barrier(3);
$barrier->wait();
$runtime = new Runtime();
$future = $runtime->run($task, $args);
$runtime->close();

Task - 任务

Task 中禁止使用:yield、引用传递、类声明、命名函数。

Future - 异步结果

$future->get();        // 阻塞获取
$future->getOrNull();   // 非阻塞获取
$future->wait(1000);    // 等待1秒
$future->cancel();      // 取消任务

Channel - 通道

// 无界限通道
$ch = Channel::make('name');

// 有界限通道(容量为10)
$ch = Channel::bounded(10);

// 发送/接收
$ch->send($data);
$data = $ch->recv();

Events - 事件循环

$events = new Events();
$events->attachChannel('ch1', $channel);
$events->attachFuture('f1', $future);

foreach ($events as $event) {
    // 处理事件
}

分布式集群

支持跨机器分布式任务执行,包含节点管理、主节点选举、负载均衡等功能。

ClusterManager - 集群管理器

use Kode\Parallel\Cluster\ClusterManager;
use Kode\Parallel\Network\Node;

$manager = new ClusterManager(new Runtime());
$manager->setLocalNode(new Node('192.168.1.100', 9000, 'master'));

$manager->addNode(new Node('192.168.1.101', 8001, 'worker-1'));
$manager->addNode(new Node('192.168.1.102', 8001, 'worker-2'));

$result = $manager->submitTask('task-1', fn($args) => $args['x'] * 2, ['x' => 21]);
echo "结果: " . $result['result'] . "\n";

$manager->broadcast(fn() => gethostname());
$manager->healthCheck();

ClusterServer - 集群服务器

use Kode\Parallel\Cluster\ClusterServer;
use Kode\Parallel\Network\Node;

$node = new Node('0.0.0.0', 8001, 'worker-1');
$server = new ClusterServer($node, new Runtime());
$server->start();

详见 CLUSTER.md

PHP 8.5 新特性

管道操作符(Pipe Operator)

<?php
use function Kode\Parallel\Util\pipe;

// 类似 Unix 的管道操作
$result = pipe(
    '  Hello World  ',
    'trim',
    'strtoupper',
    fn($s) => str_replace('WORLD', 'PHP', $s)
);

echo $result; // 输出: HELLO PHP

Clone With

<?php
use function Kode\Parallel\Util\clone_with;

class Color {
    public function __construct(
        public int $red,
        public int $green,
        public int $blue,
        public int $alpha = 255
    ) {}
}

$blue = new Color(79, 91, 147);
$transparent = clone_with($blue, ['alpha' => 128]);

持久化 cURL(PHP 8.5)

// CurlMulti 自动支持连接复用
$curl = new CurlMulti();
$curl->get('https://api.example.com/1');
$curl->get('https://api.example.com/2');
$curl->get('https://api.example.com/3');
$results = $curl->execute(); // 自动复用连接

Fiber 协程

基本用法

<?php
use Kode\Parallel\Fiber\Fiber;
use Kode\Parallel\Fiber\FiberManager;

// 单个 Fiber
$fiber = new Fiber(function($input) {
    $step1 = Fiber::suspend('第一步完成');
    return "最终结果: {$step1}";
});

$fiber->start();
$result = $fiber->resume('恢复数据');

// Fiber 管理器
$manager = new FiberManager();
$manager->spawn('task1', fn() => compute1());
$manager->spawn('task2', fn() => compute2());
$manager->startAll();
$results = $manager->collect();

详见 FIBER.md

高级用法

1. Sync 同步原语

<?php
use Kode\Parallel\Sync\Mutex;
use Kode\Parallel\Sync\Semaphore;

// Mutex - 互斥锁
$mutex = new Mutex();
$mutex->withLock(function() {
    // 临界区代码
});

// Semaphore - 信号量(限流)
$sem = new Semaphore(3); // 最多3个并发
$sem->withResource(function() {
    // 限流执行
});

2. Pipe 管道

<?php
use Kode\Parallel\Pipe\Pipe;

$pipe = Pipe::make('my_pipe');
$pipe->write('Hello');
$data = $pipe->read();

详见 PIPE.md

3. CurlMulti 并行请求

<?php
use Kode\Parallel\Curl\CurlMulti;

$curl = new CurlMulti();
$curl->get('https://api.example.com/users');
$curl->post('https://api.example.com/posts', ['title' => 'Hello']);
$results = $curl->execute();

详见 CURL.md

4. 生产者消费者模式

<?php
use Kode\Parallel\Runtime\Runtime;
use Kode\Parallel\Channel\Channel;

$runtime = new Runtime();
$channel = Channel::bounded(10);

$producer = fn() => produce($channel);
$consumer = fn() => consume($channel);

$runtime->run($producer);
$runtime->run($consumer);

详见 ADVANCED_USAGE.md

性能压测

========================================
     Kode/Parallel 性能压测报告
========================================

Task 创建: 1.52 μs/个
简单任务: 7,958 tasks/sec
Channel通信: 22,000+ ops/sec
Mutex操作: 4,200,000+ ops/sec

并行加速比:
  4任务: 3.21x (效率 80.3%)
  8任务: 5.35x (效率 66.9%)
 10任务: 5.63x (效率 56.3%)

详见 BENCHMARK.md

最佳实践

任务设计

// ✅ 推荐:简单、单一职责
$task = new Task(fn($args) => processData($args['data']));

// ❌ 避免:复杂业务逻辑
$task = new Task(function($args) {
    // 大量代码...
});

错误处理

try {
    $runtime = new Runtime('/invalid/path.php');
} catch (ParallelException $e) {
    echo "错误: " . $e->getMessage() . "\n";
}

资源管理

$runtime = new Runtime();
try {
    $result = $runtime->run($task)->get();
} finally {
    $runtime->close();
}

常见问题

Q: Task 中不能使用 yield?

// ❌ 不行
$task = new Task(function() { yield 1; });

// ✅ 改用返回数组
$task = new Task(function() { return [1, 2, 3]; });

Q: 如何传递大数据?

// ✅ 使用 Channel
$channel = Channel::make();
$runtime->run(fn($args) => $args['ch']->send($data), ['ch' => $channel]);

Q: Fiber 和 Thread 的区别?

特性 Fiber Thread
调度 用户态 内核态
切换开销 微秒级 毫秒级
共享内存 不共享 共享

文档索引

文档 内容
README 项目概述和快速开始
DEVELOPMENT.md 开发指南和 API 参考
CLUSTER.md 分布式集群详解(跨机器)
FIBER.md Fiber 协程详解
PIPE.md Pipe 管道详解
CURL.md CurlMulti 并行请求
ADVANCED_USAGE.md 高级用法和案例
PTHREADS_COMPARISON.md 与 pthreads 对比
PCNTL_COMPARISON.md 与 pcntl 对比
BENCHMARK.md 完整性能压测数据

许可证

Apache-2.0

相关链接