nirmalsharma/laravel-kafka-consumer

0.0.6 2022-12-23 09:50 UTC

This package is auto-updated.

Last update: 2024-09-23 13:21:33 UTC


README

Install Kafka Consumer Warpper

  composer require nirmalsharma/laravel-kafka-consumer

Examples

Laravel 6

Use Kafka in code.

Sample code for console

namespace App\Console\Commands;

use App\Handlers\TestHandler;
use Illuminate\Console\Command;
use KafkaConsumer;

class TestTopicConsumer extends Command
{
    protected $signature = 'kafka-consume {--partition=} {--consumer-group=} {--topic=}';

    protected $description = 'Kafka consumer!!';

    public function handle(): void
    {
      KafkaConsumer::createConsumer(new TestHandler);
    }

    public function setKafkaConfig(){
        $partition = $this->option('partition');
        if( $partition != null){
            config([
                "kafka.partition" => $partition
            ]);
        }

        $consumer_group_id = $this->option('consumer-group');
        if( !empty($consumer_group_id)){
            config([
                "kafka.consumer_group_id" => $consumer_group_id
            ]);
        }

        $topic = $this->option('topic');
        if( !empty($topic)){
            config([
                "kafka.topic" => $topic
            ]);
        }
        
    }
}

TestHandler.php
-----------------

namespace App\Handlers;

use Illuminate\Support\Facades\Log;

class TestHandler
{
  public function __invoke( $message)
  {   
    print_r([$message]);
    Log::debug('Message received!', [
        $message
    ]);
  }
}

To start listening messages by run following command:

  php artisan kafka-consume {--partition=} {--consumer-group=} {--topic=}

You can check handler log message in "storage/logs/laravel.log" file or run the following command in your terminal:

  tail -f storage/logs/laravel.log

Environment Variables

To run this, you will need to add the following environment variables to your .env file Config reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

IS_KAFKA_ENABLED=             // Default:  1
KAFKA_BROKERS=
KAFKA_DEBUG=                  // Default: false
KAFKA_SSL_PROTOCOL=           // Default: plaintext or ssl for consumer
KAFKA_COMPRESSION_TYPE=       // Default: none
KAFKA_IDEMPOTENCE=            // Default: false
KAFKA_CONSUMER_GROUP_ID=      // Default: group
KAFKA_OFFSET_RESET=           // Default: latest
KAFKA_AUTO_COMMIT=            // Default: true
KAFKA_ERROR_SLEEP=            // Default: 5
KAFKA_PARTITION=              // Default: 0
KAFKA_TOPIC=                  // 

Authors

License

MIT

Features

  • Light weight kakfa wrapper
  • Easy to use event produce in code.