opennebel / laravel-kafka
A Laravel wrapper around php-rdkafka to produce Kafka messages easily.
Fund package maintenance!
OpenNebel
Installs: 237
Dependents: 0
Suggesters: 0
Security: 0
Stars: 3
Watchers: 0
Forks: 0
Open Issues: 0
pkg:composer/opennebel/laravel-kafka
Requires
- php: ^8.0
- ext-rdkafka: *
- illuminate/support: ^9.0|^10.0|^11.0|^12.0
Requires (Dev)
- kwn/php-rdkafka-stubs: ^2.0
- orchestra/testbench: ^8.0
- phpunit/phpunit: ^10.0
Suggests
- kwn/php-rdkafka-stubs: For IDE autocompletion and static analysis
This package is auto-updated.
Last update: 2025-09-30 16:02:40 UTC
README
LaravelKafka is a simple yet powerful library for producing messages to Apache Kafka from your Laravel applications. It builds upon php-rdkafka and adheres to Laravel conventions, including Service Providers, Facades, Queueable Jobs, Artisan commands, dependency injection, and configuration.
๐ง Maintained by OpenNebel
๐งฉ Features
- โ Synchronous Kafka message sending
- ๐ Asynchronous support via Laravel's queue system
- ๐งฑ Modular structure (producers, config, jobs, DLQ)
- ๐ง Support for headers, keys, partitions, msgflags
- ๐ Error handling (Dead Letter Queue with full Kafka options)
- ๐งช Integrated Artisan commands (
status
,retry-failed
) - ๐ฆ Compatible with Laravel Horizon, Telescope, Docker, CI/CD
๐ Installation
1. Install the PHP Kafka extension
pecl install rdkafka
๐ Ensure
php.ini
loads the extension:extension=rdkafka
2. Install the library
composer require opennebel/laravel-kafka
โ๏ธ Configuration
Method 1 โ Full Publication (config + migration)
php artisan vendor:publish --tag=laravel-kafka
Method 2 โ Separate Publication
# Only the config php artisan vendor:publish --tag=laravel-kafka-config # Only the DLQ migration php artisan vendor:publish --tag=laravel-kafka-migrations
.env
File
KAFKA_BROKERS=localhost:9092 KAFKA_DEFAULT_TOPIC=notification-events # For asynchronous sending via Laravel Queue KAFKA_ASYNC_ENABLED=true KAFKA_ASYNC_QUEUE=default
โ๏ธ Sending Messages
๐น Synchronous Sending
use OpenNebel\LaravelKafka\Facades\Kafka; // Raw message with options Kafka::produce('notification-events', 'Hello Kafka', [ 'key' => 'user:123', 'headers' => ['x-app' => 'mondialgp'], 'partition' => 0, 'flag' => 0 ]); // JSON message Kafka::produceJson('notification-events', [ 'type' => 'email', 'to' => 'user@example.com', 'subject' => 'Welcome', 'variables' => ['name' => 'John'] ]);
๐ธ Asynchronous Sending (Laravel Queue)
Kafka::produceAsync('notification-events', [ 'type' => 'sms', 'to' => '+33600000000', 'message' => 'Your code is 1234' ], [ 'key' => 'job:456', 'headers' => ['x-job' => 'welcome'] ]); Kafka::produceAsyncToDefault([ 'type' => 'sms', 'to' => '+33600000000', 'message' => 'Your code is 1234' ], [ 'key' => 'default-key' ]);
Start the worker to process jobs:
php artisan queue:work
๐งฉ Usage with Dependency Injection
use OpenNebel\LaravelKafka\KafkaService; public function handle(KafkaService $kafka) { $kafka->produceToDefault('message via DI', [ 'headers' => ['x-di' => 'used'] ]); }
๐ฅ Error Handling (DLQ)
If flush()
fails or the broker is unreachable, the message is:
- recorded in the
kafka_failed_messages
table - all Kafka
options
are stored as JSON - accessible via Artisan command
- manually re-attemptable
Migration:
php artisan migrate
Retry Failed Messages:
php artisan kafka:retry-failed
๐ Artisan Commands
Command | Description |
---|---|
kafka:status |
Checks Kafka broker connectivity and lists metadata |
kafka:retry-failed |
Retries messages in the DLQ |
These commands are auto-registered via KafkaServiceProvider
.
๐งฐ Service API
Method | Description |
---|---|
produce() |
Raw string to topic with optional headers/key/partition/msgflags |
produceJson() |
JSON payload to topic |
produceToDefault() |
Raw string to default topic |
produceJsonToDefault() |
JSON payload to default topic |
produceAsync() |
Dispatch async job to a topic |
produceAsyncToDefault() |
Dispatch async job to default topic |
flush($timeoutMs) |
Flush local queue to Kafka broker |
getQueueLength() |
Messages in local Kafka buffer |
ping() |
Kafka connection test |
getMetadata() |
Cluster info: topics, partitions, brokers |
isConnected() |
Indicates producer was created correctly |
๐ Configuration (config/kafka.php
)
return [ 'brokers' => env('KAFKA_BROKERS', 'localhost:9092'), 'default_topic' => env('KAFKA_DEFAULT_TOPIC', 'notification-events'), 'async' => [ 'enabled' => env('KAFKA_ASYNC_ENABLED', true), 'queue' => env('KAFKA_ASYNC_QUEUE', 'default'), ], 'options' => [ // Kafka global options (passed to php-rdkafka Producer config) // e.g. 'compression.codec' => 'snappy' ], ];
โ Prerequisites
- PHP >= 8.0
- Laravel >= 9.x
ext-rdkafka
PHP extension- Kafka broker (local, Docker, or cloud)
๐ง Recommendations
- Use Laravel Horizon to manage async jobs
- Monitor failures with Telescope or Sentry
- Track Kafka logs via
storage/logs/laravel.log
๐ License
MIT ยฉ OpenNebel