reactphp-x / tunnel-stream
v1.0.0
2025-03-30 11:15 UTC
Requires
- laravel/serializable-closure: ^2.0
- ramsey/uuid: ^4.7
- react/promise: ^3.2
- react/promise-timer: ^1.11
- react/stream: ^1.4
- rybakit/msgpack: ^0.9.1
Requires (Dev)
- react/child-process: ^0.6.6
This package is auto-updated.
Last update: 2025-03-30 11:18:20 UTC
README
一个基于 ReactPHP 的流式隧道通信库,支持在不同进程间传输数据流和执行闭包函数。
特性
- 支持进程间双向数据流传输
- 支持序列化闭包函数在不同进程间执行
- 基于 MessagePack 的高效二进制协议
- 内置心跳检测机制
- 完整的错误处理和事件通知
- 支持异步 Promise 操作
安装
composer require reactphp-x/tunnel-stream -vvv
基本用法
创建隧道流
use ReactphpX\TunnelStream\TunnelStream; use React\Stream\ThroughStream; // 创建读写流 $read = new ThroughStream(); $write = new ThroughStream(); // 初始化隧道流 $tunnelStream = new TunnelStream($read, $write);
执行远程闭包
// 在远程进程执行闭包函数 $stream = $tunnelStream->run(function () { return file_get_contents('example.txt'); }); // 处理返回的数据流 $stream->on('data', function ($data) { echo $data; }); $stream->on('end', function () { echo "Stream ended\n"; }); $stream->on('error', function (Exception $e) { echo "Error: " . $e->getMessage() . "\n"; });
心跳检测
$tunnelStream->ping(3)->then( function () { echo "Ping successful\n"; }, function (\Exception $e) { echo "Ping failed: " . $e->getMessage() . "\n"; } );
进阶示例
子进程通信
在父子进程间建立隧道流通信是一个常见的使用场景。以下是一个完整的示例:
主进程 (process.php)
use ReactphpX\TunnelStream\TunnelStream; use React\ChildProcess\Process; use React\EventLoop\Loop; // 创建子进程 $process = new Process(sprintf( 'exec php %s/child_process_init.php', __DIR__ )); $process->start(); // 创建隧道流 $tunnelStream = new TunnelStream($process->stdout, $process->stdin); // 监听子进程输出 $process->stdout->on('data', function ($data) { echo "STDOUT: " . $data . PHP_EOL; }); $process->stderr->on('data', function ($data) { echo "STDERR: " . $data . PHP_EOL; }); $process->on('exit', function ($exitCode) { echo "Process exited with code $exitCode\n"; }); // 执行文件读取操作 $fileStream = $tunnelStream->run(function () { return file_get_contents(__DIR__ . '/composer.json'); }); $fileStream->on('data', function ($data) { echo "File Stream: " . $data . PHP_EOL; }); $fileStream->on('error', function ($error) { echo "File Stream Error: " . $error->getMessage() . PHP_EOL; }); // 执行异步延迟操作 $promiseStream = $tunnelStream->run(function () { return \React\Promise\Timer\sleep(2)->then(function () { return 'Hello World'; }); }); $promiseStream->on('data', function ($data) { echo "Promise Stream: " . $data . PHP_EOL; }); // 持续数据流示例 $alwayStream = $tunnelStream->run(function ($stream) { $i = 0; $timer = Loop::addPeriodicTimer(1, function () use ($stream, &$i) { $stream->write('Hello World' . $i . PHP_EOL); $i++; }); $stream->on('close', function () use ($timer) { Loop::cancelTimer($timer); echo "Always Stream Close\n"; }); return $stream; }); $alwayStream->on('data', function ($data) { echo "Always Stream: " . $data . PHP_EOL; }); // 5秒后关闭持续数据流 Loop::addTimer(5, function () use ($alwayStream) { $alwayStream->close(); });
子进程 (child_process_init.php)
use React\Stream\WritableResourceStream; use React\Stream\ReadableResourceStream; use React\EventLoop\Loop; use ReactphpX\TunnelStream\TunnelStream; use React\Stream\ThroughStream; // 创建读写流 $read = new ThroughStream(); $write = new ThroughStream(); // 初始化子进程隧道流 $tunnelStream = new TunnelStream($read, $write, true); // 处理标准输出 $STDOUT = new WritableResourceStream(STDOUT); $write->on('data', function ($buffer) use ($STDOUT) { $STDOUT->write($buffer); }); $write->on('close', function () use ($tunnelStream) { $tunnelStream->close(); }); // 处理标准输入 $STDIN = new ReadableResourceStream(STDIN); $STDIN->on('data', function ($chunk) use ($read) { $read->write($chunk); }); $STDIN->on('close', function () use ($tunnelStream) { $tunnelStream->close(); });
这个示例展示了:
- 如何在父子进程间建立双向通信
- 如何在子进程中执行文件操作
- 如何处理异步 Promise 操作
- 如何实现持续的数据流传输
- 如何优雅地关闭数据流
- 如何处理错误和异常
最佳实践
-
错误处理
- 始终监听 error 事件
- 在关键操作处添加错误处理逻辑
- 使用 try-catch 包装可能抛出异常的代码
-
资源管理
- 及时关闭不再使用的流
- 使用 close 事件清理相关资源
- 避免内存泄漏
-
性能优化
- 合理使用缓冲区大小
- 避免过大的数据包
- 适时使用心跳检测保持连接
API 文档
TunnelStream 类
构造函数
public function __construct( Stream\ReadableStreamInterface $readStream, Stream\WritableStreamInterface $writStream, bool $canCallback = false )
方法
-
run(callable $closure): Stream\DuplexStreamInterface
执行远程闭包函数 -
ping(int $timeout = 3): PromiseInterface
发送心跳包并等待响应 -
close(): void
关闭所有流
依赖
- react/stream: ^1.4
- laravel/serializable-closure: ^2.0
- ramsey/uuid: ^4.7
- rybakit/msgpack: ^0.9.1
- react/promise: ^3.2
- react/promise-timer: ^1.11
许可证
MIT License