chenxchen / laravel-kafka-php
kafka-php for laravel
dev-master
2019-03-15 11:53 UTC
Requires
- chenxchen/kafka-php: dev-baoshi
- illuminate/database: 5.3.*|5.4.*|5.5.*|5.6.*|5.7.*
- illuminate/queue: 5.3.*|5.4.*|5.5.*|5.6.*|5.7.*
- illuminate/support: 5.3.*|5.4.*|5.5.*|5.6.*|5.7.*
This package is auto-updated.
Last update: 2025-03-16 04:33:41 UTC
README
How to use
1. Install the CHENXCHEN/kafka-php
composer require chenxchen/kafka-php:dev-baoshi
Here is a minimal example of a composer.json file:
{ "require": { "chenxchen/kafka-php": "dev-baoshi" } }
2. Define you queue in config/queue.php
:
return [ // ... 'connections' => [ 'kafka-demo' => [ 'driver' => 'd-kafka', 'metadataBrokerList' => '127.0.0.1:9092', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers. 'topics' => ['local_test', ], // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers. 'brokerVersion' => '2.0.0', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers. 'producer' => [ // 'topics' => ['local_test', 'local_test_2', ], 'compression' => \Kafka\Protocol\Produce::COMPRESSION_GZIP, 'metadataRefreshIntervalMs' => 10000, // 'metadataBrokerList' => '', // 'brokerVersion' => '2.0.0', 'requiredAck' => '1', 'produceInterval' => 500, 'timeout' => 5000, ], 'consumer' => [ // 'topics' => ['local_test', 'local_test_2', ], 'metadataRefreshIntervalMs' => 10000, 'consumeMode' => \Kafka\ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET, // 'metadataBrokerList' => '', 'groupId' => 'test-chc', // 'brokerVersion' => '2.0.0', // 'sessionTimeout' => 30000, // 'rebalanceTimeout' => 30000, 'maxBytes' => 65536, 'maxWaitTime' => 100, 'executeHandle' => \App\Jobs\KafkaDemo\KafkaDemoHandle::class, // consumer handle ], ] ], // ... ];
3. Define your Job
<?php namespace App\Jobs\KafkaDemo; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class KafkaDemoJob implements ShouldQueue { use InteractsWithQueue, Queueable, SerializesModels; /** * 内容 * @var array */ protected $content = null; /** * Create a new job instance. * * @param array $content */ public function __construct($content) { $this->content = $content; // connection name define in queue.php $this->connection = 'kafka-demo'; } /** * You must implement this function for get data * @return array */ public function getData() { return $this->content; } }
Then you can dispatch this job:
$job = new KafkaDemoJob([33,567]); dispatch($job);
4. Define your Consumer handle
<?php namespace App\Jobs\KafkaDemo; use CHENXCHEN\LaravelQueueKafka\Message\TopicMessage; class KafkaDemoHandle { /** * @param TopicMessage[] $messages */ public function executes($messages) { printf("%s\n", json_encode($messages)); } }
5. Start consumer
php artisan queue:kafka kafka-demo
Supported versions of Laravel
Tested on: [5.3]