lexdss / nats
nats jetstream client for php
Requires
- php: >=8.0
Requires (Dev)
- ext-sodium: *
- friendsofphp/php-cs-fixer: ^3.9
- monolog/monolog: ^2.3.5
- phan/phan: ^5.3
- phpunit/phpunit: ^9.5
Suggests
- ext-sodium: Provides Ed25519 for nkey authentication
- paragonie/sodium_compat: Provides Ed25519 for nkey authentication if sodium is not available
This package is auto-updated.
Last update: 2025-03-24 19:39:19 UTC
README
Feel free to contribute or give any feedback.
- Installation
- Connection
- Publish Subscribe
- Request Response
- JetStream Api Usage
- Key Value Storage
- Using NKeys with JWT
- Performance
- Configuration Options
Installation
The recommended way to install the library is through Composer:
$ composer require basis-company/nats
The NKeys functionality requires Ed25519, which is provided by the libsodium extension or the sodium_compat package.
Connection
    use Basis\Nats\Client;
    use Basis\Nats\Configuration;

    // these are the default options; you can override any of them
    $configuration = new Configuration([
        'host' => 'localhost',
        'jwt' => null,
        'lang' => 'php',
        'pass' => null,
        'pedantic' => false,
        'port' => 4222,
        'reconnect' => true,
        'timeout' => 1,
        'token' => null,
        'user' => null,
        'nkey' => null,
        'verbose' => false,
        'version' => 'dev',
    ]);

    // default delay mode is constant: first retry in 1ms, second in 1ms, third in 1ms
    $configuration->setDelay(0.001);

    // linear delay mode: first retry in 1ms, second in 2ms, third in 3ms, fourth in 4ms, etc.
    $configuration->setDelay(0.001, Configuration::DELAY_LINEAR);

    // exponential delay mode: first retry in 10ms, second in 100ms, third in 1s, fourth in 10s, etc.
    $configuration->setDelay(0.01, Configuration::DELAY_EXPONENTIAL);

    $client = new Client($configuration);
    $client->ping(); // true
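The three delay modes described in the comments above can be illustrated with a small standalone function. This is only a sketch: `retryDelayMs` and the mode strings are hypothetical names, and the formulas are assumptions chosen to reproduce the schedules quoted above (constant: the base delay; linear: base times the attempt number; exponential: the base in milliseconds raised to the attempt number). The library's actual calculation may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay in milliseconds before the given retry attempt (1-based).
 * The formulas are assumptions that match the schedules quoted in the README comments.
 */
function retryDelayMs(float $baseSeconds, string $mode, int $attempt): int
{
    $baseMs = (int) round($baseSeconds * 1000);

    return match ($mode) {
        'constant' => $baseMs,
        'linear' => $baseMs * $attempt,
        'exponential' => $baseMs ** $attempt,
        default => throw new InvalidArgumentException("Unknown delay mode: $mode"),
    };
}

// Print the schedule for the first four retries in each mode.
foreach ([1, 2, 3, 4] as $attempt) {
    printf(
        "attempt %d: constant=%dms linear=%dms exponential=%dms\n",
        $attempt,
        retryDelayMs(0.001, 'constant', $attempt),
        retryDelayMs(0.001, 'linear', $attempt),
        retryDelayMs(0.01, 'exponential', $attempt)
    );
}
```

Under these assumptions, an exponential schedule with a 1ms base never grows (1 raised to any power is 1), so a base of 10ms or more is the sensible choice for that mode.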
Connecting to a cluster with TLS enabled
Typically, when connecting to a cluster with TLS enabled, the connection settings do not change. The client library automatically switches over to TLS 1.2. However, if you're using a self-signed certificate, you may have to point to your local CA file using the tlsCaFile setting.
When connecting to a NATS cluster that requires the client to provide TLS certificates, use the tlsCertFile and tlsKeyFile settings to point to your local TLS certificate and private key files.
See the NATS Server documentation on enabling TLS and TLS client verification for details.
The following connection settings apply when connecting to a NATS server that has both TLS and TLS client verification enabled:
    use Basis\Nats\Client;
    use Basis\Nats\Configuration;

    // these are the default options; you can override any of them
    $configuration = new Configuration([
        'host' => 'localhost',
        'jwt' => null,
        'lang' => 'php',
        'pass' => null,
        'pedantic' => false,
        'port' => 4222,
        'reconnect' => true,
        'timeout' => 1,
        'token' => null,
        'user' => null,
        'nkey' => null,
        'verbose' => false,
        'version' => 'dev',
        'tlsCertFile' => "./certs/client-cert.pem",
        'tlsKeyFile' => "./certs/client-key.pem",
        // CA certificate, not the client key (example path)
        'tlsCaFile' => "./certs/ca-cert.pem",
    ]);

    $configuration->setDelay(0.001);

    $client = new Client($configuration);
    $client->ping(); // true
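For orientation, PHP's stream layer accepts client certificate, key, and CA paths through SSL context options. The sketch below shows how the three tls* settings could map onto those options. The `tlsContextOptions` helper and the mapping are assumptions for illustration, not a description of how this client wires the settings internally.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: translate the tls* settings into PHP SSL context options.
 * Settings that are not provided are skipped so that system defaults apply.
 */
function tlsContextOptions(array $settings): array
{
    $map = [
        'tlsCertFile' => 'local_cert', // client certificate (PEM)
        'tlsKeyFile' => 'local_pk',    // client private key (PEM)
        'tlsCaFile' => 'cafile',       // CA certificate used to verify the server
    ];

    $ssl = ['verify_peer' => true];
    foreach ($map as $setting => $option) {
        if (!empty($settings[$setting])) {
            $ssl[$option] = $settings[$setting];
        }
    }

    return ['ssl' => $ssl];
}

// Building a context does not open the files, so this runs without real certificates.
$context = stream_context_create(tlsContextOptions([
    'tlsCertFile' => './certs/client-cert.pem',
    'tlsKeyFile' => './certs/client-key.pem',
    'tlsCaFile' => './certs/ca-cert.pem',
]));

print_r(stream_context_get_options($context));
```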
Publish Subscribe
    $client->subscribe('hello', function ($message) {
        var_dump('got message', $message); // tester
    });

    $client->publish('hello', 'tester');
    $client->process();

    // if you want to append some headers, construct payload manually
    use Basis\Nats\Message\Payload;

    $payload = new Payload('tester', [
        'Nats-Msg-Id' => 'payload-example'
    ]);

    $client->publish('hello', $payload);
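To make the header example above more concrete, the sketch below builds the frame that the NATS protocol uses for a message with headers (`HPUB`): a header block that starts with `NATS/1.0` and ends with an empty line, followed by the payload. `buildHpubFrame` is a hypothetical helper written for illustration; it is not taken from this library, whose own serialization may be organised differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: serialize a message with headers as a NATS HPUB frame.
 */
function buildHpubFrame(string $subject, string $body, array $headers): string
{
    // The header block starts with a version line and ends with an empty line.
    $headerBlock = "NATS/1.0\r\n";
    foreach ($headers as $name => $value) {
        $headerBlock .= "$name: $value\r\n";
    }
    $headerBlock .= "\r\n";

    // The protocol line carries the header size and the total size (headers + payload).
    $headerLength = strlen($headerBlock);
    $totalLength = $headerLength + strlen($body);

    return "HPUB $subject $headerLength $totalLength\r\n" . $headerBlock . $body . "\r\n";
}

$frame = buildHpubFrame('hello', 'tester', ['Nats-Msg-Id' => 'payload-example']);

// Show line endings explicitly so the frame structure is visible.
echo str_replace("\r\n", "\\r\\n\n", $frame);
```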
Request Response
Request/response is a simple wrapper over publishing and processing the reply, so the payload can be constructed manually in the same way.
    $client->subscribe('hello.request', function ($name) {
        return "Hello, " . $name;
    });

    // async interaction
    $client->request('hello.request', 'Nekufa1', function ($response) {
        var_dump($response); // Hello, Nekufa1
    });

    $client->process(); // process request

    // sync interaction (blocks until the response comes back)
    $client->dispatch('hello.request', 'Nekufa2'); // Hello, Nekufa2
JetStream Api Usage
    use Basis\Nats\Stream\RetentionPolicy;
    use Basis\Nats\Stream\StorageBackend;

    $accountInfo = $client->getApi()->getInfo(); // account_info_response object

    $stream = $client->getApi()->getStream('mailer');

    $stream->getConfiguration()
        ->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
        ->setStorageBackend(StorageBackend::MEMORY)
        ->setSubjects(['mailer.greet', 'mailer.bye']);

    // stream is created with the given configuration
    $stream->create();

    // and put some tasks so workers have something to do
    $stream->put('mailer.greet', 'nekufa@gmail.com');
    $stream->put('mailer.bye', 'nekufa@gmail.com');

    var_dump($stream->info()); // you can inspect stream info

    // this should be set in your worker
    $greeter = $stream->getConsumer('greeter');
    $greeter->getConfiguration()->setSubjectFilter('mailer.greet');

    // the consumer is created on the first handle call
    $greeter->handle(function ($address) {
        mail($address, "Hi there!");
    });

    var_dump($greeter->info()); // you can inspect consumer info

    $goodbyer = $stream->getConsumer('goodbyer');
    $goodbyer->getConfiguration()->setSubjectFilter('mailer.bye');
    $goodbyer->create(); // create consumer if you don't want to handle anything right now
    $goodbyer->handle(function ($address) {
        mail($address, "See you later");
    });

    // you can configure batching and iteration count using chain api
    $goodbyer
        ->setBatching(2) // how many messages are requested from the nats stream
        ->setIterations(3) // how many times the message request is sent
        ->handle(function () {
            // if you need to break on the next iteration, simply call the interrupt method
            // the batch will be processed to the end and then handling stops
            // $goodbyer->interrupt();
        });

    // if you need to append some headers, construct payload manually
    use Basis\Nats\Message\Payload;

    $payload = new Payload('nekufa@gmail.com', [
        'Nats-Msg-Id' => 'single-send'
    ]);

    $stream->put('mailer.bye', $payload);
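The batching, iteration, and interrupt behaviour described in the comments above can be modelled without a server. The sketch below uses an in-memory array in place of a stream. `BatchLoop` and its methods are hypothetical stand-ins that only mirror the behaviour stated above (request up to `batching` messages, repeat up to `iterations` times, finish the current batch after an interrupt); they are not the library's consumer implementation.

```php
<?php
declare(strict_types=1);

final class BatchLoop
{
    private bool $interrupted = false;

    public function __construct(private int $batching, private int $iterations)
    {
    }

    public function interrupt(): void
    {
        $this->interrupted = true;
    }

    /** Returns the number of messages handled. */
    public function handle(array &$queue, callable $handler): int
    {
        $handled = 0;
        for ($i = 0; $i < $this->iterations && !$this->interrupted; $i++) {
            // One "request": take up to $batching messages off the queue.
            $batch = array_splice($queue, 0, $this->batching);
            foreach ($batch as $message) {
                // The whole batch is processed even if interrupt() is called midway.
                $handler($message, $this);
                $handled++;
            }
        }

        return $handled;
    }
}

$queue = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
$loop = new BatchLoop(batching: 2, iterations: 3);

$handled = $loop->handle($queue, function (string $message, BatchLoop $loop): void {
    if ($message === 'c') {
        $loop->interrupt(); // stop after the batch that contains "c"
    }
});

echo "handled $handled, remaining " . count($queue) . "\n";
```

In this run the second batch (`c`, `d`) is still completed after the interrupt, so four messages are handled and three stay in the queue.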
Key Value Storage
    $bucket = $client->getApi()->getBucket('bucket_name');

    // basics
    $bucket->put('username', 'nekufa');
    echo $bucket->get('username'); // nekufa

    // safe update (given revision)
    $entry = $bucket->getEntry('username');
    echo $entry->value; // nekufa
    $bucket->update('username', 'bazyaba', $entry->revision);

    // delete value
    $bucket->delete('username');

    // purge value history
    $bucket->purge('username');

    // get bucket stats
    var_dump($bucket->getStatus());
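The safe update shown above is a compare-and-set: the write should only succeed if the stored revision still matches the one that was read. The sketch below wraps that pattern in a retry loop. `InMemoryBucket`, `Entry`, and `updateWithRetry` are hypothetical stand-ins that mimic only the `put`, `getEntry`, and `update` calls used above so the example runs without a server. How the real bucket reports a revision conflict is an assumption here (an exception), and the helper assumes the key already exists.

```php
<?php
declare(strict_types=1);

final class Entry
{
    public function __construct(public string $value, public int $revision)
    {
    }
}

final class InMemoryBucket
{
    /** @var array<string, Entry> */
    private array $entries = [];

    public function put(string $key, string $value): int
    {
        $revision = ($this->entries[$key]->revision ?? 0) + 1;
        $this->entries[$key] = new Entry($value, $revision);

        return $revision;
    }

    public function getEntry(string $key): ?Entry
    {
        return $this->entries[$key] ?? null;
    }

    public function update(string $key, string $value, int $expectedRevision): int
    {
        $current = $this->entries[$key]->revision ?? 0;
        if ($current !== $expectedRevision) {
            // Assumed conflict signal; the real client may report this differently.
            throw new RuntimeException("Revision mismatch for $key");
        }

        return $this->put($key, $value);
    }
}

/** Apply $modify to the current value, retrying if another writer got in first. */
function updateWithRetry(InMemoryBucket $bucket, string $key, callable $modify, int $maxAttempts = 3): int
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $entry = $bucket->getEntry($key);
        try {
            return $bucket->update($key, $modify($entry->value), $entry->revision);
        } catch (RuntimeException $e) {
            // Someone else updated the key; re-read and try again.
        }
    }

    throw new RuntimeException("Gave up updating $key after $maxAttempts attempts");
}

$bucket = new InMemoryBucket();
$bucket->put('username', 'nekufa');
$revision = updateWithRetry($bucket, 'username', fn (string $v): string => strtoupper($v));

echo $bucket->getEntry('username')->value . " (revision $revision)\n";
```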
Using NKeys with JWT
To use NKeys with JWT, simply provide them in the Configuration options as jwt and nkey.
You can also provide a credentials file with CredentialsParser:
    use Basis\Nats\Client;
    use Basis\Nats\Configuration;
    use Basis\Nats\NKeys\CredentialsParser;

    $configuration = new Configuration(
        [
            'host' => 'localhost',
            'port' => 4222
        ],
        CredentialsParser::fromFile($credentialPath)
    );

    $client = new Client($configuration);
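A NATS credentials file bundles the user JWT and the NKey seed between BEGIN/END marker lines. The sketch below extracts both with a regular expression to show the two values that correspond to the jwt and nkey options. `parseCredentials` is a hypothetical helper written for illustration, not the library's `CredentialsParser`, and the sample values are placeholders rather than real credentials.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: pull the JWT and NKey seed out of credentials file contents.
 *
 * @return array{jwt: string, nkey: string}
 */
function parseCredentials(string $contents): array
{
    $extract = static function (string $label) use ($contents): string {
        $quoted = preg_quote($label, '/');
        // Marker lines are dashes around "BEGIN <label>" / "END <label>".
        $pattern = '/-+BEGIN ' . $quoted . '-+\s+(\S+)\s+-+END ' . $quoted . '-+/';
        if (preg_match($pattern, $contents, $matches) !== 1) {
            throw new InvalidArgumentException("Missing $label section");
        }

        return $matches[1];
    };

    return [
        'jwt' => $extract('NATS USER JWT'),
        'nkey' => $extract('USER NKEY SEED'),
    ];
}

// Placeholder contents in the usual credentials file layout.
$sample = <<<CREDS
-----BEGIN NATS USER JWT-----
header.payload.signature
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
SUAEXAMPLESEEDPLACEHOLDER
------END USER NKEY SEED------
CREDS;

print_r(parseCredentials($sample));
```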
Performance
Testing on an i5-4670K with NATS running in Docker gives about 420k rps for publish and 350k rps for receive in non-verbose mode.
You can run the tests in your own environment.
    % wget https://getcomposer.org/download/latest-stable/composer.phar
    ...
    Saving to: ‘composer.phar’

    % ./composer.phar install
    Installing dependencies from lock file (including require-dev)
    ...

    % export NATS_HOST=0.0.0.0
    % export NATS_PORT=4222
    % export NATS_CLIENT_LOG=1

    % composer run perf-test
    PHPUnit 9.5.10 by Sebastian Bergmann and contributors.

    Runtime: PHP 8.1.1
    Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist
    Warning: No code coverage driver available

    [2022-01-19T10:42:14.008230+00:00] SubjectTest.testPerformance.INFO: start performance test [] []
    [2022-01-19T10:42:14.246606+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":421871.0,"length":100000,"time":0.23703885078430176} []
    [2022-01-19T10:42:14.530670+00:00] SubjectTest.testPerformance.INFO: processing {"rps":355120.0,"length":100000,"time":0.2839939594268799} []

    % export NATS_CLIENT_VERBOSE=1
    % composer run perf-test
    PHPUnit 9.5.10 by Sebastian Bergmann and contributors.

    Runtime: PHP 8.1.1
    Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist
    Warning: No code coverage driver available

    [2022-01-19T10:42:21.319838+00:00] SubjectTest.testPerformance.INFO: start performance test [] []
    [2022-01-19T10:42:21.766501+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":224640.0,"length":100000,"time":0.4451560974121094} []
    [2022-01-19T10:42:21.922010+00:00] SubjectTest.testPerformance.INFO: processing {"rps":353317.0,"length":100000,"time":0.15544414520263672} []
    . 1 / 1 (100%)

    nekufa@fasiga ~ % cat /proc/cpuinfo | grep i5
    model name : Intel(R) Core(TM) i5-4670K CPU @ 3.40GHz
Configuration Options
The following is the list of configuration options and default values.
Option | Default | Description |
---|---|---|
inboxPrefix | "_INBOX" | Sets the prefix for automatically created inboxes. |
jwt | | Token for JWT authentication. Alternatively, you can use CredentialsParser. |
nkey | | Ed25519-based public key signature used for NKEY authentication. |
pass | | Sets the password for a connection. |
pedantic | false | Turns on strict subject format checks. |
pingInterval | 2 | Number of seconds between client-sent pings. |
port | 4222 | Port to connect to (only used if servers is not specified). |
timeout | 1 | Number of seconds the client will wait for a connection to be established. |
token | | Sets an authorization token for a connection. |
tlsKeyFile | | TLS 1.2 client key file path. |
tlsCertFile | | TLS 1.2 client certificate file path. |
tlsCaFile | | TLS 1.2 CA certificate file path. |
user | | Sets the username for a connection. |
verbose | false | Turns on +OK protocol acknowledgements. |
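Because the options are passed as a plain array, they are easy to assemble from the environment. The sketch below builds such an array from `NATS_HOST` and `NATS_PORT` (the variables used in the performance test above) plus two assumed ones, `NATS_USER` and `NATS_PASS`. `optionsFromEnv` is an illustrative helper, not part of the library; its result would be handed to `new Configuration(...)`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build a Configuration options array from environment variables.
 */
function optionsFromEnv(): array
{
    // Defaults taken from the connection example and the options table.
    $options = [
        'host' => 'localhost',
        'port' => 4222,
        'timeout' => 1,
        'verbose' => false,
    ];

    // getenv() returns false for unset variables; normalise that to null.
    $env = static fn (string $name): ?string => getenv($name) === false ? null : getenv($name);

    if (($host = $env('NATS_HOST')) !== null) {
        $options['host'] = $host;
    }
    if (($port = $env('NATS_PORT')) !== null) {
        $options['port'] = (int) $port;
    }
    // NATS_USER and NATS_PASS are assumed names, used here only for illustration.
    if (($user = $env('NATS_USER')) !== null) {
        $options['user'] = $user;
        $options['pass'] = $env('NATS_PASS');
    }

    return $options;
}

putenv('NATS_HOST=0.0.0.0');
putenv('NATS_PORT=4222');

print_r(optionsFromEnv());
```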