cooper / canal-client
A PHP client for Alibaba Canal, supporting native socket, Swoole, and Clue socket adapters.
v1.0.3
2024-02-19 08:58 UTC
Requires
- php: >=8.0
- ext-sockets: *
- clue/socket-raw: ^1.4
- google/protobuf: ^3.8
- swoole/ide-helper: ^5.0
Last update: 2026-03-13 08:38:53 UTC
PHP client for Alibaba Canal — MySQL binlog incremental subscription and consumption middleware.
Forked from canal-php, with PHP 8.0+ and Swoole 5.0+ support.
Requirements
- PHP >= 8.0
- ext-sockets
- google/protobuf ^3.8
Optional:
- ext-swoole — for Swoole adapter
- clue/socket-raw — for Clue socket adapter
Installation
composer require cooper/canal-client
Adapter Types
| Type | Constant | Description |
|---|---|---|
| Native Socket | CanalClient::TYPE_SOCKET | PHP native socket (ext-sockets) |
| Swoole | CanalClient::TYPE_SWOOLE | Swoole coroutine client |
| Clue Socket | CanalClient::TYPE_SOCKET_CLUE | clue/socket-raw library |
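Since two of the three adapters depend on optional packages, one way to choose between them is to probe what is installed at runtime. The following is a minimal sketch, not part of the package: `pickAdapterConstant()` is a hypothetical helper, and the preference order (Swoole, then Clue, then native) is an assumption. Whether the Swoole adapter must run inside a coroutine context is not covered here, so check that before preferring it automatically.

```php
<?php
declare(strict_types=1);

/**
 * Pick the name of a CanalClient adapter constant from what is available.
 * The preference order is an assumption made for this sketch, not something
 * the package prescribes.
 */
function pickAdapterConstant(bool $hasSwoole, bool $hasClue): string
{
    if ($hasSwoole) {
        return 'TYPE_SWOOLE';
    }
    if ($hasClue) {
        return 'TYPE_SOCKET_CLUE';
    }
    return 'TYPE_SOCKET'; // ext-sockets is listed as a hard requirement
}

// Detect the optional dependencies at runtime.
$name = pickAdapterConstant(
    extension_loaded('swoole'),
    class_exists('Socket\\Raw\\Factory') // factory class shipped by clue/socket-raw
);

echo "Selected adapter constant: CanalClient::{$name}", PHP_EOL;

// With the package installed, the name resolves to the real constant:
// $type   = constant('Cooper\\CanalClient\\CanalClient::' . $name);
// $client = Cooper\CanalClient\CanalConnectorFactory::createClient($type);
```

Keeping the decision in a function that takes plain booleans makes it easy to check without installing either optional dependency.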
Usage
```php
<?php

use Cooper\CanalClient\CanalClient;
use Cooper\CanalClient\CanalConnectorFactory;
use Cooper\CanalClient\Fmt;

require_once __DIR__ . '/vendor/autoload.php';

try {
    // Create client (choose adapter type)
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET);
    // $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
    // $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);

    // Connect to Canal server
    $client->connect("127.0.0.1", 11111);

    // Subscribe to changes (clientId, destination, filter)
    $client->subscribe("1001", "example", ".*\\..*");
    // $client->subscribe("1001", "example", "db_name.tb_name");

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}
```
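The third argument to `subscribe()` is a filter over `schema.table` names. The usage example shows two forms: `.*\..*` for everything and `db_name.tb_name` for a single table. The sketch below builds such a string from a list of targets. `buildCanalFilter()` is a hypothetical helper, and joining several patterns with commas is an assumption based on how the Canal server documents its filter syntax, so verify it against your server version.

```php
<?php
declare(strict_types=1);

/**
 * Build a subscribe() filter from [schema, table] pairs.
 * A null table means "every table in that schema".
 * Assumption: the Canal server accepts comma-separated patterns.
 */
function buildCanalFilter(array $targets): string
{
    $patterns = [];
    foreach ($targets as [$schema, $table]) {
        $patterns[] = $table === null
            ? $schema . '\\..*'        // schema\..*  matches all tables in the schema
            : $schema . '.' . $table;  // schema.table, same form as the usage example
    }
    return implode(',', $patterns);
}

echo buildCanalFilter([['shop', null], ['crm', 'customers']]), PHP_EOL;
// prints: shop\..*,crm.customers
```

The result can be passed directly as the filter, for example `$client->subscribe("1001", "example", buildCanalFilter([...]))`.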
API
CanalConnectorFactory
createClient(int $clientType): BaseCanalConnector - Create a connector instance
BaseCanalConnector
- connect(string $host, int $port, string $user, string $password, ...) - Connect to Canal server
- subscribe(string $clientId, string $destination, string $filter) - Subscribe to binlog changes
- unSubscribe() - Unsubscribe
- get(int $size) - Get messages with auto ack
- getWithoutAck(int $batchSize, int $timeout, int $unit) - Get messages without ack
- ack(mixed $messageId) - Acknowledge message
- rollback(int $batchId) - Rollback
- disConnect() - Disconnect
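The `getWithoutAck()`, `ack()`, and `rollback()` methods listed above allow a batch to be acknowledged only after it has been processed. The following is a rough sketch of that pattern, not the package's own code. `consumeBatch()`, `FakeMessage`, and `FakeConnector` are hypothetical names; the fakes stand in for a real connector so the block runs without a Canal server. It assumes the message object exposes `getId()` alongside `getEntries()`, and that `-1` for `$timeout` and `$unit` requests a non-blocking fetch, as in upstream canal-php.

```php
<?php
declare(strict_types=1);

/**
 * Fetch one batch without auto-ack, pass each entry to $handler, then ack on
 * success or roll back on failure. Returns true if the batch was acknowledged.
 * Assumption: $message->getId() exists; only getEntries() appears in the README.
 */
function consumeBatch(object $connector, callable $handler, int $batchSize = 100): bool
{
    $message = $connector->getWithoutAck($batchSize, -1, -1);
    $entries = $message->getEntries();
    if (!$entries) {
        return false; // nothing fetched, nothing to acknowledge
    }
    $batchId = $message->getId();
    try {
        foreach ($entries as $entry) {
            $handler($entry);
        }
        $connector->ack($batchId);
        return true;
    } catch (\Throwable $e) {
        $connector->rollback($batchId); // ask the server to redeliver this batch
        return false;
    }
}

// In-memory stand-ins (hypothetical) so the sketch runs without a Canal server.
final class FakeMessage
{
    public function __construct(private int $id, private array $entries) {}
    public function getId(): int { return $this->id; }
    public function getEntries(): array { return $this->entries; }
}

final class FakeConnector
{
    public array $log = [];

    public function getWithoutAck(int $batchSize, int $timeout, int $unit): FakeMessage
    {
        return new FakeMessage(7, ['row-a', 'row-b']);
    }

    public function ack(mixed $messageId): void { $this->log[] = "ack:$messageId"; }
    public function rollback(int $batchId): void { $this->log[] = "rollback:$batchId"; }
}

$connector = new FakeConnector();
consumeBatch($connector, fn ($entry) => print("handled $entry\n"));
consumeBatch($connector, function ($entry) { throw new RuntimeException('boom'); });
echo implode(', ', $connector->log), PHP_EOL; // prints: ack:7, rollback:7
```

Because a rolled-back batch may be delivered again after some of its entries were already handled, the handler should be idempotent.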
Fmt
Fmt::println(Entry $entry) - Print binlog entry to stdout
Testing
composer test