siberfx / apache-kafka
Apache Kafka Service Helper
Requires
- php: ^8.4
- laravel/framework: ^12.0 | ^13.0
README
A Laravel queue driver backed by Apache Kafka (tested against
Confluent Cloud). It lets you dispatch Laravel jobs to a Kafka
topic and process them with the standard queue:work worker.
Requirements
- PHP
^8.4 - Laravel
^12.0 | ^13.0 - The
rdkafkaPHP extension (built on top oflibrdkafka) - A reachable Kafka cluster (self-hosted or Confluent Cloud)
Required before starting
If you are using Confluent Cloud, create an account at https://www.confluent.io/, create a new project and provision an "Apache Kafka on Confluent Cloud" cluster, then create an API key/secret that will be used as the SASL username/password below.
curl -L --http1.1 https://cnfl.io/ccloud-cli | sh -s -- -b /usr/local/bin
Make sure the rdkafka extension is installed and enabled before installing the package:
Installation
1. Install the rdkafka PHP extension
# install librdkafka first (Debian/Ubuntu) sudo apt-get update && sudo apt-get install -y librdkafka-dev # then the PHP extension pecl install rdkafka # enable it (add to your php.ini) echo "extension=rdkafka.so" >> "$(php -i | grep -i 'Loaded Configuration File' | awk '{print $5}')"
A ready-to-use container is provided under
docker/(PHP 8.4 withlibrdkafkaandrdkafkaalready compiled). Run it withdocker compose -f docker/docker-compose.yaml up.
2. Require the package via Composer
composer require siberfx/apache-kafka
The service provider (Siberfx\Kafka\KafkaServiceProvider) is auto-discovered — no manual
registration required.
Configuration
1. Add a kafka connection to config/queue.php
The connector reads its settings from the queue connection config. Add the following entry under
connections:
'connections' => [ // ... 'kafka' => [ 'driver' => 'kafka', 'bootstrap_servers' => env('KAFKA_BROKERS', 'localhost:9092'), 'security_protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_SSL'), 'sasl_mechanisms' => env('KAFKA_SASL_MECHANISMS', 'PLAIN'), 'sasl_username' => env('KAFKA_SASL_USERNAME', ''), 'sasl_password' => env('KAFKA_SASL_PASSWORD', ''), 'group_id' => env('KAFKA_GROUP_ID', 'laravel'), ], ],
2. Set your .env
QUEUE_CONNECTION=kafka # default topic used when a job does not specify one KAFKA_QUEUE=default KAFKA_BROKERS=pkc-xxxxx.region.provider.confluent.cloud:9092 KAFKA_SECURITY_PROTOCOL=SASL_SSL KAFKA_SASL_MECHANISMS=PLAIN KAFKA_SASL_USERNAME=your-api-key KAFKA_SASL_PASSWORD=your-api-secret KAFKA_GROUP_ID=laravel-consumers
For a local, unauthenticated broker use
KAFKA_SECURITY_PROTOCOL=PLAINTEXTand leave the SASL values empty.
3. (Optional) Publish the package config
php artisan vendor:publish --provider="Siberfx\Kafka\KafkaServiceProvider" --tag=config
This publishes config/kafka-config.php, which you can use to hold your own Kafka-related
values. Note that the queue driver itself is configured from config/queue.php (step 1).
Usage
Define a job
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public function __construct(public int $podcastId) { } public function handle(): void { // ... do the work } }
Dispatch a job
use App\Jobs\ProcessPodcast; // dispatched to the default topic (KAFKA_QUEUE) ProcessPodcast::dispatch($podcast->id); // or to a specific topic ProcessPodcast::dispatch($podcast->id)->onQueue('podcasts');
Run the worker
The worker subscribes to the given Kafka topic and processes incoming messages:
# process the default topic php artisan queue:work kafka --queue=default # process a specific topic php artisan queue:work kafka --queue=podcasts
Pushing a raw job
You can also push directly through the Queue facade:
use Illuminate\Support\Facades\Queue; Queue::connection('kafka')->push(new ProcessPodcast($podcast->id), '', 'podcasts');
Limitations
This driver implements the core produce/consume path. The following are not yet supported:
- Delayed dispatch —
->delay()/later()is a no-op; messages are produced immediately or not at all. - Queue size —
size()is not implemented, soqueue:monitorand size-based logic won't work. - Failed jobs & retries —
pop()calls the job'shandle()directly and does not integrate with Laravel'sfailed_jobs, retry, or release mechanisms. A throwing job is logged, not retried. - Trusted payloads only — consumed messages are unserialized into job objects. Restrict write access to your topics to your own applications.
Security
If you discover any security related issues, please email info@siberfx.com instead of using the issue tracker.
Credits
License
The MIT License (MIT). Please see the composer.json for license details.


