kafka-bus/laravel-bridge

This is my package laravel-kafka-bus

Maintainers

Package info

github.com/kafka-bus/laravel-bridge

pkg:composer/kafka-bus/laravel-bridge

Fund package maintenance!

Micromus

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.1.0 2026-06-08 17:13 UTC

This package is auto-updated.

Last update: 2026-06-08 17:13:44 UTC


README

Latest Version on Packagist GitHub Tests Action Status GitHub Code Style GitHub PHPStan Total Downloads

Laravel integration for kafka-bus — a configuration-driven Apache Kafka client built on top of ext-rdkafka. The package wires producers, consumer workers, topic routing, and middleware into the framework, and ships an optional Commiter component for idempotent message handling backed by the database.

Requirements

  • PHP ^8.2
  • Laravel ^10.0 || ^11.0 || ^12.0
  • ext-rdkafka

Installation

Install the package via Composer:

composer require kafka-bus/laravel-bridge

Publish the main configuration file:

php artisan vendor:publish --tag=kafka-bus

To use the Commiter component (idempotency and commit tracking), additionally publish its configuration and migrations:

php artisan vendor:publish --tag=kafka-bus-commiter
php artisan migrate

Configuration

The main configuration lives in config/kafka-bus.php and is split into four sections:

  • connections — Kafka broker connections and driver-specific options.
  • topics — logical topic keys mapped to physical Kafka topic names.
  • consumers — workers, topic-to-handler bindings, middleware, and consumer options.
  • producers — message-to-topic routes, middleware, and producer options.

Connections

Each connection is selected by a driver and a set of options passed straight to librdkafka. The default key picks the active connection by name.

'default' => env('KAFKA_CONNECTION', 'kafka'),

'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'options' => [
            'metadata.broker.list' => env('KAFKA_BROKER_LIST', 'localhost:9092'),
            'security.protocol'    => env('KAFKA_SECURITY_PROTOCOL', 'SASL_PLAINTEXT'),
            'sasl.mechanisms'      => env('KAFKA_SASL_MECHANISMS', 'PLAIN'),
            'sasl.username'        => env('KAFKA_SASL_USERNAME'),
            'sasl.password'        => env('KAFKA_SASL_PASSWORD'),
            'debug'                => env('KAFKA_DEBUG', false),
        ],
    ],

    'testing' => [
        'driver'  => 'null',
        'options' => [],
    ],
],

The null driver is useful for tests — calls to the bus succeed without touching a real broker.

Topics

Topic names usually depend on the environment. The bus prepends topic_prefix to every physical topic name, and the topics map binds a short logical key to that physical name.

'topic_prefix' => env('KAFKA_PREFIX', env('APP_ENV', 'local').'.'),

'topics' => [
    'products' => 'fact.products.1',
    'orders'   => 'fact.orders.1',
],

With APP_ENV=production, products resolves to production.fact.products.1.

Producers

A producer route binds a message class to a logical topic key. The shortest form maps the class directly to a topic key:

'producers' => [
    'middleware' => [
        // KafkaBus\Commiter\Middleware\ProducerIdempotencyMiddleware::class,
    ],

    'routes' => [
        App\Kafka\Messages\ProductMessage::class => 'products',
    ],

    'flush_timeout' => 5000,
    'flush_retries' => 5,

    'additional_options' => [
        'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'),
    ],
],

The verbose form lets you override timeouts, append per-route middleware, and pass driver options:

'routes' => [
    App\Kafka\Messages\ProductMessage::class => [
        'topic_key'          => 'products',
        'middleware'         => [App\Kafka\Middleware\AuditTrailMiddleware::class],
        'additional_options' => [],
        'flush_timeout'      => 5000,
        'flush_retries'      => 5,
    ],
],

Publish a message through the bus:

use KafkaBus\Core\Interfaces\Bus\BusInterface;

public function execute(BusInterface $bus): void
{
    $bus->publish(new \App\Kafka\Messages\ProductMessage(/* ... */));
}

Or via the KafkaBus facade:

use KafkaBus\Laravel\Facades\KafkaBus;

KafkaBus::publish(new \App\Kafka\Messages\ProductMessage(/* ... */));

Consumers

Workers are the units consumed by the artisan command. Each worker subscribes to one or more topics and dispatches incoming messages to handler classes.

'consumers' => [
    'middleware' => [
        // KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
    ],

    'workers' => [
        // Multi-topic worker with per-worker overrides
        'default' => [
            'middleware'   => [],
            'auto_commit'  => false,
            'consume_timeout' => 20000,
            'topics' => [
                'products' => App\Kafka\Consumers\ProductsTopicConsumer::class,
                'orders'   => [
                    'handler'    => App\Kafka\Consumers\OrdersTopicConsumer::class,
                    'middleware' => [App\Kafka\Middleware\TenantContextMiddleware::class],
                ],
            ],
        ],

        // Single-topic worker, worker name == topic key
        'products' => App\Kafka\Consumers\ProductsTopicConsumer::class,

        // Single-topic worker with overrides, worker name == topic key
        'orders' => [
            'middleware' => [],
            'handler'    => App\Kafka\Consumers\OrdersTopicConsumer::class,
        ],

        // Single-topic worker where worker name != topic key
        'products-secondary' => [
            'topic_key'  => 'products',
            'middleware' => [],
            'handler'    => App\Kafka\Consumers\ProductsTopicConsumer::class,
        ],
    ],

    'auto_commit'     => env('KAFKA_CONSUMER_AUTO_COMMIT', false),
    'consume_timeout' => 5_000,

    'additional_options' => [
        'group.id'              => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
        'max.poll.interval.ms'  => env('KAFKA_MAX_POLL_INTERVAL_MS', 300_000),
        'session.timeout.ms'    => env('KAFKA_SESSION_TIMEOUT_MS', 45_000),
        'heartbeat.interval.ms' => env('KAFKA_HEARTBEAT_INTERVAL_MS', 3_000),
        'auto.offset.reset'     => 'beginning',
    ],
],

Each worker resolves options in this order: additional_options, auto_commit, consume_timeout, and middleware are taken from the worker entry, then merged with the global consumers.* defaults.

Run a worker:

php artisan kafka:consume default

Artisan commands

Command Description
kafka:consume {workerName} Start a long-running consumer for the given worker.
kafka:worker:list Show registered workers, their topic keys, resolved topic names, handlers, consumer middleware, and route middleware.
kafka:route:list Show registered producer routes (message class → topic) and middleware.
kafka:offset:show {workerName} Show current / min / max offsets for every partition of every topic the worker subscribes to.
kafka:offset:set {workerName} {topicKey} {offset} {--partition=} Set the committed offset for a topic. offset accepts earliest, latest, or a numeric value. Omit --partition to apply to all partitions of the topic.

Inspecting workers and routes

php artisan kafka:worker:list
+----------+-----------+----------------------------+------------------------------------------+---------------------+----------------+
| Worker   | Topic key | Topic name                 | Handler                                  | Consumer Middleware | Route Middleware|
+----------+-----------+----------------------------+------------------------------------------+---------------------+----------------+
| default  | products  | production.fact.products.1 | App\Kafka\Consumers\ProductsTopicConsumer|                     |                |
| default  | orders    | production.fact.orders.1   | App\Kafka\Consumers\OrdersTopicConsumer  | App\...\AuditMiddleware | App\...\TenantMiddleware |
| products | products  | production.fact.products.1 | App\Kafka\Consumers\ProductsTopicConsumer|                     |                |
+----------+-----------+----------------------------+------------------------------------------+---------------------+----------------+
php artisan kafka:route:list
+----------------------------------------+-----------+----------------------------+------------+
| Message                                | Topic key | Topic name                 | Middleware |
+----------------------------------------+-----------+----------------------------+------------+
| App\Kafka\Messages\ProductMessage      | products  | production.fact.products.1 | 0          |
| App\Kafka\Messages\OrderMessage        | orders    | production.fact.orders.1   | 1          |
+----------------------------------------+-----------+----------------------------+------------+

Inspecting and resetting offsets

php artisan kafka:offset:show default
+-----------+----------------------------+-----------+---------+-----+-----+
| Topic key | Topic name                 | Partition | Current | Min | Max |
+-----------+----------------------------+-----------+---------+-----+-----+
| products  | production.fact.products.1 | 0         | 142     | 0   | 200 |
| products  | production.fact.products.1 | 1         | 90      | 0   | 150 |
+-----------+----------------------------+-----------+---------+-----+-----+

Reset all partitions of a topic to the earliest available offset:

php artisan kafka:offset:set default products earliest

Move a single partition to an explicit numeric offset:

php artisan kafka:offset:set default products 150 --partition=0

Jump every partition to the high-water mark (skip backlog):

php artisan kafka:offset:set default products latest

The command prints the resulting offsets:

+-----------+----------------------------+-----------+-----+-----+
| Topic key | Topic name                 | Partition | Old | New |
+-----------+----------------------------+-----------+-----+-----+
| products  | production.fact.products.1 | 0         | 142 | 0   |
| products  | production.fact.products.1 | 1         | 90  | 0   |
+-----------+----------------------------+-----------+-----+-----+

The worker must not be running while you reset its offsets — otherwise the active consumer group will overwrite the new position on its next commit.

Commiter

The Commiter component (powered by micromus/kafka-bus-commiter) provides:

  • Consumer idempotency — every incoming message is tracked in the kafka_bus_commits table; duplicates are skipped, retries are counted, and a configurable max-attempt threshold can stop poison messages.
  • Producer idempotency keys — outgoing messages implementing HasIdempotency automatically receive an x-idempotency-key header, which the consumer side uses as the dedup key.

It is registered automatically by CommiterServiceProvider (loaded via package auto-discovery).

Configuration

// config/kafka-bus-commiter.php
return [
    'connection' => env('KAFKA_COMMITER_CONNECTION'),
    'table'      => 'kafka_bus_commits',
    'repository' => env('KAFKA_COMMITER_REPOSITORY', 'idempotency'),

    'repositories' => [
        'idempotency' => \KafkaBus\Commiter\Repositories\IdempotencyMessageRepository::class,
        'native'      => \KafkaBus\Commiter\Repositories\NativeMessageRepository::class,
    ],
];
  • connection — Laravel database connection name; null uses the default connection.
  • table — name of the commits table created by the published migration.
  • repository — strategy for deriving the dedup key:
    • idempotency — reads the x-idempotency-key header combined with the topic name, falling back to the raw Kafka message id if the header is missing.
    • native — uses the raw Kafka message id only.
  • repositories — registry of repository implementations; add your own class here and reference it via KAFKA_COMMITER_REPOSITORY.

Enabling the consumer middleware

Add ConsumerCommiterMiddleware to the consumer middleware stack — either globally for every worker, or only for specific workers/topics:

'consumers' => [
    'middleware' => [
        \KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
    ],

    'workers' => [
        'orders' => [
            'middleware' => [
                \KafkaBus\Commiter\Middleware\ConsumerCommiterMiddleware::class,
            ],
            'handler' => App\Kafka\Consumers\OrdersTopicConsumer::class,
        ],
    ],
],

For each message the middleware:

  1. Resolves a dedup key via the configured repository.
  2. If the key was already committed — the message is skipped and a warning is logged.
  3. If the per-key attempt count exceeds maxAttempt (when configured) — the message is skipped and an error is logged.
  4. Otherwise the pipeline is executed; on success the key is committed, on failure the attempt counter is incremented and the exception is re-thrown.

Producing idempotent messages

Implement HasIdempotency on the producer message and enable ProducerIdempotencyMiddleware:

use KafkaBus\Commiter\Interfaces\HasIdempotency;
use KafkaBus\Core\Messages\ProducerMessage;

final class ProductMessage extends ProducerMessage implements HasIdempotency
{
    public function __construct(private string $productId) {}

    public function getIdempotencyKey(): string
    {
        return $this->productId;
    }
}
'producers' => [
    'middleware' => [
        \KafkaBus\Commiter\Middleware\ProducerIdempotencyMiddleware::class,
    ],
],

The middleware adds the x-idempotency-key header to every outgoing message; consumers running ConsumerCommiterMiddleware with the idempotency repository will use it as the dedup key.

Testing

composer test

KafkaBus::fake()

The KafkaBus facade ships a first-class fake that works exactly like Event::fake() or Mail::fake(). Call KafkaBus::fake() at the start of a test to replace the real BusInterface binding with an in-memory FakeBus. From that point on every call to the facade is forwarded to the fake — publish() calls are intercepted and stored, consumer pipelines can be triggered directly, and the full set of assertion methods becomes available.

use KafkaBus\Laravel\Facades\KafkaBus;

KafkaBus::fake();

Asserting producer messages

use App\Kafka\Messages\ProductMessage;
use KafkaBus\Laravel\Facades\KafkaBus;

it('publishes a product message', function () {
    KafkaBus::fake();

    app(CreateProductAction::class)->execute(productId: 1);

    KafkaBus::assertPublished(ProductMessage::class);
});

Assert with a callback to inspect the serialised ProducerMessage (after the full producer pipeline, including middleware). The callback receives a Micromus\KafkaBus\Producers\Messages\ProducerMessage instance:

KafkaBus::assertPublished(
    ProductMessage::class,
    fn($msg) => str_contains($msg->payload, '"id":1')
        && isset($msg->headers['x-idempotency-key'])
);

Other available assertions:

// Assert published exactly N times
KafkaBus::assertPublishedTimes(ProductMessage::class, 2);

// Assert a specific message was NOT published
KafkaBus::assertNotPublished(ProductMessage::class);

// Assert no messages were published at all
KafkaBus::assertNothingPublished();

Retrieve the published messages directly for custom assertions:

// list<ProducerMessage> — serialised messages including payload, headers, topic
$messages = KafkaBus::getPublished(ProductMessage::class);
$all      = KafkaBus::allPublished();

Dispatching and asserting consumer messages

addMessage() queues an RdKafka\Message into the fake connection. Once queued, call listen() to run the full consumer path — ConnectionFakerConsumerFakerConsumerStream → consumer middleware → route middleware → handler → commit — without touching a real broker.

use KafkaBus\Core\Testing\Consumers\MessageFactory;
use KafkaBus\Laravel\Facades\KafkaBus;

it('handles a product message', function () {
    KafkaBus::fake();

    $message = MessageFactory::for()
        ->withTopicKey('products')
        ->withHeaders(['x-idempotency-key' => 'abc-123'])
        ->make('{"id":1,"name":"Widget"}');

    KafkaBus::addMessage($message);
    KafkaBus::listen('products');

    // Assert side effects produced by the handler
    expect(Product::find(1))->not->toBeNull();
});

Queue multiple messages before triggering listen():

$factory = MessageFactory::for()->withTopicKey('products');

KafkaBus::addMessage($factory->make('{"id":1}'));
KafkaBus::addMessage($factory->make('{"id":2}'));

KafkaBus::listen('products');

Asserting committed messages

After listen() each successfully processed message is committed into the fake connection. Use the commit assertions to verify that your handler ran and the offset was acknowledged:

KafkaBus::addMessage(
    MessageFactory::for()->withTopicKey('products')->make('{"id":1}')
);

KafkaBus::listen('products');

// Assert at least one message was committed on the topic
KafkaBus::assertCommitted('products');

// Assert with a condition on the ConsumerMessageInterface
KafkaBus::assertCommitted(
    'products',
    fn($msg) => $msg->payload() === '{"id":1}'
        && $msg->headers()['x-idempotency-key'] === 'abc-123'
);

// Assert exact count
KafkaBus::assertCommittedTimes('products', 2);

// Assert nothing was committed (e.g. before listen() is called)
KafkaBus::assertNothingCommitted();

Retrieve committed messages directly for custom assertions:

$committed = KafkaBus::getCommitted('products'); // list<ConsumerMessageInterface>

expect($committed[0]->payload())->toBe('{"id":1}');
expect($committed[0]->headers())->toHaveKey('x-idempotency-key');

Changelog

Please see CHANGELOG for more information on what has changed recently.

Contributing

Please see CONTRIBUTING for details.

Security Vulnerabilities

Please review our security policy on how to report security vulnerabilities.

Credits

License

The MIT License (MIT). Please see License File for more information.