cuongdinhngo / lara-kafka
Laravel Kafka
1.1
2022-03-21 15:30 UTC
Requires
- php: ^7.4
This package is auto-updated.
Last update: 2025-03-29 01:03:33 UTC
README
Laravel Kafka
1 - Install cuongdinhngo/lara-kafka
using Composer.
$ composer require cuongdinhngo/lara-kafka
2 - Add the following service provider in config/app.php
/* * Package Service Providers... */ LaraAssistant\LaraKafka\LaraKafkaServiceProvider::class,
This command copies Libraries/Kafka folder into app that contains KafkaConsumer & KafkaProducer classes
3 - Create a topic
You must start Zookeeper and Kafka service, please read more detail about how to install kafka
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
4 - Create a message
use App\Libraries\Kafka\KafkaProducer; use App\Libraries\Kafka\KafkaConsumer; . . . public function testKafka() { dump(date('Y/m/d h:i:s', time())); $kafkProducer = new KafkaProducer("TutorialTopic"); $kafkProducer->setBrokers("localhost:9092"); $kafkProducer->init(); $kafkProducer->addPayload("HELLO ... ".date('Y/m/d h:i:s', time())); dd(__METHOD__); }
Please member that message content must be string
5 - Consume a message
public function consumeKafka() { dump('start ...'); dump(date('Y/m/d h:i:s', time())); $consumer = new KafkaConsumer(); $consumer->init( ['TutorialTopic'], 'myConsumerGroup', 'localhost:9092', [ 'group.id' => 'myConsumerGroup', 'auto.offset.reset' => 'earliest' ] ); echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joining the group after leaving it.)\n"; $consumer->consume(); }
You can modify Libraries/Kafka/KafkaConsumer class to handle message and error
protected function handleMessage($message) { dump($message); } protected function handleErrors($message) { switch ($message->err) { case RD_KAFKA_RESP_ERR__PARTITION_EOF: dump("No more messages; will wait for more"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: dump('Timed out: '.date('Y/m/d h:i:s', time())); break; default: throw new \Exception($message->errstr(), $message->err); break; } }