myvon/reactphp-rdkafka

RdKafka Consumer and Producer implementation with ReactPHP Event Loop

1.0 2023-02-15 22:03 UTC

This package is auto-updated.

Last update: 2024-12-16 02:28:45 UTC


README

RdKafka implementation with ReactPHP EventLoop.

This library implement PHP RDKafka from Arnaud-lb with react/event-loop and react/stream and provide a non-blocking event-driven Consumer and Producer.

Latest Version on Packagist Tests Total Downloads

How it works

This package use periodic timers from react/event-loop to consume messages at regular interval. To avoid blocking, timeout is set to 0 when consuming.

It also use react/stream to receive and send message in an event-driven way. Consuming message is done by listening to the data event of the steam. Producing message is done by writing data to the corresponding stream. See Consuming Messages and Producing Messages below.

Installation

You can install the package via composer:

composer require myvon/reactphp-kafka

Be sure to have the PHP RDKafka extension installed on your server.

Consuming Messages

To consume message start by creating a Configuration object by passing it the name of your application (used for group.id configuration of kafka, see Consumer group id (general) for more information) and the list of your brokers :

use Myvon\Kafka\Configuration;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);

Then, you can create an "Myvon\Kafka\Consumer" instance:

use Myvon\Kafka\Consumer;

$consumer = new Consumer($configuration->consumer());

Using Configuration::consumer() generate an RdKafka\Conf instance with correct configuration for a Consumer.

You can then start consuming message by calling the start method of the consumer and passing it the list of topics you want to subscribe :

$stream = $consumer->start(['topic']);

This method will return an ThroughStream instance, allowing you to listen to the data event to receive messages :

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Consumer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$consumer = new Consumer($configuration->consumer());
$stream = $consumer->start(['topic']);

$stream->on('data', function($data) {
    $topic = $data['topic'];
    $message = $data['payload'];
    
    //... do whatever you want here 
});

The $data parameter will contain the following keys :

  • topic: contain the name of the topic the message come from
  • payload : contain the message received

Handling consumer errors

The consumer will write to the stream every message received with error RD_KAFKA_RESP_ERR_NO_ERROR.

RD_KAFKA_RESP_ERR__TIMED_OUT and RD_KAFKA_RESP_ERR__PARTITION_EOF will be ignored.

Every other error will be sent through the error event :

$stream->on("error", function(Exception $exception) {
    $errorStr = $exception->getMessage();
    $errorCode = $exception->getCode();
    // handle the error here
});

This package does not handle errors, it simply pass it to your application. It's up to you to handle it.

Consumer timeout and periodic timer

By default the timeout passed to the consume method of RdKafka\KafkaConsumer it set to 0. This prevents the method to block the execution of the script. If you want to set a timeout anyway, you can do it by passing the desired timeout (in ms) to the setConsumeTimeout method :

$consumer->setConsumeTimeout(1000); // 1 second

Be aware that this will affect the EventLoop !

BY default the consumer will look for messages every 1 second. You can set this timer by passing the new timer to the setTimerPeriod method :

$consumer->setTimerPeriod(0.1); // 100 ms

Notice: It internally use the addPeriodicTimer method of the EventLoop so the timer is in second.

Accessing the KafkaConsumer instance

If you need to access the KafkaConsumer instance directly, you can do it by calling getConsumer:

$kafkaConsumer = $consumer->getConsumer();

Producing Messages

Like the Consumer, you need to create the configuration object and pass it to Myvon\Kafka\Producer when instantiating it :

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['topicName']);

You can pass multiple topic to the start method. Call start will return one instance of ThroughStream by topic you want to publish in. You can access the stream of a given topic by :

  • Accessing it through the data returned by the start method : $streams['topicName']
  • Retrieving the ThroughStream instance of a given topic by calling getStream('topicName')

You can then write message to the stream, which will by produced to the corresponding topic :

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic']);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

Notice: the producer will ensure every message is sent by calling poll() every 500ms. This delay can be changed by calling setPollInterval. When streams are closed, the producer poll every message and flush them to be sure noting is lost.

I don't want to use the EventLoop when producing message

Sometime, you want to produce message directly without using an eventloop. This can be done by passing false as second argument to the start method :

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic'], false);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

This will deactivate every usage of the loop in the producer and your code will immediately exit after the last line. To avoid loosing message, streams are closed on the destruction of the class, thanks to the __destruct() method.

Gracefully stopping Consumer and Producer

If you want to stop the Consumer or the Producer before your code end, you can call the stop() method of each class. This will remove every periodic timer and close all streams.

By doing this you will receive the end and close events of every stream used by the Consumer or the Producer.

Using custom configuration option

You can pass custom configuration option to the RdKafka\Conf instance generated by passing them as an array to the thrid arguments of the Myvon\Kafka\Configuration constructor :

$configuration = new Configuration('appName', ['127.0.0.1:9092'], ['enable.partition.eof' => 'false']);

Using a custom loop

If you don't want the Consumer or the Producer to use the default loop, you can pass it as second arguments of the constructor of each class :

$loop = new AnotherLoopInstance();
$producer = new Producer($configuration->producer(), $loop);
$consumer = new Consumer($configuration->consumer(), $loop);

Notice: passing a loop to the producer will force loop utilization even if you pass false as second argument of start

Testing

composer test

Contributing

Please see CONTRIBUTING for details.

License

The MIT License (MIT). Please see License File for more information.