purplefan/php-rdkafka-bundle

Integrates php-rdkafka with Symfony2|3

1.0.1 2016-10-04 08:40 UTC

This package is not auto-updated.

Last update: 2024-03-20 17:46:19 UTC


README

About

This Symfony bundle provides connectivity to the Kafka publish-subscribe messaging system based on rdkafka binding to librdkafka

Installation

Add the dependency in your composer.json

{
    "require": {
        "mshauneu/php-rdkafka-bundle"
    }
}

Enable the bundle in your application kernel

// app/AppKernel.php
public function registerBundles() {
    $bundles = array(
        // ...
        new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
    );
}

Configuration

Simple configuration could look like:

mshauneu_rd_kafka:
  producers: 
    test_producer: 
      brokers: 127.0.0.1:9092
      topic: test_topic   
  consumers:
    test_consumer:
      brokers: 127.0.0.1:9092
      topic: test_topic   
      properties: 
        group_id: "test_group_id"
      topic_properties: 
        offset_store_method: broker           
        auto_offset_reset: smallest
        auto_commit_interval_ms: 100

Configuration properties are documented:

Usage

Publishing messages to a Kafka topic

From a Symfony controller:

$payload = 'test_message';
$topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
$topicProducer->produceStart();
$topicProducer->produce("message");
$topicProducer->produceStop();

By CLI:

./app/console kafka:producer --producer test_producer test_message 

Consume messages out of a Kafka topic:

Implement ConsumerInterface

class MessageHandler implements ConsumerInterface {
	public function consume($topic, $partition, $offset, $key, $payload) {
		echo "Received payload: " . $payload . PHP_EOL;
	}
}

Register it:

test_message_handler:
    class: MessageHandler

From a Symfony controller:

$topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_producer");
$topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
$topicConsumer->consume($consumerImpl);
$topicConsumer->consumeStop();

By CLI:

./app/console kafka:consumer --consumer test_consumer --handler test_message_handler 

License

This project is under the MIT License. See the LICENSE file for the full license text.