kode/messaging

Maintainers

Package info

github.com/kodephp/messaging

Wiki

Documentation

pkg:composer/kode/messaging

Fund package maintenance!

kodephp

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

2.2.2 2026-06-08 09:51 UTC

This package is auto-updated.

Last update: 2026-06-08 09:51:41 UTC


README

统一消息层 | WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP | PHP 8.2+ | PSR 合规 | 可插拔适配器

PHP Version License kode Version

简介

kode/messagingkode/* 家族中的统一消息层 Composer 包,封装 WebSocketSSEMQTTUDPLong-PollingCoAPNATSSTOMPgRPCWebTransportRTMP 等 11 种长连接 / 实时消息协议,提供一致的 API协议无关的消息抽象可插拔的扩展点

一个 Messaging::server() 启动所有协议,业务代码面向接口编程,不感知具体协议

特性

特性 说明
🌐 11 协议统一 WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP
🔌 可插拔适配器 新增协议不改动核心代码(3 步:Adapter / 注册 / 文档)
📡 协议无关 业务层只依赖 MessageInterface
🧩 中间件管道 鉴权 / 限流 / 编解码 / 校验 / 追踪
🏷️ 路由 基于 event / topic 路由消息
📢 统一 Pub/Sub 进程内 / 跨进程 / 跨节点(Redis)
🛡️ PSR 合规 PSR-3 / PSR-4 / PSR-7 / PSR-14 / PSR-18
协程友好 kode/fibers 协作,无 Fiber 自动降级
🔒 安全 TLS、JWT、Origin 校验、签名鉴权、限流
📊 可观测 事件、指标、结构化日志
🚀 高性能 Swoole / Swow / stream 多传输层
🔁 重连 / 心跳 客户端内置指数退避重连、ping/pong

安装

composer require kode/messaging

按需安装可选依赖:

composer require kode/fibers        # 协程
composer require kode/event         # 事件派发
composer require kode/process       # 多 Worker / 集群
composer require kode/queue         # MQTT QoS 落地、延迟消息
composer require kode/jwt           # JWT 鉴权
composer require nyholm/psr7       # SSE 的 PSR-7 基础

快速开始

WebSocket

<?php
require __DIR__ . '/vendor/autoload.php';

use Kode\Messaging\Messaging;

Messaging::server('ws://0.0.0.0:8080')
    ->on('connection.open', fn($c) => $c->send('welcome'))
    ->on('message.received', fn($c, $m) => $c->send("echo: {$m->payload()}"))
    ->start();

SSE

Messaging::server('sse://0.0.0.0:8081')
    ->interval(1000)
    ->on('interval', fn($c) => $c->send(['event' => 'tick', 'data' => time()]))
    ->start();

MQTT

Messaging::client('mqtt://broker.example.com:1883')
    ->withClientId('device-001')
    ->subscribe('sensors/+/temperature', function ($topic, $payload) {
        echo "[$topic] $payload\n";
    })
    ->connect()
    ->loop();

NATS(pub/sub、request/reply)

$client = Messaging::client('nats://broker:4222');
$client->subscribe('orders.*', function ($subject, $payload) {
    echo "[$subject] $payload\n";
});
$client->connect();
$client->publish('orders.created', json_encode(['id' => 1]));

STOMP(消息队列客户端)

$client = Messaging::client('stomp://broker:61613');
$client->subscribe('/queue/orders', function ($data) {
    echo $data['body'] . "\n";
});
$client->connect();
$client->send('/queue/orders', 'hello');

gRPC Streaming

$client = Messaging::client('grpc://api.example.com:50051');
$response = $client->call('/helloworld.Greeter/SayHello', $reqPayload);

WebTransport(HTTP/3 双工)

$client = Messaging::client('webtransport://example.com:4433');
$conn = $client->connect();
$conn->sendBidirectional('hello');
$conn->sendDatagram('ping', reliable: false);

RTMP(直播源接入)

Messaging::server('rtmp://0.0.0.0:1935')
    ->on('message.received', fn($c, $m) => log_rtmp($m))
    ->start();

协议矩阵

协议 方案 服务端 客户端 适用
WebSocket ws:// / wss:// 浏览器长连接、聊天、游戏
SSE sse:// 服务端推送、通知、大屏
MQTT 3.1.1 / 5.0 mqtt:// / mqtts:// 实验性 Broker IoT、移动推送、Pub/Sub
UDP / Datagram udp:// 实时音视频、游戏、广播
Long-Polling poll:// / http:// WebSocket 回退、低频推送
CoAP (RFC 7252) coap:// / coaps:// IoT 传感器、NB-IoT、LoRa
NATS nats:// ✅ 嵌入式 Broker 微服务 Pub/Sub、request/reply
STOMP 1.2 stomp:// ✅ 嵌入式 Broker 消息队列(兼容 RabbitMQ / ActiveMQ)
gRPC Streaming grpc:// 微服务 RPC、4 种流式调用
WebTransport wt:// / webtransport:// HTTP/3-fallback HTTP/3 双工(依赖 aioquic / msquic)
RTMP rtmp:// / rtmps:// 直播源接入(OBS / FMLE)

架构

┌────────────────────────────────────────────────────────┐
│  Layer 5 — 应用层                                       │
│           Messaging::server()->on('message.received')  │
├────────────────────────────────────────────────────────┤
│  Layer 4 — 中间件管道(Auth → RateLimit → Codec → ...) │
├────────────────────────────────────────────────────────┤
│  Layer 3 — 协议适配器(WebSocket / SSE / MQTT / ...)   │
├────────────────────────────────────────────────────────┤
│  Layer 2 — 消息抽象(MessageInterface / Connection)    │
├────────────────────────────────────────────────────────┤
│  Layer 1 — 传输层(stream / sockets / swoole / swow)  │
└────────────────────────────────────────────────────────┘

与 kode/* 家族协作

场景 依赖包 协作方式
日志 kode/log 或 PSR-3 LoggerInterface 注入
协程 kode/fibers 长连接内启动 Fiber
事件 kode/event connection.open 等事件派发
上下文 kode/context 连接 ID、追踪
队列 kode/queue MQTT QoS 1/2 落地
进程 kode/process 多 Worker、集群
缓存 kode/cache 会话、广播订阅
HTTP kode/http / nyholm/psr7 SSE 复用 HTTP
HTTP 客户端 kode/http-client 长轮询回退
鉴权 kode/jwt JWT 鉴权中间件

PHP 8.2 / 8.3 / 8.4 / 8.5 兼容

  • 最低:PHP 8.2
  • 推荐:PHP 8.3 / 8.4
  • 已验证:PHP 8.5

支持的现代特性:

特性 用法 版本
readonly class 不可变消息体 ≥ 8.2
enum 协议状态机 ≥ 8.1
Fibers 协程 ≥ 8.1
typed class constants 协议常量 ≥ 8.3
#[\Override] 覆盖标记 ≥ 8.3
property hooks 连接属性 ≥ 8.4
pipe operator |> 链式构造 ≥ 8.5

8.5 不可用时自动提供 Messaging::pipeline() 等价实现。

文档

示例

  • examples/websocket_server.php — WebSocket 服务端
  • examples/websocket_client.php — WebSocket 客户端
  • examples/sse_server.php — SSE 服务端
  • examples/mqtt_publish.php — MQTT 发布
  • examples/mqtt_subscribe.php — MQTT 订阅
  • examples/udp_client.php — UDP 客户端
  • examples/coap_server.php / coap_client.php — CoAP 服务端 / 客户端
  • examples/nats_server.php / nats_client.php — NATS 服务端 / 客户端
  • examples/stomp_server.php — STOMP 服务端
  • examples/grpc_server.php — gRPC 服务端
  • examples/longpolling_server.php / longpolling_client.php — Long-Polling
  • examples/webtransport_server.php — WebTransport
  • examples/rtmp_server.php — RTMP 直播源接入
  • docs/examples/chat.php — 聊天室
  • docs/examples/push.php — 实时通知
  • docs/examples/iot.php — IoT 设备
  • docs/examples/rpc.php — RPC over WebSocket

许可证

Apache-2.0