ensi/laravel-phprdkafka-consumer

Opiniated High Level consumer for laravel-phprdkafka

2.0.3 2024-12-02 06:59 UTC

This package is auto-updated.

Last update: 2025-01-02 07:10:15 UTC


README

Latest Version on Packagist Tests Total Downloads

Opiniated High Level consumer for ensi/laravel-phprdkafka

Installation

Firstly, you have to install and configure ensi/laravel-phprdkafka

Then,

composer require ensi/laravel-phprdkafka-consumer

Publish the config file with:

php artisan vendor:publish --provider="Ensi\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"

Now go to config/kafka-consumer.php and add processors there.

Version Compatibility

Basic Usage

The package provides php artisan kafka:consume {topic} {consumer=default} {--max-events=0} {--max-time=0} {--once} command that executes the first processor that matches given topic and consumer name. Consumer name is taken from ensi/laravel-phprdkafka config file.

Processors in config have the following configuration options:

[
   /*
   | Optional, defaults to `null`.
   | Here you may specify which topic should be handled by this processor.
   | Processor handles all topics by default.
   */
   'topic' => 'stage.crm.fact.registrations.1',

   /*
   | Optional, defaults to `null`.
   | Here you may specify which ensi/laravel-phprdkafka consumer should be handled by this processor.
   | Processor handles all consumers by default.
   */
   'consumer' => 'default',

   /*
   | Optional, defaults to `action`.
   | Here you may specify processor's type. Defaults to `action`
   | Supported types:
   |  - `action` - a simple class with execute method;
   |  - `job` - Laravel Queue Job. It will be dispatched using `dispatch` or `dispatchSync` method;
   */
   'type' => 'action',

   /*
   | Required.
   | Fully qualified class name of a processor class.
   */
   'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,
   
   /*
   | Optional, defaults to `false`.
   | Proxy messages to Laravel's queue.
   | Supported values:
   |  - `false` - do not stream message. Execute processor in syncronous mode;
   |  - `true` - stream message to Laravel's default queue;
   |  - `<your-favorite-queue-name-as-string>` - stream message to this queue;
   */
   'queue' => false,

   /*
   | Optional, defaults to 5000.
   | Kafka consume timeout in milliseconds .
   */
   'consume_timeout' => 5000,
]

Important! Some topics have to have different consumer settings, such as start reading topic from the beginning or don't create topic if it is not exists yet.
For such cases you need to configure several consumers and use suitable one.

Synchronous processors

Most of the time all tou need is a synchronous processor. A simple example of such processor:

use RdKafka\Message;

class SendConfirmationEmailAction
{
   public function execute(Message $message): void
   {
      // var_dump($message->payload);
   }
}

Queueable processors

If you want to stream message to Laravel's own queue you can use spatie/laravel-queueable-action

If for some reason you don't want to rely on that package you can swich to Laravel Jobs

In both cases you also need to specify 'queue' => true or 'queue' => 'my-favorite-queue' in the package's config for a given processor.

Processor using Laravel Job example:

use RdKafka\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ConsumeMessageJob implements ShouldQueue
{
   use Dispatchable, InteractsWithQueue, Queueable;

   public function __construct(protected Message $message)
   {
   }

   public function handle(): void
   {
      // var_dump($this->message->payload);
   }
}

Handling signals

php artisan kafka:consume ... command can be configured to gracefully stop after receiving some OS signals.
Such signals can be set in the stop_signals key of the package config, e.g 'stop_signals' => [SIGINT, SIGQUIT].
You can use any of the constants defined by the pcntl extension https://www.php.net/manual/en/pcntl.constants.php

Contributing

Please see CONTRIBUTING for details.

Consumer faking

Testing tools have been added to test the developed handlers. You can create a fake Consumer and call the topic listening command:

use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
use RdKafka\Message;

ConsumerFaker::new(['test-model'])
    ->addMessage(new Message())
    ->addMessage(new Message())
    ->consume();

Testing

Testing

  1. composer install
  2. composer test

Security Vulnerabilities

Please review our security policy on how to report security vulnerabilities.

License

The MIT License (MIT). Please see License File for more information.