nassirian / laravel-kafka-migration
Manage Kafka topics like Laravel migrations — create, migrate, and rollback Kafka topics with artisan commands.
Package info
github.com/nassirian/laravel-kafka-migration
pkg:composer/nassirian/laravel-kafka-migration
Requires
- php: ^8.0
- illuminate/console: ^8.0|^9.0|^10.0|^11.0|^12.0
- illuminate/container: ^8.0|^9.0|^10.0|^11.0|^12.0
- illuminate/database: ^8.0|^9.0|^10.0|^11.0|^12.0
- illuminate/filesystem: ^8.0|^9.0|^10.0|^11.0|^12.0
- illuminate/support: ^8.0|^9.0|^10.0|^11.0|^12.0
Requires (Dev)
- longlang/phpkafka: ^1.2
- mockery/mockery: ^1.6
- orchestra/testbench: ^6.0|^7.0|^8.0|^9.0|^10.0
- phpunit/phpunit: ^9.0|^10.0|^11.0
Suggests
- ext-curl: Required for the HTTP/Confluent REST Proxy driver
- ext-rdkafka: Required for the rdkafka and jobcloud drivers
- jobcloud/php-kafka-lib: Required for the jobcloud driver (fluent rdkafka wrapper)
- longlang/phpkafka: Required for the longlang driver (pure PHP Kafka client)
This package is auto-updated.
Last update: 2026-03-24 21:22:55 UTC
README
Manage Kafka topics like Laravel migrations — create, track, and rollback Kafka topics using familiar artisan commands.
Requirements
- PHP 8.1+
- Laravel 9, 10, 11, or 12
Installation
composer require nassirian/laravel-kafka-migration
Publish the config:
php artisan vendor:publish --tag=kafka-migration-config
Configuration
Set your Kafka connection in .env:
KAFKA_MIGRATION_DRIVER=longlang # rdkafka | longlang | http | mock KAFKA_BROKERS=localhost:9092
Available Drivers
| Driver | Description | Requires |
|---|---|---|
longlang |
Pure PHP Kafka client (recommended) | composer require longlang/phpkafka |
rdkafka |
High-performance via PHP extension | pecl install rdkafka |
jobcloud |
Fluent rdkafka wrapper by Jobcloud | composer require jobcloud/php-kafka-lib + ext-rdkafka |
http |
Confluent REST Proxy API | ext-curl, REST Proxy running |
mock |
In-memory (for testing/development) | Nothing |
Usage
Create a topic migration
php artisan make:kafka-topic orders php artisan make:kafka-topic user-events php artisan make:kafka-topic payment.processed
This creates a timestamped file in database/kafka-migrations/:
// database/kafka-migrations/2024_01_15_123456_create_orders_topic.php use Nassirian\LaravelKafkaMigration\Migration\KafkaMigration; return new class extends KafkaMigration { public function up(): void { $this->createTopic( $this->topic('orders') ->partitions(3) ->replicationFactor(1) ->retentionMs(604_800_000) // 7 days ->cleanupPolicy('delete') ->compressionType('snappy') ); } public function down(): void { $this->deleteTopic('orders'); } };
Topic builder options
$this->topic('my-topic') ->partitions(6) // number of partitions ->replicationFactor(3) // replication factor ->retentionMs(604_800_000) // retention in milliseconds ->retentionBytes(1_073_741_824) // retention in bytes ->cleanupPolicy('delete') // delete | compact | delete,compact ->compressionType('snappy') // none | gzip | snappy | lz4 | zstd | producer ->minInsyncReplicas(2) // min ISR ->maxMessageBytes(1_048_576) // max message size ->segmentBytes(1_073_741_824) // segment size ->config('custom.key', 'value'); // any raw Kafka config
Run migrations
php artisan kafka:migrate
Check status
php artisan kafka:migrate:status
Rollback last batch
php artisan kafka:migrate:rollback
php artisan kafka:migrate:rollback --step=2 # rollback 2 migrations
Reset all migrations
php artisan kafka:migrate:reset
Pretend mode (dry run)
php artisan kafka:migrate --pretend php artisan kafka:migrate:rollback --pretend
Using the Facade
use Nassirian\LaravelKafkaMigration\Facades\KafkaManager; KafkaManager::topicExists('orders'); // bool KafkaManager::listTopics(); // string[] KafkaManager::getTopicMetadata('orders'); // array // Switch drivers at runtime KafkaManager::driver('rdkafka')->listTopics();
Jobcloud Driver Details
The jobcloud driver uses jobcloud/php-kafka-lib as its configuration layer. That library provides a clean, opinionated fluent builder around ext-rdkafka — it handles broker normalisation, SASL, and SSL setup — and the package then hands off the resulting KafkaConfiguration (which extends RdKafka\Conf) directly to RdKafka\AdminClient for all topic operations.
KAFKA_MIGRATION_DRIVER=jobcloud KAFKA_BROKERS=broker1:9092,broker2:9092 # Optional SASL KAFKA_SECURITY_PROTOCOL=SASL_SSL KAFKA_SASL_MECHANISMS=PLAIN KAFKA_SASL_USERNAME=your-user KAFKA_SASL_PASSWORD=your-pass # Optional SSL KAFKA_SSL_CA=/path/to/ca.pem KAFKA_SSL_CERT=/path/to/cert.pem KAFKA_SSL_KEY=/path/to/key.pem
Extra rdkafka settings can be passed via extra_config in config/kafka-migration.php:
'jobcloud' => [ // ... 'extra_config' => [ 'log_level' => '6', 'fetch.message.max.bytes' => '1048576', ], ],
Integration tests that require a live broker are tagged with @group integration and can be run separately:
composer test-integration
Registering a Custom Driver
// In a service provider use Nassirian\LaravelKafkaMigration\KafkaManager; $this->app->make(KafkaManager::class)->extend('my-driver', function ($app) { return new MyCustomDriver($app['config']['kafka-migration.drivers.my-driver']); });
Testing
Use the mock driver in your tests so no real Kafka connection is needed:
// config/kafka-migration.php or .env.testing KAFKA_MIGRATION_DRIVER=mock
Run the test suite:
composer test
or
./vendor/bin/phpunit
License
MIT