binsoul/net-mqtt-client-react

Asynchronous MQTT client built on React

Installs: 782 979

Dependents: 46

Suggesters: 0

Security: 0

Stars: 48

Watchers: 5

Forks: 24

Open Issues: 5

pkg:composer/binsoul/net-mqtt-client-react

0.7.3 2021-04-12 16:59 UTC

This package is auto-updated.

Last update: 2026-01-13 09:44:49 UTC


README

Latest Version on Packagist Software License Total Downloads Build Status

This package provides an asynchronous MQTT client built on the React socket library. All client methods return a promise which is fulfilled if the operation succeeds or rejected if it fails. Incoming messages for subscribed topics are delivered via the "message" event.

Install

Via composer:

$ composer require binsoul/net-mqtt-client-react

Example

Connect to a public broker and run until the connection is closed or an error occurs.

<?php

declare(strict_types=1);

use BinSoul\Net\Mqtt\Client\React\ReactMqttClient;
use BinSoul\Net\Mqtt\Connection;
use BinSoul\Net\Mqtt\DefaultMessage;
use BinSoul\Net\Mqtt\DefaultSubscription;
use BinSoul\Net\Mqtt\Message;
use BinSoul\Net\Mqtt\Subscription;
use React\Socket\DnsConnector;
use React\Socket\TcpConnector;

include 'vendor/autoload.php';

// Build the client: event loop first, then the connector stack, then the MQTT client itself
$loop = \React\EventLoop\Loop::get();
$dnsResolverFactory = new \React\Dns\Resolver\Factory();

// Plain TCP connections; host names are looked up via the resolver created for 8.8.8.8
$tcpConnector = new TcpConnector($loop);
$resolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
$connector = new DnsConnector($tcpConnector, $resolver);

$client = new ReactMqttClient($connector, $loop);

// Register event listeners

// 'open': the network connection has been established
$client->on(
    'open',
    static function () use ($client) {
        printf("Open: %s:%d\n", $client->getHost(), $client->getPort());
    }
);

// 'close': the network connection is gone, so report it and end the event loop
$client->on(
    'close',
    static function () use ($client, $loop) {
        printf("Close: %s:%d\n", $client->getHost(), $client->getPort());

        $loop->stop();
    }
);

// 'connect': the broker accepted the connection
$client->on(
    'connect',
    static function (Connection $established) {
        printf("Connect: client=%s\n", $established->getClientID());
    }
);

// 'disconnect': the broker connection has been terminated
$client->on(
    'disconnect',
    static function (Connection $terminated) {
        printf("Disconnect: client=%s\n", $terminated->getClientID());
    }
);

// 'message': print one line per incoming message of a subscribed topic
$client->on(
    'message',
    static function (Message $incoming) {
        // Collect the optional markers that follow the "Message" prefix
        $markers = '';

        if ($incoming->isDuplicate()) {
            $markers .= ' (duplicate)';
        }

        if ($incoming->isRetained()) {
            $markers .= ' (retained)';
        }

        $topic = $incoming->getTopic();

        // Keep the output short: the payload preview is trimmed to a width of 50 with a '...' marker
        $preview = mb_strimwidth($incoming->getPayload(), 0, 50, '...');

        echo 'Message' . $markers . ': ' . $topic . ' => ' . $preview . "\n";
    }
);

// 'warning': report the problem and keep running
$client->on(
    'warning',
    static function (Exception $warning) {
        printf("Warning: %s\n", $warning->getMessage());
    }
);

// 'error': report the problem and end the event loop
$client->on(
    'error',
    static function (Exception $failure) use ($loop) {
        printf("Error: %s\n", $failure->getMessage());

        $loop->stop();
    }
);

// Connect to the broker, then subscribe and start publishing
$client->connect('broker.hivemq.com')->then(
    static function () use ($client) {
        // Rejection handler shared by every operation started below
        $reportFailure = static function (Exception $exception) {
            printf("Error: %s\n", $exception->getMessage());
        };

        // Reporter shared by the one-off and the periodic publish
        $reportPublished = static function (Message $published) {
            printf("Publish: %s => %s\n", $published->getTopic(), $published->getPayload());
        };

        // Subscribe to all topics
        $client->subscribe(new DefaultSubscription('#'))
            ->then(
                static function (Subscription $confirmed) {
                    printf("Subscribe: %s\n", $confirmed->getFilter());
                }
            )
            ->catch($reportFailure);

        // Publish humidity once
        $client->publish(new DefaultMessage('sensors/humidity', '55%'))
            ->then($reportPublished)
            ->catch($reportFailure);

        // Publish a random temperature every 10 seconds
        $randomTemperature = static function (): int {
            return random_int(-20, 30);
        };

        $client->publishPeriodically(
            10,
            new DefaultMessage('sensors/temperature'),
            $randomTemperature,
            $reportPublished
        )->catch($reportFailure);
    }
);

// Start the event loop; the 'close' and 'error' handlers registered above call $loop->stop()
$loop->run();

Testing

$ composer test

License

The MIT License (MIT). Please see License File for more information.