daalvand / kafka
This package is for produce and consume from kafka
Installs: 1 917
Dependents: 2
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 0
Requires
- php: ^7.4|^8.0
- ext-rdkafka: *
README
- This package is for kafka consume and produce in laravel
installation
Install Kafka
Apache Kafka is need for many sections of our ecosystem
-
install librdkafka from The Apache Kafka C/C++ client library
for the ubuntu:
apt install librdkafka-dev
.for the centos:
yum install librdkafka-devel
-
Then build php extension from Manually Installing the extension
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install
-
Then extension to
php.ini
extension=rdkafka.so
-
Then restart php-fpm service
service php-fpm restart
install package
- run
composer require daalvand/kafka
2 . publish provider:
Laravel
php artisan vendor:publish --provider="Daalvand\Kafka\KafkaServiceProvider"
Lumen
- Add the service provider to bootstrap/app.php file:
<?php $app->register(Daalvand\Kafka\KafkaServiceProvider::class);
- Copy the config file from
/vendor/daalvand/kafka/src/config
to/config
directory. Then configure it in/bootstrap/app.php
file:
<?php $app->configure("kafka");
Usage
Producer
<?php use Daalvand\Kafka\Message\ProducerMessage; use Daalvand\Kafka\Facades\Producer; $producer = Producer::withAdditionalBroker('localhost:9092') ->build(); $message = (new ProducerMessage('topic-name', 0)) ->withKey('test-key') ->withBody('some test message payload') ->withHeaders(['header' => 'value']); $producer->produce($message); $producer->flush(-1);
Consumer
<?php use Daalvand\Kafka\Facades\Consumer; use Daalvand\Kafka\Exceptions\ConsumerConsumeException; use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException; use Daalvand\Kafka\Exceptions\ConsumerTimeoutException; $consumer = Consumer::withAdditionalConfig([ 'compression.codec' => 'lz4', 'auto.commit.interval.ms' => 500 ]) ->withAdditionalBroker('kafka:9092') ->withConsumerGroup('testGroup') ->withAdditionalSubscription('test-topic') ->build(); $consumer->subscribe(); while (true) { try { $message = $consumer->consume(); // your business logic $consumer->commit($message); } catch (ConsumerTimeoutException $e) { //no messages were read in a given time } catch (ConsumerEndOfPartitionException $e) { //only occurs if enable.partition.eof is true (default: false) } catch (ConsumerConsumeException $e) { // Failed } }
auto.offset.reset
option is (largest, smallest) valid values for kafka by older version than 0.9 and for after .9 is (earliest, latest)