sonicgd / php-nats-streaming
PHP Client for nats-streaming-server.
0.2.6
2018-11-09 08:05 UTC
Requires
- protobuf-php/protobuf: ^0.1.3
- repejota/nats: dev-develop
Requires (Dev)
- github.com/gogo/protobuf: dev-master
- phpunit/phpunit: 5.3.*
- protobuf-php/protobuf-plugin: ^0.1.2
- satooshi/php-coveralls: dev-master
- squizlabs/php_codesniffer: ~2.0
- symfony/console: ^2.8@dev
This package is auto-updated.
Last update: 2025-01-09 21:40:34 UTC
README
Build
Coverage
Intro
A php client for Nats Streaming Server.
Uses phpnats under the hood and closesly resembles it's api.
Requirements
- php 5.6+
- stan
Installation
Get composer:
curl -O http://getcomposer.org/composer.phar && chmod +x composer.phar
Add php-nats-streaming as a dependency to your project
php composer.phar require 'byrnedo/php-nats-streaming:^0.2.4'
Usage
Publish
$options = new \NatsStreaming\ConnectionOptions(); $options->setClientID("test"); $options->setClusterID("test-cluster"); $c = new \NatsStreaming\Connection($options); $c->connect(); // Publish $r = $c->publish('special.subject', 'some serialized payload...'); // optionally wait for the ack $gotAck = $r->wait(); if (!$gotAck) { ... } $c->close();
Note
If publishing many messages at a time, you might at first do this:
foreach ($req as $data){ $r = $c->publish(...); $gotAck = $r->wait(); if (!$gotAck) { ... } }
It's actually much faster to do the following:
$rs = []; foreach ($req as $data){ $rs[] = $c->publish(...); } foreach ($rs as $r){ $r->wait(); }
Subscribe
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First()); $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->unsubscribe(); // or $sub->close(); $c->close();
If you want to subscribe to multiple channels you can use $c->wait()
:
... $c->connect(); ... $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub2 = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $c->wait();
Queue Group Subscribe
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->close(); // or $sub->unsubscribe(); $c->close();
Manual Ack
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setManualAck(true); $sub = $c->subscribe('special.subject', function ($message) { $message->ack(); }, $subOptions); $sub->wait(1); $c->close();
License
MIT, see LICENSE