purplefan / php-rdkafka-bundle
Integrates php-rdkafka with Symfony2|3
1.0.1
2016-10-04 08:40 UTC
Requires
- php: >=5.6
- ext-rdkafka: *
- symfony/config: ~2.3 || ~3.0
- symfony/console: ~2.3 || ~3.0
- symfony/dependency-injection: ~2.3 || ~3.0
- symfony/framework-bundle: ~2.3 || ~3.0
- symfony/yaml: ~2.3 || ~3.0
Requires (Dev)
- kwn/php-rdkafka-stubs: *
- phpunit/phpunit: ~4.8 || ~5.0
This package is not auto-updated.
Last update: 2025-03-05 22:50:32 UTC
README
About
This Symfony bundle provides connectivity to the Kafka publish-subscribe messaging system. It is built on php-rdkafka, the PHP binding to librdkafka.
Installation
Add the dependency to your composer.json (the "*" constraint accepts any version; pin it as needed):

    {
        "require": {
            "mshauneu/php-rdkafka-bundle": "*"
        }
    }
Enable the bundle in your application kernel:

    // app/AppKernel.php
    public function registerBundles()
    {
        $bundles = array(
            // ...
            new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
        );

        return $bundles;
    }
Configuration
A simple configuration could look like this:

    mshauneu_rd_kafka:
        producers:
            test_producer:
                brokers: 127.0.0.1:9092
                topic: test_topic
        consumers:
            test_consumer:
                brokers: 127.0.0.1:9092
                topic: test_topic
                properties:
                    group_id: "test_group_id"
                topic_properties:
                    offset_store_method: broker
                    auto_offset_reset: smallest
                    auto_commit_interval_ms: 100
Configuration properties are documented:
- for producer or consumer in CommunicatorConfiguration.php
- for topic to produce in TopicProducerConfiguration.php
- for topic to consume in TopicConsumerConfiguration.php
Usage
Publishing messages to a Kafka topic
From a Symfony controller:
    $payload = 'test_message';
    // Look up the producer defined under "producers" in the configuration
    $topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
    $topicProducer->produceStart();
    $topicProducer->produce($payload);
    $topicProducer->produceStop();
By CLI:
./app/console kafka:producer --producer test_producer test_message
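When several messages are published in one go, it can help to guarantee that produceStop() runs even if a produce() call throws. The following is a minimal sketch of that pattern, not part of the bundle: FakeTopicProducer and produceAll() are hypothetical names, and the fake only mirrors the three method names shown above so the example runs without a Kafka broker. It also assumes that produce() may be called more than once between produceStart() and produceStop().

```php
<?php
// Hypothetical stand-in for the bundle's topic producer. It only mirrors the
// three method names used in this README and records the calls it receives.
class FakeTopicProducer
{
    public $log = array();

    public function produceStart()
    {
        $this->log[] = 'start';
    }

    public function produce($payload)
    {
        $this->log[] = 'produce:' . $payload;
    }

    public function produceStop()
    {
        $this->log[] = 'stop';
    }
}

/**
 * Hypothetical helper: sends every payload between one produceStart() and
 * one produceStop(), and still calls produceStop() if produce() throws.
 */
function produceAll($topicProducer, array $payloads)
{
    $topicProducer->produceStart();
    try {
        foreach ($payloads as $payload) {
            $topicProducer->produce($payload);
        }
    } finally {
        $topicProducer->produceStop();
    }
}

$producer = new FakeTopicProducer();
produceAll($producer, array('first', 'second'));
```

In an application, the object returned by getProducer("test_producer") would take the place of the fake, provided the assumption about repeated produce() calls holds.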
Consuming messages from a Kafka topic
Implement ConsumerInterface:

    class MessageHandler implements ConsumerInterface
    {
        public function consume($topic, $partition, $offset, $key, $payload)
        {
            echo "Received payload: " . $payload . PHP_EOL;
        }
    }
Register it:
    test_message_handler:
        class: MessageHandler
From a Symfony controller:
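    // Fetch the handler registered above and the consumer defined under "consumers"
    $consumerImpl = $container->get('test_message_handler');
    $topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_consumer");
    $topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
    $topicConsumer->consume($consumerImpl);
    $topicConsumer->consumeStop();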
By CLI:
./app/console kafka:consumer --consumer test_consumer --handler test_message_handler
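A handler usually does more than echo the payload. As a rough illustration of the consume() contract shown above, here is a sketch of a handler that decodes JSON payloads and counts the ones that fail to parse. JsonMessageHandler is a hypothetical name, and the interface is redeclared locally (with the signature copied from the example above) only so the snippet runs on its own; in a real project the bundle's ConsumerInterface would be imported instead.

```php
<?php
// Local stand-in for the bundle's ConsumerInterface, declared here so the
// sketch is self-contained. The signature is copied from the README example.
interface ConsumerInterface
{
    public function consume($topic, $partition, $offset, $key, $payload);
}

// Hypothetical handler: keeps payloads that decode as JSON, counts the rest.
class JsonMessageHandler implements ConsumerInterface
{
    public $decoded = array();
    public $rejected = 0;

    public function consume($topic, $partition, $offset, $key, $payload)
    {
        $data = json_decode($payload, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            // Malformed message: count it and move on instead of throwing.
            $this->rejected++;
            return;
        }
        // Remember the offset alongside the decoded data.
        $this->decoded[] = array('offset' => $offset, 'data' => $data);
    }
}

// Simulate two deliveries without a broker.
$handler = new JsonMessageHandler();
$handler->consume('test_topic', 0, 41, null, '{"id": 7}');
$handler->consume('test_topic', 0, 42, null, 'not json');
```

Counting malformed messages rather than throwing is a design choice for this sketch; how the bundle reacts to an exception raised inside consume() is not documented here.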
License
This project is under the MIT License. See the LICENSE file for the full license text.