
kafka-php for laravel

dev-master 2023-04-25 03:56 UTC

This package is not auto-updated.

Last update: 2024-04-24 06:38:23 UTC


How to use

1. Install the CHENXCHEN/kafka-php

composer require chenxchen/kafka-php:dev-baoshi

Here is a minimal example of a composer.json file:

	"require": {
		"chenxchen/kafka-php": "dev-baoshi"

2. Define you queue in config/queue.php :

return [
// ...
    'connections' => [
        'kafka-demo' => [
            'driver' => 'd-kafka',
            'metadataBrokerList' => '', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'topics' => ['local_test', ], // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'brokerVersion' => '2.0.0', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'producer' => [
//                 'topics' => ['local_test', 'local_test_2', ], 
                'compression' => \Kafka\Protocol\Produce::COMPRESSION_GZIP,
                'metadataRefreshIntervalMs' => 10000,
//                'metadataBrokerList' => '',
//                'brokerVersion' => '2.0.0',
                'requiredAck' => '1',
                'produceInterval' => 500,
                'timeout' => 5000,
            'consumer' => [
//                'topics' => ['local_test', 'local_test_2', ],
                'metadataRefreshIntervalMs' => 10000,
                'consumeMode' => \Kafka\ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET,
//                'metadataBrokerList' => '',
                'groupId' => 'test-chc',
//                'brokerVersion' => '2.0.0',
//                'sessionTimeout' => 30000,
//                'rebalanceTimeout' => 30000,
                'maxBytes' => 65536,
                'maxWaitTime' => 100,
                'executeHandle' => \App\Jobs\KafkaDemo\KafkaDemoHandle::class, // consumer handle
// ...

3. Define your Job


namespace App\Jobs\KafkaDemo;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class KafkaDemoJob implements ShouldQueue
    use InteractsWithQueue, Queueable, SerializesModels;

     * 内容
     * @var array
    protected $content = null;

     * Create a new job instance.
     * @param array $content  
    public function __construct($content)
        $this->content = $content;

        // connection name define in queue.php
        $this->connection = 'kafka-demo';

     * You must implement this function for get data
     * @return array
    public function getData() {
        return $this->content;

Then you can dispatch this job:

$job = new KafkaDemoJob([33,567]);

4. Define your Consumer handle


namespace App\Jobs\KafkaDemo;

use CHENXCHEN\LaravelQueueKafka\Message\TopicMessage;

class KafkaDemoHandle
     * @param TopicMessage[] $messages
    public function executes($messages) {
        printf("%s\n", json_encode($messages));

5. Start consumer

php artisan queue:kafka kafka-demo

Supported versions of Laravel

Tested on: [5.3]