opennebel/laravel-kafka

A Laravel wrapper around php-rdkafka to produce Kafka messages easily.


pkg:composer/opennebel/laravel-kafka

v1.0.5 2025-07-30 14:35 UTC

Last update: 2025-09-30 16:02:40 UTC


README

LaravelKafka is a library for producing messages to Apache Kafka from Laravel applications. It builds on php-rdkafka and follows Laravel conventions: service providers, facades, queueable jobs, Artisan commands, dependency injection, and configuration files.

🔧 Maintained by OpenNebel

🧩 Features

  • ✅ Synchronous Kafka message sending
  • 🔁 Asynchronous support via Laravel's queue system
  • 🧱 Modular structure (producers, config, jobs, DLQ)
  • 🧠 Support for headers, keys, partitions, msgflags
  • 🛠 Error handling (Dead Letter Queue with full Kafka options)
  • 🧪 Integrated Artisan commands (status, retry-failed)
  • 📦 Compatible with Laravel Horizon, Telescope, Docker, CI/CD

🚀 Installation

1. Install the PHP Kafka extension

pecl install rdkafka

📌 Ensure php.ini loads the extension: extension=rdkafka
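
To confirm that the extension is visible to the PHP binary that runs your application, a quick check along these lines can help. This is a minimal sketch: the function name `rdkafkaStatus` is made up for illustration and is not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Report whether ext-rdkafka is loaded in the current PHP process.
 * Hypothetical helper, not part of opennebel/laravel-kafka.
 */
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka is NOT loaded; check the php.ini used by this SAPI (' . PHP_SAPI . ')';
    }

    // phpversion() returns the extension's version string, or false if unavailable.
    $version = phpversion('rdkafka');

    return 'rdkafka is loaded' . ($version !== false ? " (version {$version})" : '');
}

echo rdkafkaStatus(), PHP_EOL;
```

The CLI and the web server often read different php.ini files, so it may be worth running the check in both contexts if the results disagree.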

2. Install the library

composer require opennebel/laravel-kafka

⚙️ Configuration

Method 1 – Full Publication (config + migration)

php artisan vendor:publish --tag=laravel-kafka

Method 2 – Separate Publication

# Only the config
php artisan vendor:publish --tag=laravel-kafka-config

# Only the DLQ migration
php artisan vendor:publish --tag=laravel-kafka-migrations

.env File

KAFKA_BROKERS=localhost:9092
KAFKA_DEFAULT_TOPIC=notification-events

# For asynchronous sending via Laravel Queue
KAFKA_ASYNC_ENABLED=true
KAFKA_ASYNC_QUEUE=default

✉️ Sending Messages

🔹 Synchronous Sending

use OpenNebel\LaravelKafka\Facades\Kafka;

// Raw message with options
Kafka::produce('notification-events', 'Hello Kafka', [
    'key' => 'user:123',
    'headers' => ['x-app' => 'mondialgp'],
    'partition' => 0,
    'flag' => 0
]);

// JSON message
Kafka::produceJson('notification-events', [
    'type' => 'email',
    'to' => 'user@example.com',
    'subject' => 'Welcome',
    'variables' => ['name' => 'John']
]);
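
If you prefer to control serialization yourself and hand a raw string to produce(), the encoding step could look roughly like this. It is a sketch only: `encodeKafkaPayload` is a hypothetical helper, and the encoding that produceJson() applies internally is not documented here and may differ.

```php
<?php

declare(strict_types=1);

/**
 * Encode an array payload as a JSON string suitable for a Kafka message body.
 * Hypothetical helper; the package's internal encoding may differ.
 *
 * @throws JsonException when the payload cannot be encoded
 */
function encodeKafkaPayload(array $payload): string
{
    // JSON_THROW_ON_ERROR surfaces encoding failures as exceptions instead of a silent false.
    return json_encode(
        $payload,
        JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
    );
}

$body = encodeKafkaPayload([
    'type' => 'email',
    'to' => 'user@example.com',
    'variables' => ['name' => 'John'],
]);

echo $body, PHP_EOL;
```

Failing loudly at encode time keeps malformed payloads (for example, invalid UTF-8) from reaching the topic as an empty or truncated body.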

🔸 Asynchronous Sending (Laravel Queue)

Kafka::produceAsync('notification-events', [
    'type' => 'sms',
    'to' => '+33600000000',
    'message' => 'Your code is 1234'
], [
    'key' => 'job:456',
    'headers' => ['x-job' => 'welcome']
]);

Kafka::produceAsyncToDefault([
    'type' => 'sms',
    'to' => '+33600000000',
    'message' => 'Your code is 1234'
], [
    'key' => 'default-key'
]);

Start a queue worker to process the dispatched jobs (add --queue=<name> if KAFKA_ASYNC_QUEUE is not default):

php artisan queue:work

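🧩 Usage with Dependency Injection

use OpenNebel\LaravelKafka\KafkaService;

// Illustrative class name; any container-resolved class (job, listener, command) works the same way.
class SendWelcomeNotification
{
    // Laravel's service container injects KafkaService when it calls handle().
    public function handle(KafkaService $kafka): void
    {
        $kafka->produceToDefault('message via DI', [
            'headers' => ['x-di' => 'used']
        ]);
    }
}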

💥 Error Handling (DLQ)

If flush() fails or the broker is unreachable, the message is:

  • recorded in the kafka_failed_messages table
  • stored together with all of its Kafka options, serialized as JSON
  • accessible via an Artisan command
  • available for manual retry
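
To make the "options stored as JSON" idea concrete, here is a rough sketch of how a failed send might be turned into a table row. The function `buildFailedMessageRow` and every column name in it are assumptions for illustration; the published migration defines the real kafka_failed_messages schema.

```php
<?php

declare(strict_types=1);

/**
 * Build a row for a dead-letter table from a failed send.
 * Column names are assumptions; check the published migration for the real schema.
 */
function buildFailedMessageRow(string $topic, string $payload, array $options, string $error): array
{
    return [
        'topic' => $topic,
        'payload' => $payload,
        // Key, headers, partition and flags are kept together as one JSON document.
        'options' => json_encode($options, JSON_THROW_ON_ERROR),
        'error' => $error,
        'failed_at' => gmdate('Y-m-d H:i:s'),
    ];
}

$row = buildFailedMessageRow(
    'notification-events',
    'Hello Kafka',
    ['key' => 'user:123', 'headers' => ['x-app' => 'mondialgp'], 'partition' => 0],
    'Local: Message timed out'
);

// Decoding restores the original options, which is what makes a later retry possible.
$restored = json_decode($row['options'], true, 512, JSON_THROW_ON_ERROR);
```

Because the options survive the round trip unchanged, a retry can resend the message with the same key, headers, and partition as the original attempt.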

Run the migration to create the kafka_failed_messages table:

php artisan migrate

Retry Failed Messages:

php artisan kafka:retry-failed

🛠 Artisan Commands

Command             Description
kafka:status        Checks Kafka broker connectivity and lists metadata
kafka:retry-failed  Retries messages in the DLQ

These commands are auto-registered via KafkaServiceProvider.

🧰 Service API

Method                   Description
produce()                Raw string to topic with optional headers/key/partition/msgflags
produceJson()            JSON payload to topic
produceToDefault()       Raw string to default topic
produceJsonToDefault()   JSON payload to default topic
produceAsync()           Dispatch async job to a topic
produceAsyncToDefault()  Dispatch async job to default topic
flush($timeoutMs)        Flush local queue to Kafka broker
getQueueLength()         Messages in local Kafka buffer
ping()                   Kafka connection test
getMetadata()            Cluster info: topics, partitions, brokers
isConnected()            Indicates producer was created correctly
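
As an illustration of how flush() and getQueueLength() might be combined, the sketch below drains the local buffer before a process exits. The `BufferedProducer` interface, its parameter and return types, and the `FakeProducer` stand-in are all assumptions made so the example runs without a broker; they are not the package's documented signatures.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical contract mirroring two method names from the table above.
 * Parameter and return types are assumptions.
 */
interface BufferedProducer
{
    public function flush(int $timeoutMs): mixed;
    public function getQueueLength(): int;
}

/**
 * Flush repeatedly until the local buffer is empty or the attempts run out.
 * Returns the number of messages still buffered (0 means fully drained).
 */
function drainProducer(BufferedProducer $producer, int $timeoutMs = 1000, int $maxAttempts = 5): int
{
    for ($attempt = 0; $attempt < $maxAttempts && $producer->getQueueLength() > 0; $attempt++) {
        $producer->flush($timeoutMs);
    }

    return $producer->getQueueLength();
}

/** In-memory stand-in: each flush "delivers" up to two buffered messages. */
final class FakeProducer implements BufferedProducer
{
    public function __construct(private int $buffered) {}

    public function flush(int $timeoutMs): mixed
    {
        $this->buffered = max(0, $this->buffered - 2);
        return null;
    }

    public function getQueueLength(): int
    {
        return $this->buffered;
    }
}

$drained = drainProducer(new FakeProducer(5));            // 5 -> 3 -> 1 -> 0
$leftover = drainProducer(new FakeProducer(20), 1000, 3); // 20 -> 18 -> 16 -> 14

echo $drained, ' ', $leftover, PHP_EOL;
```

Bounding the number of attempts keeps shutdown from hanging indefinitely when the broker is down; whatever remains buffered can then be logged or handed to the DLQ.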

📂 Configuration (config/kafka.php)

return [
    'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
    'default_topic' => env('KAFKA_DEFAULT_TOPIC', 'notification-events'),

    'async' => [
        'enabled' => env('KAFKA_ASYNC_ENABLED', true),
        'queue' => env('KAFKA_ASYNC_QUEUE', 'default'),
    ],

    'options' => [
        // Kafka global options (passed to php-rdkafka Producer config)
        // e.g. 'compression.codec' => 'snappy'
    ],
];
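
php-rdkafka's RdKafka\Conf::set() takes string values, so entries under 'options' are safest written as strings. If you would rather use native booleans and integers in the config file, a small normalizer such as the one below could bridge the gap. This is a sketch under assumptions: `normalizeKafkaOptions` is a hypothetical helper, and how the package applies 'options' internally is not shown in this README.

```php
<?php

declare(strict_types=1);

/**
 * Convert config values to the string form used by librdkafka properties.
 * Hypothetical helper, not part of opennebel/laravel-kafka.
 *
 * @param array<string, scalar> $options
 * @return array<string, string>
 */
function normalizeKafkaOptions(array $options): array
{
    $normalized = [];
    foreach ($options as $name => $value) {
        // (string) false is '', so booleans are spelled out explicitly.
        $normalized[$name] = is_bool($value) ? ($value ? 'true' : 'false') : (string) $value;
    }

    return $normalized;
}

$options = normalizeKafkaOptions([
    'compression.codec' => 'snappy',
    'enable.idempotence' => true,
    'linger.ms' => 5,
]);

// With ext-rdkafka installed, each pair could then be applied via RdKafka\Conf::set($name, $value).
var_export($options);
```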

✅ Prerequisites

  • PHP >= 8.0
  • Laravel >= 9.x
  • ext-rdkafka PHP extension
  • Kafka broker (local, Docker, or cloud)

🧠 Recommendations

  • Use Laravel Horizon to manage async jobs
  • Monitor failures with Telescope or Sentry
  • Track Kafka logs via storage/logs/laravel.log

📄 License

MIT © OpenNebel