cooper / postgre-cdc
PostgreSQL Change Data Capture using logical replication
Installs: 0
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 0
Forks: 0
pkg:composer/cooper/postgre-cdc
Requires
- php: >=8.0
 - ext-pcntl: *
 - ext-pdo: *
 - ext-pdo_pgsql: *
 - ext-pgsql: *
 - monolog/monolog: ^2.0
 
This package is auto-updated.
Last update: 2025-10-04 15:07:03 UTC
README
这个 PHP 库提供了一个简单易用的方式来监听和处理 PostgreSQL 数据库的变更,使用逻辑复制(Logical Replication)功能。它使用 wal2json 插件解析 PostgreSQL 的逻辑复制输出,并将其转换为易于使用的 PHP 数组,方便开发者实现各种 CDC(变更数据捕获)应用场景。
功能特点
- 自动设置 PostgreSQL 逻辑复制槽和发布
 - 使用 wal2json 插件解析 PostgreSQL 的逻辑复制输出为 JSON 格式
 - 支持插入、更新、删除等操作的解析
 - 自动重连和错误处理
 - 可定制的日志记录
 - 信号处理(优雅关闭)
 
系统要求
- PHP 8.0+
 - PostgreSQL 10+(启用了逻辑复制功能和安装了 wal2json 插件)
 - PHP 扩展:
- pdo
 - pdo_pgsql
 - pgsql
 - pcntl(可选,用于信号处理)
 
 - Monolog 2.0+(用于日志记录)
 
安装
通过 Composer 安装:
composer require cooper/postgre-cdc
配置 PostgreSQL
确保 PostgreSQL 已启用逻辑复制功能,并安装了 wal2json 插件。在 postgresql.conf 中设置:
wal_level = logical         # 启用逻辑复制
max_replication_slots = 5   # 复制槽数量
max_wal_senders = 5         # 并发复制连接数
shared_preload_libraries = 'wal2json'  # 加载插件
然后重启 PostgreSQL 服务器。
基本用法
<?php require_once 'vendor/autoload.php'; use Cooper\PostgreCDC\PostgreLogicalReplication; // 数据库配置 $dbConfig = [ 'host' => 'localhost', 'port' => '5432', 'dbname' => 'your_database', 'user' => 'your_username', 'password' => 'your_password', 'replication_slot_name' => 'my_replication_slot', 'publication_name' => 'my_publication' ]; // 创建复制实例 $replication = new PostgreLogicalReplication($dbConfig); // 连接数据库 if (!$replication->connect()) { die("无法连接到 PostgreSQL 数据库\n"); } // 设置复制 if (!$replication->setupReplication()) { die("无法设置复制环境\n"); } // 定义变更处理回调函数 $handleChange = function($data, $rawJsonData = null) { // 根据变更类型处理数据 if (isset($data['change'])) { foreach ($data['change'] as $change) { $kind = $change['kind'] ?? ''; $table = $change['table'] ?? ''; switch ($kind) { case 'insert': echo "插入操作: 表 {$table}\n"; if (isset($change['columnvalues'])) { print_r($change['columnvalues']); } break; case 'update': echo "更新操作: 表 {$table}\n"; if (isset($change['columnvalues'])) { echo "新值:\n"; print_r($change['columnvalues']); } if (isset($change['oldkeys'])) { echo "旧键值:\n"; print_r($change['oldkeys']); } break; case 'delete': echo "删除操作: 表 {$table}\n"; if (isset($change['oldkeys'])) { print_r($change['oldkeys']); } break; } } } }; // 开始监听变更 try { $replication->startReplication($handleChange); } catch (Exception $e) { echo "错误: " . $e->getMessage() . "\n"; } finally { $replication->close(); }
配置选项
数据库配置
| 参数 | 描述 | 默认值 | 
|---|---|---|
| host | PostgreSQL 服务器主机 | - | 
| port | PostgreSQL 服务器端口 | - | 
| dbname | 数据库名称 | - | 
| user | 用户名 | - | 
| password | 密码 | - | 
| replication_slot_name | 复制槽名称 | php_logical_slot | 
| publication_name | 发布名称 | php_publication | 
| application_name | 应用名称 | php_logical_replication | 
方法
| 方法 | 描述 | 
|---|---|
| connect() | 连接到 PostgreSQL 数据库 | 
| setupReplication() | 设置复制环境(创建复制槽和发布) | 
| startReplication(callable $callback) | 开始监听变更数据 | 
| close() | 关闭连接 | 
| setHeartbeatInterval(int $seconds) | 设置心跳间隔(秒) | 
| setMaxReconnectAttempts(int $attempts) | 设置最大重连次数 | 
| setReconnectDelay(int $seconds) | 设置重连延迟(秒) | 
| recreateReplicationSlot() | 重新创建复制槽 | 
wal2json 输出格式
wal2json 插件输出的 JSON 数据格式如下:
插入操作
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "users",
      "columnnames": ["id", "name", "email"],
      "columntypes": ["integer", "character varying(255)", "character varying(255)"],
      "columnvalues": [1, "张三", "zhangsan@example.com"]
    }
  ]
}
更新操作
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "users",
      "columnnames": ["id", "name", "email"],
      "columntypes": ["integer", "character varying(255)", "character varying(255)"],
      "columnvalues": [1, "张三", "zhangsan_new@example.com"],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["integer"],
        "keyvalues": [1]
      }
    }
  ]
}
删除操作
{
  "change": [
    {
      "kind": "delete",
      "schema": "public",
      "table": "users",
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["integer"],
        "keyvalues": [1]
      }
    }
  ]
}
关于 wal2json 插件
wal2json 是 PostgreSQL 的一个逻辑解码输出插件,它将 WAL(预写式日志)中的变更转换为 JSON 格式。这使得处理和消费这些变更变得更加容易,特别是对于需要与其他系统集成的应用程序。
wal2json 支持以下功能:
- 将 INSERT、UPDATE、DELETE 操作转换为 JSON 格式
 - 支持事务边界(开始和提交)
 - 提供列名、类型和值
 - 提供主键信息
 - 支持多种配置选项,如时间戳、模式名称等
 
更多关于 wal2json 的信息,请访问 wal2json GitHub 仓库。
许可证
The MIT License (MIT). Please see License File for more information.