werk365 / larakafka
Kafka client for laravel that can easily handle producing and consuming messages
Requires
- illuminate/support: ~7|~8
- jobcloud/php-kafka-lib: ^1
Requires (Dev)
- orchestra/testbench: ~5|~6
- phpunit/phpunit: ~9.0
This package is auto-updated.
Last update: 2024-12-19 20:33:52 UTC
README
Kafka client package for use in Laravel. Based on jobcloud/php-kafka-lib.
This package supports an extension to produce spatie activitylog activity automatically. Use werk365/larakafka-activity
.
Without much configuration (simply making sure the config has the required broker information and credentials), you'll be able to enable the Spatie Activity Logging on a model, and this package will take care of also sending that information to a kafka topic corresponding with your application name.
Besides this basic logging feature, it also allows you to produce and consume anything you would want.
Producing can be easily done in-code, and you can start any number of consumers through:
$ php artisan larakafka:consume {topic}
Installation
Via Composer
$ composer require werk365/larakafka
Publish the config file using
$ php artisan vendor:publish --provider="Werk365\LaraKafka\LaraKafkaServiceProvider"
Configuration
The publishes config file looks as follows, you can find an explanation below it.
<?php return [ 'client' => [ 'client_name' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'configs' => [ 'producer' => [ 'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'compression.codec' => 'snappy', 'security.protocol' => 'SASL_SSL', 'sasl.mechanisms' => 'PLAIN', 'sasl.username' => '', 'sasl.password' => '' ], 'consumer' => [ 'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'security.protocol' => 'SASL_SSL', 'sasl.mechanisms' => 'PLAIN', 'sasl.username' => '', 'sasl.password' => '', 'auto.offset.reset' => 'earliest' ] ], 'broker' => '', ], 'functions' => [ 'consumer' => [ 'example' => [ 'function' => 'ingest', 'namespace' => '\App\Services\KafkaService' ] ], ], 'maps' => [ 'consumer' => [ 'example' => [ 'model' => 'App\Models\Example', 'event_id' => 'uuid', 'model_id' => 'id', 'attributes' => [ 'uuid' => 'id', 'first_name' => 'name', ] ] ] ] ];
Not all of the above configuration is needed for every usecase. If you just wish to use the activity logging feature, simply make sure to configure the client.configs.producer
and client.broker
.
Client
The producer and consumer configuration live here. This should be fairly straight forward. Do note that the default topic set for the producer will be the client.client_name
value.
Functions
This is currently only used for the consumer, if you wish to consume a topic, the corresponding function will be called when a message is received. In this case while consuming the example
topic, a static function \App\Services\KafkaService::ingest($key, $headers, $body)
would be called. It is then up to your application to process that data.
Maps
Lastly, this is the configuration for the storeMessage()
function. This function can be used to help easily process the data received from Kafka. This function expects an array of data and can map and store it to a database for you. Further information about this function in usage, but some configuration points:
model
= The model that should be used to store the data
event_id
= The key name of the unique id that belongs to the object
model_id
= The key name of the unique id as it is called in the model
attributes
= The attributes that should be stored for this model. Not all attributes configured here have to be present in the consumed message, as only updates attributes could be sent. The key
represents the attribute key name as it is in the event, the value
represents the key as it is called in the model.
Usage
Produce
use Werk365\LaraKafka\LaraKafka; $kafka = new LaraKafka(); $kafka->setTopic("string") //optional, defaults to application name ->setKey("string") // optional, default will be the caller classname ->setHeaders(["key" => "value"]) // optional, default will contain more information about caller ->setBody("string") // Body can also be set like: $kafka = new LaraKafka("body") ->produce();
Other available methods:
->setBroker("string")
Sets broker other than defined in config
->setProducerConfig([])
Overrides config settings
->addProducerConfig("key", "value")
Adds value to set config
->addHeaders([])
Merges added headers array in to set one
Octane Consumer
To run a consumer when using Laravel Octane, you can choose to either have it run through a Swoole worker or normally though a php process using the console command described in the next step. To use the Octane version, first make a new consumer using:
$ php artisan kafka:consumer topic
This will create a new consumer class in the App\Consumers
namespace. For example: App\Consumers\TestConsumer
when using test
as the topic name.
In this consumer you will find a handleMessage()
method which has everything you will need to start processing your messages. To make sure this consumer is started with your octane application, add the consumer to the listeners in the octane config file. I recommend adding it as a listener for the TickReceived
event like so:
'listeners' => [ // ... TickReceived::class => [ ...Octane::prepareApplicationForNextOperation(), \App\Consumers\TestConsumer::class ], // ... ],
The consumer will only be started once and keep running, but putting it here will mean it will be started every octane-tick (every second), so the consumer can ensure it is still running. If the initial consumer has given no sign of live for 60
seconds, a new consumer will be started.
Console Consumer
To run a consumer, you can simply run
$ php artisan larakafka:consume {topic}
On reading a message, the function defined in your config will be called. If we have the function given in the example config, it could simply look like this:
namespace App\Services; use Illuminate\Support\Facades\Log; class KafkaService { public static function ingest($key, $headers, $body) { Log::info(json_encode($body)); } }
If you wish to store the data received in the body, you can use the storeMessage
method. This method takes in an array of attributes and an array of types. This means one array of attributes can be mapped and stored to different Models (types). In this example we'll only store one model, assuming the example config, and assuming the body has event_attributes
which is an array containing attributes.
namespace App\Services; use Werk365\LaraKafka\LaraKafka; class KafkaService { public static function ingest($key, $headers, $body) { $kafka = new LaraKafka(); $kafka->storeMessage($body->event_attributes, ["user"]); } }
Change log
Please see the changelog for more information on what has changed recently.
Testing
WIP
$ composer test
Security
If you discover any security related issues, please email author email instead of using the issue tracker.
Credits
License
license. Please see the license file for more information.