phalanx/websocket

WebSocket support for the Phalanx async coordination library

Package info

github.com/havy-tech/phalanx-websocket

pkg:composer/phalanx/websocket

v0.2.0 2026-03-27 04:01 UTC

This package is auto-updated.

Last update: 2026-03-27 19:31:13 UTC


Production-grade WebSocket support with RFC 6455 handshake, topic-based pub/sub, and leak-free connection tracking via WeakMap. Integrates directly with the Phalanx HTTP runner -- WebSocket and HTTP traffic share a single port.

Installation

composer require phalanx/websocket

Requires PHP 8.4+, phalanx/core, phalanx/stream, ratchet/rfc6455, and react/stream.

Quick Start

<?php

declare(strict_types=1);

use Phalanx\Scope;
use Phalanx\Task\Scopeable;
use Phalanx\WebSocket\WsMessage;
use Phalanx\WebSocket\WsScope;

final readonly class EchoHandler implements Scopeable
{
    public function __invoke(Scope $scope): mixed
    {
        assert($scope instanceof WsScope);
        $conn = $scope->connection;

        foreach ($conn->inbound->consume() as $msg) {
            $conn->send(WsMessage::text("echo: {$msg->payload}"));
        }

        return null;
    }
}

<?php

use Phalanx\Application; // assumed namespace (phalanx/core); adjust if it differs
use Phalanx\Http\Runner;
use Phalanx\WebSocket\WsRoute;
use Phalanx\WebSocket\WsRouteGroup;

$ws = WsRouteGroup::of([
    '/ws/echo' => new WsRoute(fn: new EchoHandler()),
]);

$app = Application::starting()->compile();

Runner::from($app)
    ->withWebsockets($ws)
    ->run('0.0.0.0:8080');

Connect with any WebSocket client to ws://localhost:8080/ws/echo and every message comes back prefixed with echo:.

Connections

WsConnection represents a single WebSocket peer. Each connection holds two channels: inbound for received frames and outbound for frames to send.

Send frames with $conn->send(WsMessage::text('hello')), $conn->sendBinary($bytes), $conn->ping(), or $conn->close(). Read the connection's identifier from $conn->id and check whether it is still open with $conn->isOpen.

Inbound messages arrive through a channel that supports iteration:

<?php

foreach ($conn->inbound->consume() as $msg) {
    // Process each frame as it arrives
}
// Loop exits when the connection closes

Messages

WsMessage wraps a payload and opcode with named constructors for every frame type:

<?php

use Phalanx\WebSocket\WsCloseCode; // assumed to live alongside WsMessage
use Phalanx\WebSocket\WsMessage;

$text   = WsMessage::text('{"action": "join"}');
$json   = WsMessage::json(['action' => 'join']); // Encode to JSON text frame
$binary = WsMessage::binary($protobuf);
$ping   = WsMessage::ping();
$pong   = WsMessage::pong();
$close  = WsMessage::close(WsCloseCode::Normal, 'goodbye');

Type checks use property hooks:

<?php

if ($msg->isText) {
    $data = $msg->decode(); // Decode JSON payload, throws on invalid JSON
}

if ($msg->isBinary) {
    processBuffer($msg->payload);
}

if ($msg->isClose) {
    echo "Closed with code: {$msg->closeCode->value}\n";
}
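
To make the named-constructor and JSON round-trip behaviour concrete, here is a minimal standalone sketch. DemoMessage and DemoOpcode are hypothetical stand-ins, not the library's classes, and the sketch uses plain readonly properties rather than property hooks. Only the opcode numbers come from RFC 6455; the rest is an assumption about how such a value object could be built.

```php
<?php

// Opcode values as assigned by RFC 6455, section 5.2.
enum DemoOpcode: int
{
    case Text = 0x1;
    case Binary = 0x2;
    case Close = 0x8;
    case Ping = 0x9;
    case Pong = 0xA;
}

// Hypothetical stand-in for WsMessage; illustrative only.
final class DemoMessage
{
    private function __construct(
        public readonly string $payload,
        public readonly DemoOpcode $opcode,
    ) {
    }

    public static function text(string $payload): self
    {
        return new self($payload, DemoOpcode::Text);
    }

    public static function json(mixed $data): self
    {
        // JSON_THROW_ON_ERROR turns encoding failures into JsonException.
        return new self(json_encode($data, JSON_THROW_ON_ERROR), DemoOpcode::Text);
    }

    public function decode(): mixed
    {
        // Objects decode to associative arrays; invalid JSON throws JsonException.
        return json_decode($this->payload, true, 512, JSON_THROW_ON_ERROR);
    }
}

$msg = DemoMessage::json(['action' => 'join']);
$decoded = $msg->decode();
```

Keeping the constructor private forces callers through the named constructors, so a payload can never be paired with the wrong opcode by accident.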

Routes

WsRoute defines a handler for a specific WebSocket path. The handler receives a WsScope with the connection, the upgrade request, route parameters, and the full ExecutionScope:

<?php

declare(strict_types=1);

use Phalanx\Scope;
use Phalanx\Task\Scopeable;
use Phalanx\WebSocket\WsScope;

final readonly class ChatHandler implements Scopeable
{
    public function __invoke(Scope $scope): mixed
    {
        assert($scope instanceof WsScope);
        $conn = $scope->connection;
        $request = $scope->request;
        $params = $scope->params;

        foreach ($conn->inbound->consume() as $msg) {
            // Handle messages
        }

        return null;
    }
}

<?php

use Phalanx\WebSocket\WsRoute;

$chatRoute = new WsRoute(fn: new ChatHandler());

Group routes into a WsRouteGroup:

<?php

use Phalanx\WebSocket\WsRouteGroup;

$ws = WsRouteGroup::of([
    '/ws/chat'  => $chatRoute,
    '/ws/feed'  => $feedRoute,
    '/ws/admin' => $adminRoute,
]);

Route keys can be bare paths (/ws/chat) or paths with an explicit WS prefix (WS /ws/chat).

Gateway Pub/Sub

WsGateway manages connections and topics. It uses WeakMap internally -- when a connection object is garbage collected, its subscriptions vanish automatically. No manual cleanup, no memory leaks.

<?php

use Phalanx\WebSocket\WsGateway;
use Phalanx\WebSocket\WsMessage;

$gateway = $scope->service(WsGateway::class);

// Register a connection
$gateway->register($conn);

// Subscribe to topics
$gateway->subscribe($conn, 'chat.room.42', 'notifications');

// Publish to all subscribers of a topic
$gateway->publish('chat.room.42', WsMessage::text($json));

// Publish excluding the sender
$gateway->publish('chat.room.42', WsMessage::text($json), exclude: $conn);

// Broadcast to every connected client
$gateway->broadcast(WsMessage::json(['type' => 'system', 'text' => 'Maintenance in 5 minutes']));

// Unsubscribe or remove entirely
$gateway->unsubscribe($conn, 'chat.room.42');
$gateway->unregister($conn);
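
The WeakMap behaviour described above can be sketched in isolation. DemoTopicRegistry is a hypothetical class written for this illustration, with plain stdClass objects standing in for connections. It shows the general WeakMap technique and an exclude filter, and makes no claim about how WsGateway is actually implemented.

```php
<?php

// Hypothetical registry; illustrates WeakMap semantics only.
final class DemoTopicRegistry implements Countable
{
    /** @var WeakMap<object, array<string, true>> */
    private WeakMap $topicsByConnection;

    public function __construct()
    {
        $this->topicsByConnection = new WeakMap();
    }

    public function subscribe(object $conn, string ...$topics): void
    {
        // Read, modify, and write back the topic set for this connection.
        $current = isset($this->topicsByConnection[$conn]) ? $this->topicsByConnection[$conn] : [];
        foreach ($topics as $topic) {
            $current[$topic] = true;
        }
        $this->topicsByConnection[$conn] = $current;
    }

    /** @return list<object> connections subscribed to $topic, minus $exclude */
    public function recipients(string $topic, ?object $exclude = null): array
    {
        $recipients = [];
        foreach ($this->topicsByConnection as $conn => $topics) {
            if ($conn !== $exclude && isset($topics[$topic])) {
                $recipients[] = $conn;
            }
        }

        return $recipients;
    }

    public function count(): int
    {
        return count($this->topicsByConnection);
    }
}

$registry = new DemoTopicRegistry();
$alice = new stdClass();
$bob = new stdClass();

$registry->subscribe($alice, 'chat.room.42', 'notifications');
$registry->subscribe($bob, 'chat.room.42');

// Excluding the sender leaves only the other subscriber.
$withoutAlice = count($registry->recipients('chat.room.42', exclude: $alice));

// Drop the last strong reference; the WeakMap entry disappears with the object.
unset($bob);
$remaining = count($registry);
```

Because the map holds its keys weakly, the registry never keeps a connection alive on its own. An explicit unregister call is still useful when subscriptions should end before the object is destroyed.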

Connection Lifecycle

The following chat handler registers with the gateway, subscribes to a room, and relays messages:

<?php

declare(strict_types=1);

use Phalanx\Scope;
use Phalanx\Task\Scopeable;
use Phalanx\WebSocket\WsGateway;
use Phalanx\WebSocket\WsMessage;
use Phalanx\WebSocket\WsScope;

final readonly class ChatRoomHandler implements Scopeable
{
    public function __invoke(Scope $scope): mixed
    {
        assert($scope instanceof WsScope);
        $conn = $scope->connection;
        $gateway = $scope->service(WsGateway::class);
        $room = $scope->params->get('room');

        $gateway->register($conn);
        $gateway->subscribe($conn, "chat.{$room}");

        $gateway->publish(
            "chat.{$room}",
            WsMessage::json(['type' => 'join', 'id' => $conn->id]),
            exclude: $conn,
        );

        foreach ($conn->inbound->consume() as $msg) {
            if ($msg->isText) {
                $gateway->publish("chat.{$room}", $msg, exclude: $conn);
            }

            if ($msg->isClose) {
                break;
            }
        }

        $gateway->publish(
            "chat.{$room}",
            WsMessage::json(['type' => 'leave', 'id' => $conn->id]),
        );

        $gateway->unregister($conn);

        return null;
    }
}

<?php

use Phalanx\WebSocket\WsRoute;

$chat = new WsRoute(fn: new ChatRoomHandler());

The foreach loop over $conn->inbound->consume() blocks the fiber (not the event loop) until the next frame arrives. When the client disconnects, the iterator completes and execution continues with cleanup.
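
The difference between blocking a fiber and blocking the event loop can be shown with native PHP fibers. DemoChannel below is a hypothetical, single-consumer channel written for this illustration. It is a sketch of the general technique, assuming the consumer always runs inside a Fiber, and is not the channel implementation from phalanx/stream.

```php
<?php

// Hypothetical single-consumer channel; illustrative only.
final class DemoChannel
{
    /** @var list<string> */
    private array $buffer = [];
    private bool $closed = false;
    private ?Fiber $waiting = null;

    public function push(string $item): void
    {
        $this->buffer[] = $item;
        $this->wake();
    }

    public function close(): void
    {
        $this->closed = true;
        $this->wake();
    }

    /** Must be iterated from inside a Fiber. */
    public function consume(): Generator
    {
        while (true) {
            while ($this->buffer !== []) {
                yield array_shift($this->buffer);
            }
            if ($this->closed) {
                return; // ends the consumer's foreach
            }
            // Nothing buffered: park this fiber and hand control back to the caller.
            $this->waiting = Fiber::getCurrent();
            Fiber::suspend();
        }
    }

    private function wake(): void
    {
        $fiber = $this->waiting;
        $this->waiting = null;
        if ($fiber !== null && $fiber->isSuspended()) {
            $fiber->resume();
        }
    }
}

$channel = new DemoChannel();
$received = [];

$fiber = new Fiber(function () use ($channel, &$received): void {
    foreach ($channel->consume() as $msg) {
        $received[] = $msg;
    }
    $received[] = '(closed)'; // cleanup runs once the iterator completes
});

$fiber->start();     // runs until consume() suspends on the empty buffer
$channel->push('a'); // resumes the fiber, which handles 'a' and parks again
$channel->push('b');
$channel->close();   // the iterator completes and the fiber finishes
```

While the fiber is parked, the main script keeps running, which is the role an event loop plays in a real server.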

Route Parameters

WebSocket routes support the same {param} syntax as HTTP routes:

<?php

$ws = WsRouteGroup::of([
    '/ws/rooms/{room}'          => $roomRoute,
    '/ws/users/{id:\\d+}/feed'  => $userFeedRoute,
]);

Access parameters through $scope->params:

<?php

$room = $scope->params->get('room');
$userId = $scope->params->get('id');
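
As an illustration of how {param} and {param:regex} placeholders can be turned into a matcher, the sketch below compiles a route pattern into a regular expression with named groups. The functions compileRoutePattern and matchRoute are hypothetical helpers for this example and are not part of the Phalanx API. The sketch also assumes a constraint contains neither a closing brace nor a # character.

```php
<?php

// Hypothetical helper: turn '/ws/users/{id:\d+}/feed' into an anchored regex.
function compileRoutePattern(string $pattern): string
{
    // Split into literal segments and {name} or {name:constraint} placeholders.
    $parts = preg_split('/(\{\w+(?::[^}]+)?\})/', $pattern, -1, PREG_SPLIT_DELIM_CAPTURE);

    $regex = '';
    foreach ($parts as $part) {
        if (preg_match('/^\{(\w+)(?::([^}]+))?\}$/', $part, $m) === 1) {
            // An unconstrained placeholder matches one path segment.
            $regex .= sprintf('(?P<%s>%s)', $m[1], $m[2] ?? '[^/]+');
        } else {
            $regex .= preg_quote($part, '#');
        }
    }

    return '#^' . $regex . '$#';
}

/** @return array<string, string>|null named parameters, or null when the path does not match */
function matchRoute(string $pattern, string $path): ?array
{
    if (preg_match(compileRoutePattern($pattern), $path, $matches) !== 1) {
        return null;
    }

    // Keep only the named groups; drop the numeric duplicates.
    return array_filter($matches, 'is_string', ARRAY_FILTER_USE_KEY);
}

$room = matchRoute('/ws/rooms/{room}', '/ws/rooms/lobby');
$user = matchRoute('/ws/users/{id:\\d+}/feed', '/ws/users/42/feed');
$miss = matchRoute('/ws/users/{id:\\d+}/feed', '/ws/users/alice/feed');
```

Matched values always arrive as strings, so a numeric constraint such as \d+ validates the shape of the segment but does not cast it.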

Integration with phalanx/http

The HTTP Runner handles WebSocket upgrades on the same port as HTTP traffic. No separate server needed:

<?php

use Phalanx\Http\RouteGroup;
use Phalanx\Http\Runner;
use Phalanx\WebSocket\WsRouteGroup;

$http = RouteGroup::of([
    'GET /api/rooms'  => $listRooms,
    'POST /api/rooms' => $createRoom,
]);

$ws = WsRouteGroup::of([
    '/ws/rooms/{room}' => $roomRoute,
]);

Runner::from($app)
    ->withRoutes($http)
    ->withWebsockets($ws)
    ->run('0.0.0.0:8080');

Ordinary HTTP requests are dispatched to the RouteGroup. Upgrade requests (those carrying Connection: Upgrade and Upgrade: websocket headers) are routed to the WsRouteGroup. The WsHandshake class handles RFC 6455 negotiation and subprotocol selection automatically.
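
For readers curious what the negotiation involves, the sketch below reproduces the core steps of an RFC 6455 handshake in plain PHP. The function names are hypothetical and chosen for this example; they are not WsHandshake's API. The first-match subprotocol policy is an assumption. The GUID and the accept-key formula come from the RFC itself.

```php
<?php

/** @param array<string, string> $headers header names already lower-cased */
function isWebSocketUpgrade(array $headers): bool
{
    // Connection is a comma-separated token list, e.g. "keep-alive, Upgrade".
    $tokens = array_map('trim', explode(',', strtolower($headers['connection'] ?? '')));

    return in_array('upgrade', $tokens, true)
        && strtolower(trim($headers['upgrade'] ?? '')) === 'websocket';
}

function acceptKey(string $secWebSocketKey): string
{
    // GUID fixed by RFC 6455, section 1.3.
    $guid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

    // Sec-WebSocket-Accept = base64(SHA-1(key . GUID)), using the raw digest.
    return base64_encode(sha1($secWebSocketKey . $guid, true));
}

/**
 * Assumed policy: pick the first protocol the client offered that the server supports.
 *
 * @param list<string> $offered
 * @param list<string> $supported
 */
function selectSubprotocol(array $offered, array $supported): ?string
{
    foreach ($offered as $protocol) {
        if (in_array($protocol, $supported, true)) {
            return $protocol;
        }
    }

    return null;
}

// Sample key taken from RFC 6455, section 1.3.
$accept = acceptKey('dGhlIHNhbXBsZSBub25jZQ==');
$isUpgrade = isWebSocketUpgrade(['connection' => 'keep-alive, Upgrade', 'upgrade' => 'WebSocket']);
$protocol = selectSubprotocol(['graphql-ws', 'json'], ['json']);
```

A server that finds no common subprotocol can still complete the handshake by omitting the Sec-WebSocket-Protocol response header, which is why selectSubprotocol returns null instead of throwing.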