kode / messaging
Fund package maintenance!
2.2.2
2026-06-08 09:51 UTC
Requires
- php: >=8.2
- ext-json: *
- ext-mbstring: *
- kode/limiting: ^1.7
- psr/log: ^2.0|^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.40
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5|^11.0
Suggests
- ext-curl: 长轮询回退(与 kode/http-client 协作)
- ext-msgpack: MsgPack 编解码中间件
- ext-openssl: 支持 TLS(wss://、mqtts://)
- ext-pcntl: 信号处理(kode/process 协作时需要)
- ext-sockets: 使用 ext-sockets 传输层以获得更高性能(推荐生产环境)
- kode/ai-agent: AI Agent 流式消息(kode/ai-agent)
- kode/cache: 会话状态、广播订阅关系
- kode/context: 上下文追踪(TraceMiddleware)
- kode/event: 事件派发器(kode/event)
- kode/fibers: Fiber 协程支持
- kode/http-client: 长轮询回退(kode/http-client)
- kode/jwt: JWT 鉴权中间件(kode/jwt)
- kode/process: 多 Worker 进程管理、集群(kode/process)
- kode/queue: 异步消息落地、延迟消息
- nyholm/psr7: PSR-7 消息实现(SSE 嵌入 HTTP 服务时需要)
- swoole/swoole: Swoole 协程传输层(百万级并发)
- swow/swow: Swow 协程传输层(跨平台)
README
统一消息层 | WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP | PHP 8.2+ | PSR 合规 | 可插拔适配器
简介
kode/messaging 是 kode/* 家族中的统一消息层 Composer 包,封装 WebSocket、SSE、MQTT、UDP、Long-Polling、CoAP、NATS、STOMP、gRPC、WebTransport、RTMP 等 11 种长连接 / 实时消息协议,提供一致的 API、协议无关的消息抽象和可插拔的扩展点。
一个 Messaging::server() 启动所有协议,业务代码面向接口编程,不感知具体协议。
特性
| 特性 | 说明 |
|---|---|
| 🌐 11 协议统一 | WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP |
| 🔌 可插拔适配器 | 新增协议不改动核心代码(3 步:Adapter / 注册 / 文档) |
| 📡 协议无关 | 业务层只依赖 MessageInterface |
| 🧩 中间件管道 | 鉴权 / 限流 / 编解码 / 校验 / 追踪 |
| 🏷️ 路由 | 基于 event / topic 路由消息 |
| 📢 统一 Pub/Sub | 进程内 / 跨进程 / 跨节点(Redis) |
| 🛡️ PSR 合规 | PSR-3 / PSR-4 / PSR-7 / PSR-14 / PSR-18 |
| ⚡ 协程友好 | 与 kode/fibers 协作,无 Fiber 自动降级 |
| 🔒 安全 | TLS、JWT、Origin 校验、签名鉴权、限流 |
| 📊 可观测 | 事件、指标、结构化日志 |
| 🚀 高性能 | Swoole / Swow / stream 多传输层 |
| 🔁 重连 / 心跳 | 客户端内置指数退避重连、ping/pong |
安装
composer require kode/messaging
按需安装可选依赖:
composer require kode/fibers # 协程 composer require kode/event # 事件派发 composer require kode/process # 多 Worker / 集群 composer require kode/queue # MQTT QoS 落地、延迟消息 composer require kode/jwt # JWT 鉴权 composer require nyholm/psr7 # SSE 的 PSR-7 基础
快速开始
WebSocket
<?php require __DIR__ . '/vendor/autoload.php'; use Kode\Messaging\Messaging; Messaging::server('ws://0.0.0.0:8080') ->on('connection.open', fn($c) => $c->send('welcome')) ->on('message.received', fn($c, $m) => $c->send("echo: {$m->payload()}")) ->start();
SSE
Messaging::server('sse://0.0.0.0:8081') ->interval(1000) ->on('interval', fn($c) => $c->send(['event' => 'tick', 'data' => time()])) ->start();
MQTT
Messaging::client('mqtt://broker.example.com:1883') ->withClientId('device-001') ->subscribe('sensors/+/temperature', function ($topic, $payload) { echo "[$topic] $payload\n"; }) ->connect() ->loop();
NATS(pub/sub、request/reply)
$client = Messaging::client('nats://broker:4222'); $client->subscribe('orders.*', function ($subject, $payload) { echo "[$subject] $payload\n"; }); $client->connect(); $client->publish('orders.created', json_encode(['id' => 1]));
STOMP(消息队列客户端)
$client = Messaging::client('stomp://broker:61613'); $client->subscribe('/queue/orders', function ($data) { echo $data['body'] . "\n"; }); $client->connect(); $client->send('/queue/orders', 'hello');
gRPC Streaming
$client = Messaging::client('grpc://api.example.com:50051'); $response = $client->call('/helloworld.Greeter/SayHello', $reqPayload);
WebTransport(HTTP/3 双工)
$client = Messaging::client('webtransport://example.com:4433'); $conn = $client->connect(); $conn->sendBidirectional('hello'); $conn->sendDatagram('ping', reliable: false);
RTMP(直播源接入)
Messaging::server('rtmp://0.0.0.0:1935') ->on('message.received', fn($c, $m) => log_rtmp($m)) ->start();
协议矩阵
| 协议 | 方案 | 服务端 | 客户端 | 适用 |
|---|---|---|---|---|
| WebSocket | ws:// / wss:// |
✅ | ✅ | 浏览器长连接、聊天、游戏 |
| SSE | sse:// |
✅ | ✅ | 服务端推送、通知、大屏 |
| MQTT 3.1.1 / 5.0 | mqtt:// / mqtts:// |
实验性 Broker | ✅ | IoT、移动推送、Pub/Sub |
| UDP / Datagram | udp:// |
✅ | ✅ | 实时音视频、游戏、广播 |
| Long-Polling | poll:// / http:// |
✅ | ✅ | WebSocket 回退、低频推送 |
| CoAP (RFC 7252) | coap:// / coaps:// |
✅ | ✅ | IoT 传感器、NB-IoT、LoRa |
| NATS | nats:// |
✅ 嵌入式 Broker | ✅ | 微服务 Pub/Sub、request/reply |
| STOMP 1.2 | stomp:// |
✅ 嵌入式 Broker | ✅ | 消息队列(兼容 RabbitMQ / ActiveMQ) |
| gRPC Streaming | grpc:// |
✅ | ✅ | 微服务 RPC、4 种流式调用 |
| WebTransport | wt:// / webtransport:// |
HTTP/3-fallback | ✅ | HTTP/3 双工(依赖 aioquic / msquic) |
| RTMP | rtmp:// / rtmps:// |
✅ | ✅ | 直播源接入(OBS / FMLE) |
架构
┌────────────────────────────────────────────────────────┐
│ Layer 5 — 应用层 │
│ Messaging::server()->on('message.received') │
├────────────────────────────────────────────────────────┤
│ Layer 4 — 中间件管道(Auth → RateLimit → Codec → ...) │
├────────────────────────────────────────────────────────┤
│ Layer 3 — 协议适配器(WebSocket / SSE / MQTT / ...) │
├────────────────────────────────────────────────────────┤
│ Layer 2 — 消息抽象(MessageInterface / Connection) │
├────────────────────────────────────────────────────────┤
│ Layer 1 — 传输层(stream / sockets / swoole / swow) │
└────────────────────────────────────────────────────────┘
与 kode/* 家族协作
| 场景 | 依赖包 | 协作方式 |
|---|---|---|
| 日志 | kode/log 或 PSR-3 |
LoggerInterface 注入 |
| 协程 | kode/fibers |
长连接内启动 Fiber |
| 事件 | kode/event |
connection.open 等事件派发 |
| 上下文 | kode/context |
连接 ID、追踪 |
| 队列 | kode/queue |
MQTT QoS 1/2 落地 |
| 进程 | kode/process |
多 Worker、集群 |
| 缓存 | kode/cache |
会话、广播订阅 |
| HTTP | kode/http / nyholm/psr7 |
SSE 复用 HTTP |
| HTTP 客户端 | kode/http-client |
长轮询回退 |
| 鉴权 | kode/jwt |
JWT 鉴权中间件 |
PHP 8.2 / 8.3 / 8.4 / 8.5 兼容
- 最低:PHP 8.2
- 推荐:PHP 8.3 / 8.4
- 已验证:PHP 8.5
支持的现代特性:
| 特性 | 用法 | 版本 |
|---|---|---|
readonly class |
不可变消息体 | ≥ 8.2 |
enum |
协议状态机 | ≥ 8.1 |
Fibers |
协程 | ≥ 8.1 |
| typed class constants | 协议常量 | ≥ 8.3 |
#[\Override] |
覆盖标记 | ≥ 8.3 |
| property hooks | 连接属性 | ≥ 8.4 |
pipe operator |> |
链式构造 | ≥ 8.5 |
8.5 不可用时自动提供 Messaging::pipeline() 等价实现。
文档
- docs/index.md — 文档总览
- docs/quick-start.md — 快速开始
- docs/architecture.md — 架构设计
- docs/websocket.md — WebSocket 协议指南
- docs/sse.md — SSE 协议指南
- docs/mqtt.md — MQTT 协议指南
- docs/udp.md — UDP 协议指南
- docs/long-polling.md — Long-Polling 协议指南
- docs/coap.md — CoAP 协议指南
- docs/nats.md — NATS 协议指南
- docs/stomp.md — STOMP 协议指南
- docs/grpc.md — gRPC Streaming 协议指南
- docs/webtransport.md — WebTransport 协议指南
- docs/rtmp.md — RTMP 协议指南
- docs/roadmap.md — 协议扩展路线图
- docs/pubsub.md — 发布订阅总线
- docs/middleware.md — 中间件
- docs/configuration.md — 配置
- docs/deployment.md — 部署
- docs/release.md — 发布流程
- docs/migration.md — 从其它框架迁移
- docs/examples/ — 完整示例
示例
examples/websocket_server.php— WebSocket 服务端examples/websocket_client.php— WebSocket 客户端examples/sse_server.php— SSE 服务端examples/mqtt_publish.php— MQTT 发布examples/mqtt_subscribe.php— MQTT 订阅examples/udp_client.php— UDP 客户端examples/coap_server.php/coap_client.php— CoAP 服务端 / 客户端examples/nats_server.php/nats_client.php— NATS 服务端 / 客户端examples/stomp_server.php— STOMP 服务端examples/grpc_server.php— gRPC 服务端examples/longpolling_server.php/longpolling_client.php— Long-Pollingexamples/webtransport_server.php— WebTransportexamples/rtmp_server.php— RTMP 直播源接入docs/examples/chat.php— 聊天室docs/examples/push.php— 实时通知docs/examples/iot.php— IoT 设备docs/examples/rpc.php— RPC over WebSocket
许可证
Apache-2.0