simpod/kafka-bundle
Kafka Symfony bundle.
Requires
- ext-rdkafka: *
- psr/log: ^1.0
- ext-pcntl: *
- php: ^7.2
- symfony/event-dispatcher: ^4.1
- symfony/config: ^3.4|^4.1
- symfony/console: ^3.4|^4.1
- symfony/dependency-injection: ^3.4|^4.1
- symfony/framework-bundle: ^3.4|^4.1
- phpunit/phpunit: ^8.0
Requires (Dev)
- kwn/php-rdkafka-stubs: ^1.0
- doctrine/coding-standard: ^5.0
- phpstan/phpstan: ^0.10.3
- phpstan/phpstan-phpunit: ^0.10.0
- phpstan/phpstan-strict-rules: ^0.10.1
- symfony/yaml: ^3.4|^4.1
Suggests
- kwn/php-rdkafka-stubs: Support and autocompletion for RDKafka in IDE | require as dev dependency
README
Installation
Add as Composer dependency:
$ composer require simpod/kafka-bundle
Then add KafkaBundle
to Symfony's bundles.php
:
use SimPod\KafkaBundle\SimPodKafkaBundle; return [ ... new SimPodKafkaBundle() ... ];
Usage
This package simply makes it easier to integrate https://github.com/arnaud-lb/php-rdkafka with Symfony. For more details how to work with Kafka in PHP, refer to its documentation.
This bundle registers these commands:
bin/console debug:kafka:consumers
to list all available consumer groupsbin/console kafka:consumer:run <consumer group name>
to run consumer instance to join specific consumer group
Setup
Create eg. kafka.yaml
file in your config directory with following content:
kafka: broker_list: '%env(KAFKA_BROKER_LIST)%' # required client: id: 'your-application-name'
It reads env var KAFKA_BROKER_LIST
that contains comma-separated list of brokers (broker-1.kafka.com:9092,broker-2.kafka.com:9092
).
If not set, it defaults to 127.0.0.1:9092
Producing
To create producer, you will need only Brokers
from this bundle, there's no need for anything else.
Simple example:
<?php declare(strict_types=1); use RdKafka\Producer; use SimPod\KafkaBundle\Kafka\Brokers; use const RD_KAFKA_PARTITION_UA; use function json_encode; class SimpleProducer { private const TOPIC_NAME = 'topic1'; /** @var Brokers */ private $brokers; public function __construct(Brokers $brokers) { $this->brokers = $brokers; } public function produce(MessageObject $message) : void { $producer = new Producer(); $producer->addBrokers($this->brokers->getList()); $topic = $producer->newTopic(self::TOPIC_NAME); // 4th argument can be optional partitioning key $topic->produce( RD_KAFKA_PARTITION_UA, 0, json_encode($message) ); } }
Consuming
This is example of simple consumer that belongs into simple_consumer_group
and consuming topic1
<?php declare(strict_types=1); use RdKafka\Message; use SimPod\KafkaBundle\Kafka\Consumer\Consumer; use SimPod\KafkaBundle\Kafka\Consumer\Configuration; final class SimpleConsumer extends Consumer { private const GROUP_ID = 'simple_consumer_group'; public function consume(Message $kafkaMessage) : void { // Execute your consumer logic here // Example manual commit $this->kafkaConsumer->commit($kafkaMessage); } public function getGroupId() : string { return self::GROUP_ID; } /** * @return string[] */ public function getTopics() : array { return ['topic1']; } public function getConfiguration() : Configuration { return new Configuration($this->getGroupId()); } }
It is automatically registered to container for it extends Consumer
Development
There is kwn/php-rdkafka-stubs
listed as a dev dependency so it properly integrates php-rdkafka extension with IDE.