RxPHP WAMP client

3.3.7 2022-03-06 03:37 UTC


This project is a WAMP v2 client written in PHP that uses RxPHP Observables instead of promises and event emitters.

If you don't know what WAMP is, you should read up on it.

If you don't know what RxPHP or ReactiveExtensions is, you're missing out...


composer require rx/thruway-client


use Rx\Observable;
use Rx\Thruway\Client;

require __DIR__ . '/vendor/autoload.php';

$wamp = new Client('ws://', 'realm1');


$wamp->call('add.rpc', [1, 2])
    ->map(function (Thruway\Message\ResultMessage $r) {
        return $r->getArguments()[0];
    })
    ->subscribe(function ($r) {
        echo $r;
    });


$wamp->register('add.rpc', function ($a, $b) { return $a + $b; })->subscribe();

If the registration handler throws an exception, the caller receives a thruway.error.invocation_exception error. To return a more specific error, throw a WampErrorException from the handler or, if the handler returns an observable sequence, send a WampErrorException through onError.
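The two error paths described above might look roughly like the sketch below. The router URL, the realm, the procedure names and the error URI are placeholders invented for illustration. The sketch also assumes that WampErrorException lives in the Thruway namespace and takes the error URI as its first constructor argument, followed by an optional arguments array; check this against the Thruway version you have installed.

```php
<?php

use Rx\Observable;
use Rx\Thruway\Client;
use Thruway\WampErrorException;

require __DIR__ . '/vendor/autoload.php';

// Placeholder router URL and realm; substitute your own.
$wamp = new Client('ws://127.0.0.1:9090', 'realm1');

// Plain handler: throwing a WampErrorException picks the error URI the caller sees.
// Assumption: the constructor takes the error URI first, then an arguments array.
$divide = function ($a, $b) {
    if ($b == 0) {
        throw new WampErrorException('com.example.error.division_by_zero', ['Cannot divide by zero']);
    }
    return $a / $b;
};

// Observable-returning handler: the same exception is sent through onError instead.
$divideAsync = function ($a, $b) {
    if ($b == 0) {
        return Observable::error(new WampErrorException('com.example.error.division_by_zero'));
    }
    return Observable::of($a / $b);
};

$wamp->register('divide.rpc', $divide)->subscribe();
$wamp->register('divide.async.rpc', $divideAsync)->subscribe();

// Caller side: the second callback passed to subscribe() receives the error.
$wamp->call('divide.rpc', [1, 0])
    ->subscribe(
        function ($r) { echo "unexpected result\n"; },
        function (\Throwable $e) { echo 'call failed: ' . $e->getMessage() . "\n"; }
    );
```

Any other exception thrown by either handler would still reach the caller as the generic thruway.error.invocation_exception.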

Publish to topic

$wamp->publish('example.topic', 'some value');
$wamp->publish('example.topic', Observable::interval(1000)); // you can also publish an observable
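Because publish accepts an observable, the usual RxPHP operators can shape what gets published before it reaches the topic. A minimal sketch, assuming a placeholder router URL and realm; the 'tick' label and the limit of five values are arbitrary choices for illustration.

```php
<?php

use Rx\Observable;
use Rx\Thruway\Client;

require __DIR__ . '/vendor/autoload.php';

// Placeholder router URL and realm; substitute your own.
$wamp = new Client('ws://127.0.0.1:9090', 'realm1');

// Emit a counter every second, keep only the first five values,
// and turn each one into a labelled string.
$ticks = Observable::interval(1000)
    ->take(5)
    ->map(function ($i) {
        return 'tick ' . $i;
    });

// Each value emitted by $ticks is published to the topic.
$wamp->publish('example.topic', $ticks);
```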

Subscribe to topic

$wamp->topic('example.topic')
    ->map(function (Thruway\Message\EventMessage $m) {
        return $m->getArguments()[0];
    })
    ->subscribe(function ($v) { echo $v; });