ensi / laravel-phprdkafka-consumer
Opinionated high-level consumer for laravel-phprdkafka
Requires
- php: ^8.1
- ext-rdkafka: *
- ensi/laravel-phprdkafka: ^0.4.0
- laravel/framework: ^10.0 || ^11.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.2
- nunomaduro/collision: ^6.0 || ^7.0 || ^8.1
- orchestra/testbench: ^7.0 || ^8.0 || ^9.0
- pestphp/pest: ^1.22 || ^2.0
- pestphp/pest-plugin-laravel: ^1.1 || ^2.0
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^1.11
- spaze/phpstan-disallowed-calls: ^2.15
README
Opinionated high-level consumer for ensi/laravel-phprdkafka
Installation
First, install and configure ensi/laravel-phprdkafka.
Then install this package:
composer require ensi/laravel-phprdkafka-consumer
Publish the config file with:
php artisan vendor:publish --provider="Ensi\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"
Now open config/kafka-consumer.php
and add your processors there.
Version Compatibility
Basic Usage
The package provides the php artisan kafka:consume {topic} {consumer=default} {--max-events=0} {--max-time=0} {--once}
command, which executes the first processor that matches the given topic and consumer name. The consumer name is taken from the ensi/laravel-phprdkafka config file.
Processors in config have the following configuration options:
[
    /*
    | Optional, defaults to `null`.
    | Here you may specify which topic should be handled by this processor.
    | Processor handles all topics by default.
    */
    'topic' => 'stage.crm.fact.registrations.1',

    /*
    | Optional, defaults to `null`.
    | Here you may specify which ensi/laravel-phprdkafka consumer should be handled by this processor.
    | Processor handles all consumers by default.
    */
    'consumer' => 'default',

    /*
    | Optional, defaults to `action`.
    | Here you may specify the processor's type.
    | Supported types:
    |  - `action` - a simple class with an execute method;
    |  - `job` - Laravel Queue Job. It will be dispatched using the `dispatch` or `dispatchSync` method;
    */
    'type' => 'action',

    /*
    | Required.
    | Fully qualified class name of a processor class.
    */
    'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,

    /*
    | Optional, defaults to `false`.
    | Proxy messages to Laravel's queue.
    | Supported values:
    |  - `false` - do not stream message. Execute processor in synchronous mode;
    |  - `true` - stream message to Laravel's default queue;
    |  - `<your-favorite-queue-name-as-string>` - stream message to this queue;
    */
    'queue' => false,

    /*
    | Optional, defaults to 5000.
    | Kafka consume timeout in milliseconds.
    */
    'consume_timeout' => 5000,
]
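The "first processor that matches" rule can be illustrated with a small sketch. This is not the package's own code: `findFirstMatchingProcessor` is a hypothetical helper, and the topic and class names are made up. It assumes only what the options above state, namely that a missing or `null` `topic` or `consumer` matches everything and that entries are checked in order.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): returns the first processor
 * entry whose 'topic' and 'consumer' are either null/absent or equal to the
 * requested values, or null when nothing matches.
 */
function findFirstMatchingProcessor(array $processors, string $topic, string $consumer): ?array
{
    foreach ($processors as $processor) {
        $topicMatches = ($processor['topic'] ?? null) === null
            || $processor['topic'] === $topic;
        $consumerMatches = ($processor['consumer'] ?? null) === null
            || $processor['consumer'] === $consumer;

        if ($topicMatches && $consumerMatches) {
            return $processor;
        }
    }

    return null;
}

// Illustrative entries only; class names are placeholders.
$processors = [
    ['topic' => 'orders', 'consumer' => 'default', 'class' => 'App\\Actions\\HandleOrderAction'],
    ['class' => 'App\\Actions\\CatchAllAction'], // no topic/consumer: matches everything
];

$match = findFirstMatchingProcessor($processors, 'orders', 'default');
echo $match['class'], PHP_EOL; // prints "App\Actions\HandleOrderAction"
```

Because the first match wins, a catch-all entry placed before more specific entries would shadow them, so specific processors are best listed first.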
Important! Some topics need different consumer settings, such as reading the topic from the beginning or not creating the topic if it does not exist yet.
In such cases, configure several consumers and use the suitable one for each topic.
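As a rough illustration, two processor entries bound to different consumers might look like the fragment below. The second topic, the `from-beginning` consumer name, and the `ImportOrderAction` class are assumptions made up for this example; the consumer itself would have to be defined in the ensi/laravel-phprdkafka config, and where these entries sit inside config/kafka-consumer.php follows the published file.

```php
<?php

// Illustrative processor entries; names marked "hypothetical" are not from the package.
$processors = [
    [
        'topic' => 'stage.crm.fact.registrations.1',
        'consumer' => 'default',
        'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,
    ],
    [
        // Hypothetical topic read by a hypothetical consumer that is assumed
        // to be configured to start from the beginning of the topic.
        'topic' => 'stage.crm.fact.orders.1',
        'consumer' => 'from-beginning',
        'class' => \App\Domain\Orders\Actions\ImportOrderAction::class, // hypothetical class
    ],
];
```

With such entries, the second topic would be consumed by passing the consumer name as the second argument, e.g. php artisan kafka:consume stage.crm.fact.orders.1 from-beginning.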
Synchronous processors
Most of the time all you need is a synchronous processor. A simple example of such a processor:
use RdKafka\Message;

class SendConfirmationEmailAction
{
    public function execute(Message $message): void
    {
        // var_dump($message->payload);
    }
}
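What an `execute` method does with the payload depends on how your producers encode messages. As a minimal sketch, assuming the payload is a JSON document (an assumption, since the package does not prescribe a format), a hypothetical `decodeJsonPayload` helper could turn it into an array before any business logic runs:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): decodes a JSON payload into
 * an array. Assumes producers publish JSON; adapt for other formats.
 *
 * @throws InvalidArgumentException when the payload is empty or not an array/object
 * @throws JsonException            when the payload is not valid JSON
 */
function decodeJsonPayload(?string $payload): array
{
    if ($payload === null || $payload === '') {
        throw new InvalidArgumentException('Empty Kafka message payload');
    }

    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        throw new InvalidArgumentException('Expected a JSON object or array');
    }

    return $data;
}

// Example with a made-up payload.
$data = decodeJsonPayload('{"email":"user@example.com"}');
echo $data['email'], PHP_EOL; // prints "user@example.com"
```

Inside a processor this would be called as decodeJsonPayload($message->payload).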
Queueable processors
If you want to stream messages to Laravel's own queue, you can use spatie/laravel-queueable-action.
If for some reason you don't want to rely on that package, you can switch to Laravel Jobs.
In both cases you also need to specify 'queue' => true
or 'queue' => 'my-favorite-queue'
in the package's config for the given processor.
Example of a processor implemented as a Laravel Job:
use RdKafka\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ConsumeMessageJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(protected Message $message)
    {
    }

    public function handle(): void
    {
        // var_dump($this->message->payload);
    }
}
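A processor entry for such a job might look like the fragment below. It only combines options documented earlier (`type`, `class`, `queue`); the `App\Jobs` namespace is an assumption for illustration.

```php
<?php

// Illustrative processor entry for a job-based processor.
$processor = [
    'topic' => 'stage.crm.fact.registrations.1',
    'type' => 'job',
    'class' => \App\Jobs\ConsumeMessageJob::class, // hypothetical namespace
    'queue' => 'my-favorite-queue',                // or true for Laravel's default queue
];
```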
Handling signals
The php artisan kafka:consume ...
command can be configured to stop gracefully after receiving certain OS signals.
Such signals can be set in the stop_signals
key of the package config, e.g. 'stop_signals' => [SIGINT, SIGQUIT].
You can use any of the constants defined by the pcntl extension https://www.php.net/manual/en/pcntl.constants.php
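For readers unfamiliar with pcntl, the general idea behind a graceful stop can be sketched as follows. This is not the package's implementation: `registerStopSignals` is a hypothetical helper, the payloads are placeholders, and the sketch assumes the pcntl extension is loaded. A handler only sets a flag, and the consuming loop checks that flag between messages so the current message is never interrupted halfway.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): installs one handler for
 * every signal in $signals and returns a closure that reports whether a stop
 * has been requested.
 */
function registerStopSignals(array $signals): Closure
{
    $stopRequested = false;

    // Deliver signals asynchronously, without declare(ticks=1).
    pcntl_async_signals(true);

    foreach ($signals as $signal) {
        pcntl_signal($signal, function () use (&$stopRequested): void {
            $stopRequested = true;
        });
    }

    return function () use (&$stopRequested): bool {
        return $stopRequested;
    };
}

$shouldStop = registerStopSignals([SIGINT, SIGQUIT]);

// Placeholder payloads standing in for consumed messages.
foreach (['message-1', 'message-2', 'message-3'] as $payload) {
    if ($shouldStop()) {
        break; // finish cleanly instead of being killed mid-message
    }

    // process $payload here
}
```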
Contributing
Please see CONTRIBUTING for details.
Consumer faking
The package includes testing tools for your handlers. You can create a fake consumer and run the topic listening command:
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
use RdKafka\Message;

ConsumerFaker::new(['test-model'])
    ->addMessage(new Message())
    ->addMessage(new Message())
    ->consume();
Testing
- composer install
- composer test
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
License
The MIT License (MIT). Please see License File for more information.