micromus / kafka-bus-commiter
This is my package kafka-bus-commiter
Requires
- php: ^8.2
- micromus/kafka-bus: ^1.1.1
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.64
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^2.1.0
- testo/testo: ^0.10.2
This package is auto-updated.
Last update: 2026-05-27 19:05:22 UTC
README
A middleware package for micromus/kafka-bus that provides idempotent Kafka message processing. It tracks which messages have already been handled, prevents duplicate processing, and allows limiting the maximum number of read attempts.
How It Works
ConsumerCommiterMiddleware is inserted into the consumer pipeline and performs three checks before passing a message further:
- Already committed: if the message was already successfully processed (commitedAt is not null), the middleware logs a warning and stops the pipeline.
- Max attempts exceeded: if maxAttempt is set and the attempt count has exceeded it, the middleware logs an error and stops the pipeline.
- Successful processing: if both checks pass, the message continues down the pipeline; once handled, commit() is called to record it as processed.
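The three checks above can be sketched as a plain function. This is a minimal illustration and not the package's actual source: AttemptSketch, Decision, and decide() are hypothetical names, and the sketch assumes that an attempt carries a number and a nullable commitedAt, and that "exceeded" means the attempt number is strictly greater than maxAttempt.

```php
<?php

// Hypothetical stand-in for the package's Attempt value object.
// Assumption: it carries an attempt number and a nullable commit timestamp.
final class AttemptSketch
{
    public function __construct(
        public readonly int $number,
        public readonly ?DateTimeImmutable $commitedAt = null,
    ) {
    }
}

// Hypothetical outcome labels, used only for this illustration.
enum Decision: string
{
    case SkipAlreadyCommitted = 'skip: already committed';
    case SkipMaxAttempts = 'skip: max attempts exceeded';
    case Process = 'process, then commit';
}

function decide(AttemptSketch $attempt, int $maxAttempt = -1): Decision
{
    // Check 1: a non-null commitedAt means the message was handled before.
    if ($attempt->commitedAt !== null) {
        return Decision::SkipAlreadyCommitted;
    }

    // Check 2: -1 disables the limit; otherwise stop once the count exceeds it.
    // Assumption: "exceeded" is a strict greater-than comparison.
    if ($maxAttempt !== -1 && $attempt->number > $maxAttempt) {
        return Decision::SkipMaxAttempts;
    }

    // Check 3: pass the message on; the caller commits after the handler succeeds.
    return Decision::Process;
}
```

Under these assumptions, decide(new AttemptSketch(4), 3) yields Decision::SkipMaxAttempts, while the same attempt with the default maxAttempt of -1 is processed.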
Installation
composer require micromus/kafka-bus-commiter
Usage
Basic Example
Implement ConsumerMessageRepositoryInterface to persist message state (e.g. in a database or Redis), then pass the
middleware into your worker options:
    use Micromus\KafkaBus\Bus;
    use Micromus\KafkaBus\Consumers\Router\ConsumerRoutesBuilder;
    use Micromus\KafkaBus\Consumers\Router\RouteInfo;
    use Micromus\KafkaBus\Topics\Topic;
    use Micromus\KafkaBus\Topics\TopicRegistry;
    use Micromus\KafkaBusCommiter\Middleware\ConsumerCommiterMiddleware;

    $topicRegistry = (new TopicRegistry())
        ->add(new Topic('production.fact.products.1', 'products'));

    $consumerRoutes = ConsumerRoutesBuilder::make($topicRegistry)
        ->add(new RouteInfo('products', new YourMessageHandler()))
        ->build();

    $workerRegistry = (new Bus\Listeners\Workers\MemoryWorkerRegistry())
        ->add(
            new Bus\Listeners\Workers\Worker(
                name: 'default-listener',
                routes: $consumerRoutes,
                options: new Bus\Listeners\Workers\Options(
                    middleware: [
                        new ConsumerCommiterMiddleware(new YourMessageRepository()),
                    ]
                )
            )
        );
Middleware Options
    new ConsumerCommiterMiddleware(
        consumerMessageRepository: $repository, // required
        logger: $logger,                        // PSR-3 logger, defaults to NullLogger
        maxAttempt: 3,                          // max attempts, -1 = unlimited
    )
| Parameter | Type | Default | Description |
|---|---|---|---|
| consumerMessageRepository | ConsumerMessageRepositoryInterface | (required) | Storage for message state |
| logger | LoggerInterface | NullLogger | PSR-3 compatible logger |
| maxAttempt | int | -1 | Maximum number of processing attempts. -1 means unlimited |
Implementing the Repository
You need to provide your own implementation of ConsumerMessageRepositoryInterface:
    use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
    use Micromus\KafkaBusCommiter\Attempt;
    use Micromus\KafkaBusCommiter\Interfaces\ConsumerMessageRepositoryInterface;

    class DatabaseConsumerMessageRepository implements ConsumerMessageRepositoryInterface
    {
        /**
         * Returns the current attempt for a given message.
         * Return Attempt(number: 1) if this is the first time the message is seen.
         * Return Attempt with a non-null commitedAt if it was already committed.
         */
        public function attempt(ConsumerMessageInterface $message): Attempt
        {
            // ...
        }

        /**
         * Marks the message as successfully processed.
         */
        public function commit(ConsumerMessageInterface $message): void
        {
            // ...
        }

        /**
         * Returns true if a record for this message already exists in storage.
         */
        public function exists(ConsumerMessageInterface $message): bool
        {
            // ...
        }
    }
The message identifier is available via $message->msgId().
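To make the repository contract more concrete, here is a minimal in-memory sketch keyed by the message identifier. It is only an illustration under stated assumptions: MessageStandIn, AttemptStandIn, and InMemoryRepositorySketch are hypothetical stand-ins so the snippet runs without the package installed, msgId() is assumed to return a string, and each attempt() call is assumed to count as one read attempt. A real implementation would implement ConsumerMessageRepositoryInterface and use persistent storage.

```php
<?php

// Hypothetical stand-in for ConsumerMessageInterface.
interface MessageStandIn
{
    public function msgId(): string; // assumption: the identifier is a string
}

// Hypothetical stand-in for the package's Attempt value object.
final class AttemptStandIn
{
    public function __construct(
        public readonly int $number,
        public readonly ?DateTimeImmutable $commitedAt = null,
    ) {
    }
}

final class InMemoryRepositorySketch
{
    /** @var array<string, array{number: int, commitedAt: ?DateTimeImmutable}> */
    private array $records = [];

    public function attempt(MessageStandIn $message): AttemptStandIn
    {
        $id = $message->msgId();
        $record = $this->records[$id] ?? ['number' => 0, 'commitedAt' => null];

        // Assumption: every call counts as one read attempt,
        // unless the message has already been committed.
        if ($record['commitedAt'] === null) {
            $record['number']++;
        }
        $this->records[$id] = $record;

        return new AttemptStandIn($record['number'], $record['commitedAt']);
    }

    public function commit(MessageStandIn $message): void
    {
        $id = $message->msgId();
        $record = $this->records[$id] ?? ['number' => 1, 'commitedAt' => null];

        // Record the commit time so later attempts see a non-null commitedAt.
        $record['commitedAt'] = new DateTimeImmutable();
        $this->records[$id] = $record;
    }

    public function exists(MessageStandIn $message): bool
    {
        return isset($this->records[$message->msgId()]);
    }
}
```

An in-memory array loses its state on restart, so this shape is only suitable for tests; the same three methods map naturally onto a database table or a Redis hash keyed by the message identifier.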
Testing
composer test
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.