kovey / pulsar
pulsar Of The Kovey Framework
v1.0.6
2022-07-29 10:52 UTC
Requires
- php: >=8.0
- ext-swoole: >=4.6.0
- kovey/logger: ^2.0
Requires (Dev)
- ext-swoole: >=4.5.5
- phpstan/phpstan: ^1.2
- phpunit/phpunit: ^9.3
README
Description
Library
Usage:
- composer require kovey/pulsar
Examples
use Kovey\Pulsar\Client\Producer; use Kovey\Pulsar\Client\Comsumer; use Kovey\Pulsar\Message\Publish; use Kovey\Pulsar\Message\Acknowledge; use function Swoole\Coroutine\run; use Swoole\Timer; use Swoole\Coroutine; run(function () { $producer = new Producer('ws://127.0.0.1:8080'); $producer->setTenant('tenant') ->setNamespace('namespace') ->setTopic('topic') ->create(); Timer::tick(5000, function (int $timerId, Producer $producer) { global $context; $context ++; $publish = new Publish(); $publish->setPayload('hello') ->setProperties(array('key' => 'value')) ->setContext($context); $producer->send($publish); $result = $producer->recv(); echo sprintf('response: %s', $result) . PHP_EOL; if ($result->isSuccess()) { echo 'send message success' . PHP_EOL; return; } echo sprintf('send message failure, error: %s', $result->getErrorMsg()) . PHP_EOL; }, $producer); go (fn () => comsume()); }); function comsume() : void { $comsumer = new Comsumer('ws://127.0.0.1:8080') $comsumer->setTenant('tenant') ->setNamespace('namespace') ->setTopic('topic') ->setSubscription('subscription') ->create(); while (true) { $receive = $comsumer->recv(); echo sprintf('receive data: %s', $receive) . PHP_EOL; if (!empty($receive->getMessageId())) { $ack = new Acknowledge(); $comsumer->send($ack->setMessageId($receive->getMessageId())); } Coroutine::sleep(0.01); } }