reactphp-x / register-center
Installs: 5
Dependents: 3
Suggesters: 0
Security: 0
Stars: 0
Watchers: 0
Forks: 0
Open Issues: 0
pkg:composer/reactphp-x/register-center
Requires
- psr/log: ^3.0
- react/socket: ^1.16
- reactphp-x/tunnel-stream: ^1.0
Requires (Dev)
- monolog/monolog: ^3.0
- phpunit/phpunit: ^11.5
- react/async: ^4.3
Suggests
- monolog/monolog: Provides a powerful logging implementation that can be used with this package
This package is auto-updated.
Last update: 2026-02-05 14:51:05 UTC
README
基于 ReactPHP 构建的高性能分布式服务注册与发现中心,支持服务注册、发现、动态节点管理以及主节点与注册中心之间的实时通信。
特性
- 🚀 高性能异步: 基于 ReactPHP 事件循环,支持高并发
- 🔄 分布式架构: 支持多个注册中心和主节点
- 🎯 服务注册与发现: 动态服务注册、执行和管理
- 🔗 实时通信: 节点间双向实时通信
- 🔄 自动重连: 智能重连机制,断线自动恢复
- ⚙️ 可配置重试: 灵活的重试策略配置
- 📝 完善日志: 基于 Monolog 的全方位日志支持
- 🔐 身份验证: 基于令牌的安全认证机制
- 🎲 动态服务执行: 支持远程服务调用和执行
- 📡 节点管理: 动态添加、移除注册中心节点
系统要求
- PHP 8.0 或更高版本
- Composer
安装
composer require reactphp-x/register-center
快速开始
1. 启动注册中心
创建一个注册中心服务器,监听端口 8010:
<?php
require 'vendor/autoload.php';
use React\EventLoop\Loop;
use ReactphpX\RegisterCenter\Register;
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\LineFormatter;
// 创建事件循环
$loop = Loop::get();
// 创建日志器
$logger = new Logger('registration-center');
$handler = new StreamHandler('php://stdout', Level::Debug);
$handler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
$logger->pushHandler($handler);
// 创建并启动注册中心
$center = new Register(8010, $loop, $logger);
$center->start();
// 运行事件循环
$loop->run();
2. 创建主节点
创建一个主节点并注册服务:
<?php
require 'vendor/autoload.php';
use React\EventLoop\Loop;
use ReactphpX\RegisterCenter\Master;
use ReactphpX\RegisterCenter\ServiceRegistry;
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\LineFormatter;
// 注册服务
ServiceRegistry::register('hello-world', new class {
public function sayHello() {
$date = date('Y-m-d H:i:s');
return "Hello, world! $date\n";
}
public function sayHello2($name) {
$date = date('Y-m-d H:i:s');
return [
'date' => $date,
'name' => $name,
'version' => '1.0.0',
'description' => 'Hello, world!',
'author' => 'ReactPHP X',
'email' => 'support@reactphp-x.com',
'url' => 'https://github.com/reactphp-x',
'license' => 'MIT',
];
}
});
// 创建日志器
$logger = new Logger('master');
$handler = new StreamHandler('php://stdout', Level::Debug);
$handler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
$logger->pushHandler($handler);
// 创建主节点
$master = new Master(
retryAttempts: PHP_INT_MAX, // 无限重试
retryDelay: 3.0, // 重试间隔3秒
reconnectOnClose: true, // 断开时自动重连
logger: $logger
);
// 设置事件处理器
$master->on('error', function (\Exception $e, $context = []) {
echo "Error: " . $e->getMessage() . "\n";
echo "Context: " . json_encode($context) . "\n";
});
$master->on('connect', function ($tunnelStream) use ($master) {
// 身份验证
$tunnelStream->write([
'cmd' => 'auth',
'token' => 'register-center-token-2024'
]);
// 监听来自注册中心的命令
$tunnelStream->on('cmd', function ($cmd, $message) use ($tunnelStream, $master) {
echo "Received command: $cmd\n";
echo "Message: " . json_encode($message) . "\n";
if ($cmd === 'register') {
// 连接到新的注册中心
$registers = $message['registers'];
foreach ($registers as $register) {
$master->connectViaConnector($register['host'], $register['port']);
}
} elseif ($cmd === 'remove') {
// 移除注册中心连接
$registers = $message['registers'];
foreach ($registers as $register) {
$master->removeConnection($register['host'], $register['port']);
}
}
});
});
$master->on('close', function ($id, $url) {
echo "Disconnected from $url (ID: $id)\n";
});
// 连接到注册中心
$master->connectViaConnector('127.0.0.1', 8010);
// 运行事件循环
Loop::get()->run();
3. 运行示例
# 终端 1: 启动主注册中心
php examples/register.php
# 终端 2: 启动从注册中心
php examples/register1.php
# 终端 3: 启动主节点
php examples/master.php
高级特性
服务注册与本地调用
1. 服务注册
use ReactphpX\RegisterCenter\ServiceRegistry;
// 注册简单服务
ServiceRegistry::register('hello-service', new class {
public function sayHello() {
return "Hello, world! " . date('Y-m-d H:i:s');
}
public function greet($name, $title = 'Mr.') {
return "Hello, {$title} {$name}!";
}
});
// 注册业务服务
ServiceRegistry::register('user-service', new class {
private $users = [
1 => ['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com'],
2 => ['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com'],
];
public function getUser($id) {
return $this->users[$id] ?? null;
}
public function getAllUsers() {
return array_values($this->users);
}
public function createUser($name, $email) {
$id = max(array_keys($this->users)) + 1;
$this->users[$id] = [
'id' => $id,
'name' => $name,
'email' => $email
];
return $this->users[$id];
}
public function updateUser($id, $data) {
if (!isset($this->users[$id])) {
throw new \Exception("User not found");
}
$this->users[$id] = array_merge($this->users[$id], $data);
return $this->users[$id];
}
});
// 注册异步服务
ServiceRegistry::register('async-service', new class {
public function processTask($taskId, $data) {
// 模拟异步处理
$startTime = microtime(true);
// 处理业务逻辑
$result = array_map('strtoupper', $data);
$endTime = microtime(true);
return [
'taskId' => $taskId,
'result' => $result,
'processingTime' => $endTime - $startTime,
'timestamp' => time()
];
}
});
2. 本地服务调用
// 检查服务是否存在
if (ServiceRegistry::has('hello-service')) {
echo "Service registered successfully\n";
}
// 无参数调用
$result = ServiceRegistry::execute('hello-service', 'sayHello');
echo $result; // Hello, world! 2024-01-01 12:00:00
// 带参数调用
$result = ServiceRegistry::execute('hello-service', 'greet', [
'name' => 'John',
'title' => 'Dr.'
]);
echo $result; // Hello, Dr. John!
// 业务服务调用
$user = ServiceRegistry::execute('user-service', 'getUser', ['id' => 1]);
print_r($user);
$allUsers = ServiceRegistry::execute('user-service', 'getAllUsers');
print_r($allUsers);
// 错误处理
try {
$result = ServiceRegistry::execute('user-service', 'updateUser', [
'id' => 999,
'data' => ['name' => 'Updated Name']
]);
} catch (\Exception $e) {
echo "Service error: " . $e->getMessage() . "\n";
}
远程服务调用
1. 单次远程调用
// 在注册中心调用单个主节点上的服务
$masters = $center->getConnectedMasters();
if (!empty($masters)) {
$masterId = array_key_first($masters);
$stream = $center->runOnMaster($masterId, function ($stream) {
// 调用用户服务
$result = ServiceRegistry::execute('user-service', 'getAllUsers');
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use ($masterId) {
echo "Users from master {$masterId}: " . json_encode($data) . "\n";
});
}
2. 批量远程调用
// 在所有主节点上执行相同的服务
$loop->addPeriodicTimer(10, function () use ($center) {
$masters = $center->getConnectedMasters();
if (empty($masters)) {
echo "No masters connected\n";
return;
}
echo "Executing services on " . count($masters) . " masters\n";
// 批量执行多个服务方法
$streams = $center->runOnAllMasters(function ($stream) {
$results = [];
// 执行多个服务调用
$results['greeting'] = ServiceRegistry::execute('hello-service', 'sayHello');
$results['users'] = ServiceRegistry::execute('user-service', 'getAllUsers');
$results['task'] = ServiceRegistry::execute('async-service', 'processTask', [
'taskId' => uniqid(),
'data' => ['hello', 'world', 'reactphp']
]);
$stream->write($results);
$stream->end();
});
// 处理所有主节点的响应
foreach ($streams as $masterId => $stream) {
$stream->on('data', function ($data) use ($masterId) {
echo "\n=== Response from master {$masterId} ===\n";
if (is_array($data)) {
foreach ($data as $service => $result) {
echo "Service '{$service}': " . json_encode($result) . "\n";
}
} else {
echo "Raw response: {$data}\n";
}
});
$stream->on('error', function ($error) use ($masterId) {
echo "Error from master {$masterId}: {$error}\n";
});
$stream->on('close', function () use ($masterId) {
echo "Connection to master {$masterId} closed\n";
});
}
});
3. 异步服务调用模式
// 异步调用模式 - 任务分发
function distributeTask($center, $taskData) {
$masters = $center->getConnectedMasters();
$taskChunks = array_chunk($taskData, ceil(count($taskData) / count($masters)));
$results = [];
$completedTasks = 0;
$totalMasters = count($masters);
foreach ($masters as $masterId => $master) {
$chunk = array_shift($taskChunks);
if (!$chunk) continue;
$stream = $center->runOnMaster($masterId, function ($stream) use ($chunk) {
$result = ServiceRegistry::execute('async-service', 'processTask', [
'taskId' => uniqid(),
'data' => $chunk
]);
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use (&$results, &$completedTasks, $totalMasters, $masterId) {
$results[$masterId] = $data;
$completedTasks++;
echo "Task completed on master {$masterId} ({$completedTasks}/{$totalMasters})\n";
// 所有任务完成后的处理
if ($completedTasks === $totalMasters) {
echo "All tasks completed!\n";
$finalResult = array_merge(...array_column($results, 'result'));
echo "Final result: " . json_encode($finalResult) . "\n";
}
});
}
}
// 使用示例
$taskData = ['apple', 'banana', 'cherry', 'date', 'elderberry'];
distributeTask($center, $taskData);
4. 服务调用性能优化
// 连接池优化调用
class ServiceCaller {
private $center;
private $callQueue = [];
private $processingQueue = false;
public function __construct($center) {
$this->center = $center;
}
public function queueCall($service, $method, $params = [], $callback = null) {
$this->callQueue[] = [
'service' => $service,
'method' => $method,
'params' => $params,
'callback' => $callback,
'timestamp' => microtime(true)
];
if (!$this->processingQueue) {
$this->processQueue();
}
}
private function processQueue() {
if (empty($this->callQueue)) {
$this->processingQueue = false;
return;
}
$this->processingQueue = true;
$batch = array_splice($this->callQueue, 0, 10); // 批量处理10个调用
$masters = $this->center->getConnectedMasters();
if (empty($masters)) {
// 重新排队等待主节点连接
$this->callQueue = array_merge($batch, $this->callQueue);
$this->processingQueue = false;
return;
}
// 选择负载最低的主节点
$masterId = $this->selectLeastLoadedMaster($masters);
$stream = $this->center->runOnMaster($masterId, function ($stream) use ($batch) {
$results = [];
foreach ($batch as $call) {
try {
$result = ServiceRegistry::execute(
$call['service'],
$call['method'],
$call['params']
);
$results[] = [
'success' => true,
'result' => $result,
'call' => $call
];
} catch (\Exception $e) {
$results[] = [
'success' => false,
'error' => $e->getMessage(),
'call' => $call
];
}
}
$stream->write($results);
$stream->end();
});
$stream->on('data', function ($results) use ($batch) {
foreach ($results as $result) {
if (isset($result['call']['callback']) && is_callable($result['call']['callback'])) {
$result['call']['callback']($result);
}
}
// 继续处理队列
$this->processQueue();
});
}
private function selectLeastLoadedMaster($masters) {
// 简单的负载均衡算法,实际应用中可以基于更复杂的指标
return array_rand($masters);
}
}
// 使用示例
$caller = new ServiceCaller($center);
// 队列化调用
$caller->queueCall('user-service', 'getUser', ['id' => 1], function ($result) {
if ($result['success']) {
echo "User: " . json_encode($result['result']) . "\n";
} else {
echo "Error: " . $result['error'] . "\n";
}
});
$caller->queueCall('hello-service', 'greet', ['name' => 'Alice'], function ($result) {
echo "Greeting: " . $result['result'] . "\n";
});
服务调用最佳实践
1. 服务设计原则
// ✅ 好的服务设计
ServiceRegistry::register('product-service', new class {
// 返回明确的数据结构
public function getProduct($id) {
return [
'id' => $id,
'name' => 'Product Name',
'price' => 99.99,
'stock' => 10,
'status' => 'active'
];
}
// 参数验证
public function updateProduct($id, $data) {
if (!is_numeric($id) || $id <= 0) {
throw new \InvalidArgumentException('Invalid product ID');
}
$allowedFields = ['name', 'price', 'stock', 'status'];
$filteredData = array_intersect_key($data, array_flip($allowedFields));
// 更新逻辑...
return $this->getProduct($id);
}
// 幂等性操作
public function activateProduct($id) {
// 重复调用不会产生副作用
return $this->updateProduct($id, ['status' => 'active']);
}
});
// ❌ 避免的服务设计
ServiceRegistry::register('bad-service', new class {
// 不要返回不一致的数据结构
public function getData($type) {
if ($type === 'array') {
return ['data' => 'value'];
} else {
return 'string value'; // 不一致的返回类型
}
}
// 不要在服务中直接输出
public function processData($data) {
echo "Processing..."; // ❌ 避免直接输出
return $data;
}
});
2. 错误处理策略
// 统一的错误处理服务
ServiceRegistry::register('error-handler', new class {
public function handleServiceCall($service, $method, $params = []) {
try {
$result = ServiceRegistry::execute($service, $method, $params);
return [
'success' => true,
'data' => $result,
'timestamp' => time()
];
} catch (\InvalidArgumentException $e) {
return [
'success' => false,
'error' => 'validation_error',
'message' => $e->getMessage(),
'timestamp' => time()
];
} catch (\Exception $e) {
return [
'success' => false,
'error' => 'service_error',
'message' => $e->getMessage(),
'timestamp' => time()
];
}
}
});
// 远程调用的错误处理
function callServiceWithRetry($center, $service, $method, $params = [], $maxRetries = 3) {
$attempt = 0;
$executeCall = function() use ($center, $service, $method, $params, &$attempt, $maxRetries, &$executeCall) {
$attempt++;
$masters = $center->getConnectedMasters();
if (empty($masters)) {
if ($attempt < $maxRetries) {
// 等待后重试
Loop::get()->addTimer(1, $executeCall);
return;
}
throw new \Exception('No masters available after ' . $maxRetries . ' attempts');
}
$masterId = array_rand($masters);
$stream = $center->runOnMaster($masterId, function ($stream) use ($service, $method, $params) {
$result = ServiceRegistry::execute('error-handler', 'handleServiceCall', [
'service' => $service,
'method' => $method,
'params' => $params
]);
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use ($attempt, $maxRetries, $executeCall) {
if (!$data['success'] && $attempt < $maxRetries) {
echo "Call failed, retrying... (attempt {$attempt}/{$maxRetries})\n";
Loop::get()->addTimer(2, $executeCall);
} else {
echo "Final result: " . json_encode($data) . "\n";
}
});
$stream->on('error', function ($error) use ($attempt, $maxRetries, $executeCall) {
echo "Stream error: {$error}\n";
if ($attempt < $maxRetries) {
Loop::get()->addTimer(2, $executeCall);
}
});
};
$executeCall();
}
// 使用示例
callServiceWithRetry($center, 'product-service', 'getProduct', ['id' => 1]);
3. 性能监控与调试
// 服务调用监控
class ServiceMonitor {
private $metrics = [];
public function trackCall($service, $method, $duration, $success) {
$key = "{$service}.{$method}";
if (!isset($this->metrics[$key])) {
$this->metrics[$key] = [
'total_calls' => 0,
'success_calls' => 0,
'failed_calls' => 0,
'total_duration' => 0,
'avg_duration' => 0,
'min_duration' => PHP_FLOAT_MAX,
'max_duration' => 0
];
}
$metric = &$this->metrics[$key];
$metric['total_calls']++;
if ($success) {
$metric['success_calls']++;
} else {
$metric['failed_calls']++;
}
$metric['total_duration'] += $duration;
$metric['avg_duration'] = $metric['total_duration'] / $metric['total_calls'];
$metric['min_duration'] = min($metric['min_duration'], $duration);
$metric['max_duration'] = max($metric['max_duration'], $duration);
}
public function getMetrics() {
return $this->metrics;
}
public function reset() {
$this->metrics = [];
}
}
// 监控服务包装器
ServiceRegistry::register('monitored-service', new class {
private $monitor;
public function __construct() {
$this->monitor = new ServiceMonitor();
}
public function executeWithMonitoring($service, $method, $params = []) {
$startTime = microtime(true);
$success = false;
try {
$result = ServiceRegistry::execute($service, $method, $params);
$success = true;
return $result;
} catch (\Exception $e) {
throw $e;
} finally {
$duration = microtime(true) - $startTime;
$this->monitor->trackCall($service, $method, $duration, $success);
}
}
public function getMetrics() {
return $this->monitor->getMetrics();
}
});
// 定期输出性能指标
$loop->addPeriodicTimer(30, function () {
$metrics = ServiceRegistry::execute('monitored-service', 'getMetrics');
echo "\n=== Service Performance Metrics ===\n";
foreach ($metrics as $service => $metric) {
echo sprintf(
"%s: calls=%d, success=%.1f%%, avg=%.3fs, min=%.3fs, max=%.3fs\n",
$service,
$metric['total_calls'],
($metric['success_calls'] / $metric['total_calls']) * 100,
$metric['avg_duration'],
$metric['min_duration'],
$metric['max_duration']
);
}
});
4. 服务版本管理
// 版本化服务注册
class VersionedServiceRegistry {
private static $services = [];
public static function register($name, $version, $instance) {
$key = "{$name}@{$version}";
self::$services[$key] = $instance;
// 同时注册为默认版本(如果是最新的)
if (!isset(self::$services[$name]) || version_compare($version, self::getVersion($name), '>')) {
self::$services[$name] = $instance;
}
}
public static function execute($name, $method, $params = [], $version = null) {
$key = $version ? "{$name}@{$version}" : $name;
if (!isset(self::$services[$key])) {
throw new \Exception("Service {$key} not found");
}
return ServiceRegistry::execute($key, $method, $params);
}
private static function getVersion($name) {
// 从服务键中提取版本号
foreach (array_keys(self::$services) as $key) {
if (strpos($key, $name . '@') === 0) {
return substr($key, strlen($name) + 1);
}
}
return '1.0.0';
}
}
// 注册不同版本的服务
VersionedServiceRegistry::register('api-service', '1.0.0', new class {
public function getData() {
return ['version' => '1.0.0', 'data' => 'old format'];
}
});
VersionedServiceRegistry::register('api-service', '2.0.0', new class {
public function getData() {
return [
'version' => '2.0.0',
'data' => ['id' => 1, 'name' => 'new format'],
'metadata' => ['timestamp' => time()]
];
}
});
// 调用指定版本
$oldResult = VersionedServiceRegistry::execute('api-service', 'getData', [], '1.0.0');
$newResult = VersionedServiceRegistry::execute('api-service', 'getData', [], '2.0.0');
$defaultResult = VersionedServiceRegistry::execute('api-service', 'getData'); // 使用最新版本
身份验证与安全
主节点连接时需要进行身份验证:
$master->on('connect', function ($tunnelStream) {
// 发送认证令牌
$tunnelStream->write([
'cmd' => 'auth',
'token' => 'register-center-token-2024'
]);
// 监听认证结果
$tunnelStream->on('cmd', function ($cmd, $message) {
if ($cmd === 'auth-success') {
echo "Authentication successful\n";
} elseif ($cmd === 'auth-failed') {
echo "Authentication failed: " . $message['reason'] . "\n";
}
});
});
动态节点管理
注册中心支持动态添加和移除节点:
// 定时器:10秒后通知所有主节点有新的注册中心可连接
Loop::addTimer(10, function () use ($center) {
$center->writeRawMessageToAllMasters([
'cmd' => 'register',
'registers' => [
[
'host' => '127.0.0.1',
'port' => 8011,
]
]
]);
});
// 定时器:20秒后移除注册中心
Loop::addTimer(20, function () use ($center) {
$center->writeRawMessageToAllMasters([
'cmd' => 'remove',
'registers' => [
[
'host' => '127.0.0.1',
'port' => 8011,
]
]
]);
});
// 主节点处理注册中心管理命令
$master->on('connect', function ($tunnelStream) use ($master) {
$tunnelStream->on('cmd', function ($cmd, $message) use ($master) {
echo "Received command: $cmd\n";
echo "Message: " . json_encode($message) . "\n";
if ($cmd === 'register') {
// 连接到新的注册中心
$registers = $message['registers'];
foreach ($registers as $register) {
$master->connectViaConnector($register['host'], $register['port']);
}
} elseif ($cmd === 'remove') {
// 移除注册中心连接
$registers = $message['registers'];
foreach ($registers as $register) {
$master->removeConnection($register['host'], $register['port']);
}
}
});
});
服务监控
获取连接状态和服务信息:
// 获取已连接的主节点
$masters = $center->getConnectedMasters();
echo "Connected masters: " . count($masters) . "\n";
// 定期获取服务状态
Loop::addPeriodicTimer(1, function () use ($center) {
$services = $center->getServicesMaster();
echo "Available services: ";
var_export($services);
});
完整的日志配置
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Handler\FileHandler;
use Monolog\Formatter\LineFormatter;
// 创建日志器
$logger = new Logger('registration-center');
// 控制台输出
$consoleHandler = new StreamHandler('php://stdout', Level::Debug);
$consoleHandler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
// 文件输出
$fileHandler = new FileHandler('logs/register-center.log', Level::INFO);
$fileHandler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n"
));
$logger->pushHandler($consoleHandler);
$logger->pushHandler($fileHandler);
// 使用日志器
$center = new Register(8010, $loop, $logger);
$master = new Master(logger: $logger);
示例与使用场景
完整示例
在 examples 目录下提供了完整的工作示例:
register.php:主注册中心示例 - 监听端口 8010,管理主节点连接register1.php:从注册中心示例 - 监听端口 8011,作为备用注册中心master.php:主节点实现示例 - 注册服务并连接到注册中心
运行完整示例
# 终端 1: 启动主注册中心
php examples/register.php
# 终端 2: 启动从注册中心
php examples/register1.php
# 终端 3: 启动主节点
php examples/master.php
运行后您将看到:
- 主节点连接到主注册中心 (8010)
- 注册中心定期在主节点上执行服务方法
- 10秒后,主注册中心通知主节点连接到从注册中心 (8011)
- 20秒后,主注册中心通知主节点断开从注册中心连接
架构说明
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 注册中心 A │ │ 注册中心 B │ │ 注册中心 C │
│ (8010) │ │ (8011) │ │ (8012) │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘
│ │ │
│ │ │
┌─────▼──────────────────────▼──────────────────────▼─────┐
│ 主节点 (Master) │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Service A │ │ Service B │ │
│ │ │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
└──────────────────────────────────────────────────────────┘
使用场景
1. 微服务架构
// 用户服务
ServiceRegistry::register('user-service', new UserService());
// 订单服务
ServiceRegistry::register('order-service', new OrderService());
// 支付服务
ServiceRegistry::register('payment-service', new PaymentService());
2. 分布式任务处理
// 注册中心分发任务到各个主节点
$center->runOnAllMasters(function ($stream) use ($taskData) {
$result = ServiceRegistry::execute('task-processor', 'processTask', $taskData);
$stream->write($result);
});
3. 负载均衡
// 获取所有可用的主节点
$masters = $center->getConnectedMasters();
// 选择负载最低的主节点执行任务
$selectedMaster = selectLeastLoadedMaster($masters);
$center->runOnMaster($selectedMaster, function ($stream) {
// 执行特定任务
});
4. 故障转移
当主注册中心不可用时,主节点会自动连接到备用注册中心:
// 主节点会自动重连到可用的注册中心
$master = new Master(
retryAttempts: PHP_INT_MAX,
retryDelay: 3.0,
reconnectOnClose: true
);
API 参考
Register (注册中心)
构造函数
new Register(int $port, LoopInterface $loop, ?LoggerInterface $logger = null)
主要方法
start()- 启动注册中心getConnectedMasters()- 获取已连接的主节点列表getServicesMaster()- 获取主节点服务信息runOnAllMasters(callable $callback)- 在所有主节点上执行回调runOnMaster(string $masterId, callable $callback)- 在指定主节点上执行回调writeRawMessageToAllMasters(array $message)- 向所有主节点发送原始消息
Master (主节点)
构造函数
new Master(
int $retryAttempts = 3,
float $retryDelay = 1.0,
bool $reconnectOnClose = false,
?LoggerInterface $logger = null
)
主要方法
connectViaConnector(string $host, int $port)- 连接到注册中心removeConnection(string $host, int $port)- 移除到指定注册中心的连接on(string $event, callable $listener)- 添加事件监听器
事件
connect- 连接建立时触发error- 发生错误时触发close- 连接关闭时触发
ServiceRegistry (服务注册表)
静态方法
register(string $name, object $instance, array $metadata = [])- 注册服务get(string $name)- 获取服务实例has(string $name)- 检查服务是否存在remove(string $name)- 移除服务all()- 获取所有服务execute(string $name, string $method, array $arguments = [])- 执行服务方法
错误处理与调试
错误处理
// 主节点错误处理
$master->on('error', function (\Exception $e, $context = []) {
echo "错误:" . $e->getMessage() . "\n";
echo "上下文:" . json_encode($context) . "\n";
// 记录错误日志
$logger->error('Master error', [
'exception' => $e->getMessage(),
'context' => $context
]);
});
// 连接关闭处理
$master->on('close', function ($id, $url) {
echo "与 $url 的连接已关闭 (ID: $id)\n";
// 可以在这里实现重连逻辑
if ($shouldReconnect) {
$master->connectViaConnector($host, $port);
}
});
调试技巧
// 启用详细日志
$logger = new Logger('debug');
$logger->pushHandler(new StreamHandler('php://stdout', Level::DEBUG));
// 监控连接状态
$loop->addPeriodicTimer(10, function () use ($center) {
$masters = $center->getConnectedMasters();
echo "当前连接的主节点数量: " . count($masters) . "\n";
foreach ($masters as $id => $master) {
echo "主节点 ID: $id\n";
}
});
// 服务执行调试
try {
$result = ServiceRegistry::execute('service-name', 'method-name', $params);
echo "服务执行成功: " . json_encode($result) . "\n";
} catch (\Exception $e) {
echo "服务执行失败: " . $e->getMessage() . "\n";
}
性能优化
连接池管理
// 配置合理的重试参数
$master = new Master(
retryAttempts: 5, // 适中的重试次数
retryDelay: 2.0, // 合理的重试间隔
reconnectOnClose: true // 启用自动重连
);
日志级别优化
// 生产环境使用较高的日志级别
$handler = new StreamHandler('php://stdout', Level::WARNING);
// 开发环境使用详细日志
$handler = new StreamHandler('php://stdout', Level::DEBUG);
内存优化
// 定期清理不需要的服务
ServiceRegistry::remove('unused-service');
// 限制连接数量
if (count($center->getConnectedMasters()) > $maxConnections) {
// 拒绝新连接或关闭旧连接
}
常见问题
Q: 如何处理网络中断?
A: 主节点具备自动重连功能,配置 reconnectOnClose: true 即可在连接断开时自动重连。
Q: 如何在生产环境中使用?
A: 建议使用进程管理器如 Supervisor 来管理进程,并配置适当的日志级别和重试参数。
Q: 支持多少个并发连接?
A: 基于 ReactPHP,可以处理数千个并发连接,具体取决于服务器配置。
Q: 如何扩展到多个注册中心?
A: 使用动态节点管理功能,可以运行时添加和移除注册中心。
贡献
欢迎贡献代码!请遵循以下步骤:
- Fork 项目
- 创建功能分支 (
git checkout -b feature/amazing-feature) - 提交更改 (
git commit -m 'Add amazing feature') - 推送到分支 (
git push origin feature/amazing-feature) - 创建 Pull Request
开发环境设置
# 克隆项目
git clone https://github.com/reactphp-x/register-center.git
cd register-center
# 安装依赖
composer install
# 运行测试
vendor/bin/phpunit
# 运行示例
php examples/register.php
许可证
本项目采用 MIT 许可证 - 详见 LICENSE 文件
链接
ReactPHP X Register Center - 让分布式服务通信变得简单高效 🚀