opencck/server

HTTP/WS/SSE non-blocking server based on revolt event loop and amphp-v3

1.0.0 2023-12-30 11:48 UTC

README

This package provides a non-blocking HTTP/1.1 and HTTP/2 application server (with WebSocket and SSE support) written in PHP, based on AMPHP v3 and the Revolt event loop.

Requirements

  • PHP 8.1
  • PHP sockets extension

Installation

This package can be installed as a Composer dependency.

composer require opencck/server

Documentation

Example server

<?php

use Amp\Http\Server\Driver\ConnectionLimitingClientFactory;
use Amp\Http\Server\Driver\ConnectionLimitingServerSocketFactory;
use Amp\Http\Server\Driver\SocketClientFactory;
use Amp\Http\Server\SocketHttpServer;
use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\RequestHandler\ClosureRequestHandler;
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Http\Server\Router;
use Amp\Http\Server\StaticContent\DocumentRoot;
use Amp\Http\Server\FormParser;
use Amp\Http\HttpStatus;

use Amp\Sync\LocalSemaphore;
use Amp\Websocket\Server\Websocket;
use Amp\Websocket\Server\WebsocketGateway;
use Amp\Websocket\Server\WebsocketClientGateway;
use Amp\Websocket\Server\WebsocketClientHandler;
use Amp\Websocket\Server\AllowOriginAcceptor;
use Amp\Websocket\WebsocketClient;

use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Level;
use Monolog\Logger;

use Amp\Socket;
use Amp\ByteStream\WritableResourceStream;
use Revolt\EventLoop;
use function Amp\trapSignal;

require __DIR__ . '/vendor/autoload.php';

/**
 * Convert a php.ini shorthand size (e.g. "8M", "512k", "1G") into a number of bytes.
 *
 * Values without a unit suffix (e.g. "1048576" or an integer) are already in bytes and
 * are returned unchanged; an empty string yields 0.
 *
 * @param int|string $val Size as an integer, or a string with an optional K/M/G suffix (case-insensitive)
 * @return int Size in bytes
 */
function return_bytes(int|string $val): int {
    $val = trim((string) $val);
    if ($val === '') {
        // Nothing to parse; avoid reading a string offset that does not exist.
        return 0;
    }

    $units = ['g' => 1_073_741_824, 'm' => 1_048_576, 'k' => 1024];
    $unit = strtolower($val[strlen($val) - 1]);

    // A missing or unknown suffix means the number is already expressed in bytes.
    return intval($val) * ($units[$unit] ?? 1);
}

// logger: "server" channel whose records are console-formatted and written to STDOUT
$logger = new Logger('server');
$stdoutHandler = new StreamHandler(new WritableResourceStream(STDOUT));
$stdoutHandler->setLevel(Level::Info);
$stdoutHandler->setFormatter(new ConsoleFormatter());
$logger->pushHandler($stdoutHandler);

// server: connection-limiting socket factory backed by a 1000-lock semaphore,
// plus a client factory configured with a limit of 10 connections per IP
$limitedClientFactory = new ConnectionLimitingClientFactory(
    clientFactory: new SocketClientFactory($logger),
    logger: $logger,
    connectionsPerIpLimit: 10
);
$limitedSocketFactory = new ConnectionLimitingServerSocketFactory(new LocalSemaphore(1000));
$server = new SocketHttpServer($logger, $limitedSocketFactory, $limitedClientFactory);

// router: shares one error handler with the static-file fallback and the server start call
$errorHandler = new DefaultErrorHandler();
$router = new Router($server, $logger, $errorHandler);

// http route (GET): demo page that opens a WebSocket to /ws and subscribes to the SSE stream at /events
$router->addRoute(
    method: 'GET',
    uri: '/',
    requestHandler: new ClosureRequestHandler(static function (Request $request): Response {
        return new Response(
            status: HttpStatus::OK,
            headers: ['content-type' => 'text/html; charset=utf-8'],
            body: '
            <!DOCTYPE html>
            <html lang="en">
                <body>
                    <script>
                        const ws = new WebSocket(`ws://${location.host}/ws`);
                        ws.onopen = function () { console.log("Connected"); }
                        ws.onmessage = function (messageEvent) { console.log(messageEvent.data); }
                    </script>
                    <script>
                        const eventSource = new EventSource("/events");
                        const eventList = document.createElement("ol");
                        document.body.appendChild(eventList);
                        eventSource.addEventListener("notification", function (e) {
                            const element = document.createElement("li");
                            element.textContent = "Message: " + e.data;
                            eventList.appendChild(element);
                        });
                    </script>
                </body>
            </html>'
        );
    })
);

// http route (POST): parses the submitted form and echoes its fields back inside a <pre> block
$router->addRoute(
    method: 'POST',
    uri: '/',
    requestHandler: new ClosureRequestHandler(function (Request $request): Response {
        // The parser is configured with PHP's post_max_size setting converted to bytes.
        $form = FormParser\Form::fromRequest($request, new FormParser\FormParser(return_bytes(ini_get('post_max_size'))));

        // Form values are client-controlled: escape the dump before embedding it in markup,
        // otherwise a field containing "<script>" would be reflected verbatim (XSS).
        $dump = htmlspecialchars(print_r($form->getValues(), true), ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');

        return new Response(
            HttpStatus::OK,
            ['content-type' => 'text/html; charset=utf-8'],
            '<!DOCTYPE html><html lang="en"><body><pre>' . $dump . '</pre></body></html>'
        );
    })
);

// websocket route: every message received from one client is rebroadcast to all clients of the gateway
$gateway = new WebsocketClientGateway();
$router->addRoute(
    method: 'GET',
    uri: '/ws',
    requestHandler: new Websocket(
        httpServer: $server,
        logger: $logger,
        // Only handshakes whose Origin matches one of these URLs are accepted.
        acceptor: new AllowOriginAcceptor([
            // $_ENV is only populated when variables_order contains "E", so fall back to getenv().
            'http://' . ($_ENV['HOST_MACHINE_IP'] ?? (getenv('HOST_MACHINE_IP') ?: '127.0.0.1')) . ':8080',
            'http://localhost:8080',
            'http://[::1]:8080',
        ]),
        clientHandler: new class ($gateway) implements WebsocketClientHandler {
            public function __construct(private readonly WebsocketGateway $gateway = new WebsocketClientGateway()) {
            }

            /**
             * Register the client with the gateway and relay each of its messages to all clients.
             *
             * Runs until receive() returns a falsy value.
             */
            public function handleClient(WebsocketClient $client, Request $request, Response $response): void {
                $this->gateway->addClient($client);

                while ($message = $client->receive()) {
                    // buffer() collects the complete payload and returns '' for an empty message,
                    // whereas read() yields a single chunk and null once the payload is exhausted.
                    $payload = $message->buffer();

                    // Relay with the same frame type: binary data is not guaranteed to be valid UTF-8 text.
                    if ($message->isBinary()) {
                        $this->gateway->broadcastBinary($payload);
                    } else {
                        $this->gateway->broadcastText($payload);
                    }
                }
            }
        }
    )
);

// SSE route: streams 30 "notification" events, waiting 500 ms before each one
$router->addRoute(
    method: 'GET',
    uri: '/events',
    requestHandler: new ClosureRequestHandler(static function (Request $request): Response {
        // Lazy event source: each iteration delays, then yields one SSE-framed message.
        $events = static function (): \Generator {
            $i = 0;
            while ($i < 30) {
                \Amp\delay(0.5);
                yield "event: notification\ndata: Event {$i}\n\n";
                $i++;
            }
        };

        return new Response(
            status: HttpStatus::OK,
            headers: ['content-type' => 'text/event-stream; charset=utf-8'],
            body: new \Amp\ByteStream\ReadableIterableStream($events())
        );
    })
);

// fallback: requests not matched by a route are handed to a DocumentRoot over ./public
// (the directory is created on first run if it does not exist yet)
$publicDir = implode(DIRECTORY_SEPARATOR, [__DIR__, 'public']);
is_dir($publicDir) || mkdir($publicDir);
$router->setFallback(new DocumentRoot($server, $errorHandler, $publicDir));

// once per second, push the current microtime(true) value to every client registered with the WS gateway
EventLoop::repeat(1, static function () use ($gateway): void {
    $now = microtime(true);
    $gateway->broadcastText($now);
});

// report errors passed to the event loop's error handler through the logger
EventLoop::setErrorHandler(static function ($error) use ($logger): void {
    $message = $error->getMessage();
    $logger->error($message);
});

// start server: listen on 0.0.0.0:8080 and keep running until a termination signal arrives
try {
    $server->expose(new Socket\InternetAddress('0.0.0.0', 8080));
    // optional IPv6 listener, disabled by default:
    //$server->expose(new Socket\InternetAddress('[::]', 8080));
    $server->start($router, $errorHandler);

    $signalsAvailable = defined('SIGINT') && defined('SIGTERM');
    if (!$signalsAvailable) {
        // Signal constants are not defined here: just run the event loop.
        EventLoop::run();
    } else {
        // Block until SIGINT or SIGTERM is received, then stop the server.
        $received = trapSignal([SIGINT, SIGTERM]);
        $logger->info(\sprintf('Received signal %d, stopping HTTP server', $received));
        $server->stop();
    }
} catch (Throwable $failure) {
    $logger->error($failure->getMessage());
}

Start example server (php)

php server.php

Start example server (docker)

cp .env.example .env
# set HOST_MACHINE_IP env for ws origin handshake
docker-compose up

License

The MIT License (MIT). Please see LICENSE for more information.