danikdantist / queue-wrapper
kafka wrapper
Installs: 4 990
Dependents: 0
Suggesters: 0
Security: 0
Stars: 1
Watchers: 2
Forks: 3
Open Issues: 0
Requires
- php: >=8.0
README
Install
composer require danikdantist/queue-wrapper
If you want to use Kafka, you need kafka lib https://github.com/edenhill/librdkafka and rdkafka extension for php https://github.com/arnaud-lb/php-rdkafka
You can use dockerfile to create docker image where librdkafka and PHP kafka extension already installed: https://github.com/danikdantist/Dockerfiles/blob/master/php/php5-fpm-nginx-kafka/Dockerfile
Consumer usage:
<?php class Receiver implements DanikDantist\QueueWrapper\Interfaces\iReceivable { public function receiveMessage(DanikDantist\QueueWrapper\Message $message) { echo $message->toString()."\n"; } } class EchoLogger implements DanikDantist\QueueWrapper\Interfaces\iLogable { public function info($info) { echo 'Info: '.$info."\n"; } public function error($error) { echo 'Error: '.$error."\n"; } } $config = new DanikDantist\QueueWrapper\Drivers\Kafka\Config(); $config ->setGroup('my_group') ->addBroker('172.17.0.31:9092') ->addTopic('my-test-topic') ->addTopic('my-test-topic-2') ; $demon = new DanikDantist\QueueWrapper\Manager(new DanikDantist\QueueWrapper\Drivers\Kafka\Connector($config, new EchoLogger())); $demon->addReceiver(new Receiver()); $demon->listenMessage();
Producer usage:
<?php class EchoLogger implements DanikDantist\QueueWrapper\Interfaces\iLogable { public function info($info) { echo 'Info: '.$info."\n"; } public function error($error) { echo 'Error: '.$error."\n"; } } $config = new DanikDantist\QueueWrapper\Drivers\Kafka\Config(); $config->addBroker('172.17.0.31:9092'); $demon = new DanikDantist\QueueWrapper\Manager(new DanikDantist\QueueWrapper\Drivers\Kafka\Connector($config, new EchoLogger())); $demon->sendMessage(new DanikDantist\QueueWrapper\Message('My test message', 'my-test-topic'));