coste / kafka-iterator
Lets you consume a Kafka topic with a simple foreach loop
Requires
- ext-rdkafka: ~4.0|~5.0
What is KafkaConsumerIterator.php?
KafkaConsumerIterator.php is an abstraction layer for the Rdkafka extension, written in PHP.
Introduction
Using Rdkafka directly couples your code to an API that is not very stable. This library provides a class that implements the PHP Iterator interface, so consuming a Kafka topic is as easy as a foreach loop, and the consumer can be combined with other iterators.
This can simplify your business logic and unit testing, since a Traversable object is easy to replace with another one.
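As a rough illustration of the unit-testing point, here is a minimal sketch in which the business logic depends only on `iterable`, so a test can hand it an `ArrayIterator` in place of the Kafka consumer. The function `count_non_empty` and the sample payloads are hypothetical and not part of this library; messages are assumed to be plain strings.

```php
<?php
// Hypothetical business logic: it depends on `iterable` only, not on Kafka.
function count_non_empty(iterable $messages): int
{
    $count = 0;
    foreach ($messages as $message) {
        if ($message !== '') {
            $count++;
        }
    }
    return $count;
}

// In a unit test, an ArrayIterator stands in for the Kafka consumer.
$fake = new \ArrayIterator(['order-1', '', 'order-2']);
$result = count_non_empty($fake); // 2
```

In production code the same function could receive the Kafka consumer instead, since both objects are Traversable.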
Installation
composer require coste/kafka-iterator
Examples
Classic usage
<?php
use Coste\Kafka\ConsumerIterator;

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2',
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
]);

foreach ($consumer as $key => $message) {
    try {
        // 1. Process a message
        my_complex_process_which_can_fail($message);
    } catch (RuntimeException $e) {
        // 2. An expected exception occurred in my process
        put_the_message_somewhere($message);
    } catch (Exception $e) {
        // 3. If I want, I can still break the loop.
        // If I restart the loop, this message will be read again
        // as it was not processed.
        break;
    }
    // 4. Each completed iteration commits the offset
}
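The commit behaviour described in steps 3 and 4 can be pictured with a toy in-memory iterator. This is only a model of the semantics stated in the comments above, not the library's implementation: the class `CommitOnNextIterator` is hypothetical and does not talk to Kafka. It records an offset as "committed" in `next()`, which `foreach` calls only when the loop body finishes without `break`.

```php
<?php
// Toy model only: an in-memory iterator whose committed offset advances in
// next(), i.e. after the loop body has completed for the current message.
final class CommitOnNextIterator implements \Iterator
{
    private $messages;
    private $committed = 0; // first offset that has not been processed yet
    private $position = 0;

    public function __construct(array $messages)
    {
        $this->messages = array_values($messages);
    }

    public function rewind(): void
    {
        // Restarting the loop resumes from the last committed offset.
        $this->position = $this->committed;
    }

    public function valid(): bool
    {
        return isset($this->messages[$this->position]);
    }

    public function current(): string
    {
        return $this->messages[$this->position];
    }

    public function key(): int
    {
        return $this->position;
    }

    public function next(): void
    {
        // Not reached when the loop body exits with break.
        $this->committed = ++$this->position;
    }
}

$it = new CommitOnNextIterator(['a', 'b', 'c']);
$seen = [];
foreach ($it as $message) {
    if ($message === 'b') {
        break; // 'b' stays uncommitted
    }
    $seen[] = $message;
}
foreach ($it as $message) { // restarted loop reads 'b' again
    $seen[] = $message;
}
// $seen is now ['a', 'b', 'c']
```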
Switching iterator
<?php
use Coste\Kafka\ConsumerIterator;

if ($topics) {
    $it = new ConsumerIterator([
        'topics' => $topics,
        // …
    ]);
} else {
    $it = new \ArrayIterator([
        "message 1",
        "message 2",
    ]);
}

foreach ($it as $message) {
    // …
}
Decorated iterator
<?php
use Coste\Kafka\ConsumerIterator;

$it = new ConsumerIterator([
    'topics' => $topics,
    // …
]);

// Skip empty messages
$it = new \CallbackFilterIterator($it, function ($current) {
    return !empty($current);
});
$it = new \CachingIterator($it);

foreach ($it as $message) {
    // …
}
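Decoration is not limited to the SPL classes; a generator function works as well. The sketch below chains a `CallbackFilterIterator` with a hypothetical `decode_json` generator that decodes each payload and skips invalid ones. It assumes messages are JSON strings, and an `ArrayIterator` stands in for the consumer so the snippet runs without a broker.

```php
<?php
// Hypothetical decorator: yields each payload decoded from JSON and
// silently skips payloads that are not valid JSON.
function decode_json(iterable $messages): \Generator
{
    foreach ($messages as $key => $payload) {
        $decoded = json_decode((string) $payload, true);
        if (json_last_error() === JSON_ERROR_NONE) {
            yield $key => $decoded;
        }
    }
}

// Stand-in for the Kafka consumer, so the example runs without a broker.
$it = new \ArrayIterator(['{"id": 1}', '', 'not json', '{"id": 2}']);

// Drop empty messages first, then decode what is left.
$it = new \CallbackFilterIterator($it, function ($current) {
    return !empty($current);
});

$ids = [];
foreach (decode_json($it) as $message) {
    $ids[] = $message['id'];
}
// $ids is now [1, 2]
```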
Use SSL security
<?php
use Coste\Kafka\ConsumerIterator;

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2',
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
    'ssl' => [
        'ca' => [
            'location' => '/etc/my_ca.pem',
        ],
        'certificate' => [
            'location' => '/etc/my_certificate.pem',
        ],
        'key' => [
            'location' => '/etc/my_key.pem',
            // add key password, only if needed
            'password' => 'aabbcc123',
        ],
    ],
]);
// …
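The nested `ssl` keys resemble the `ssl.*` configuration properties of librdkafka (`ssl.ca.location`, `ssl.certificate.location`, `ssl.key.location`, `ssl.key.password`). How this library actually translates them is not documented here, so the following is only a guess at the mapping. The helper `ssl_options_to_properties` is hypothetical, and setting `security.protocol` to `ssl` is likewise an assumption.

```php
<?php
// Hypothetical helper: flattens the nested 'ssl' options into
// librdkafka-style property names such as "ssl.key.location".
function ssl_options_to_properties(array $ssl): array
{
    // Assumption: SSL options imply the "ssl" security protocol.
    $properties = ['security.protocol' => 'ssl'];
    foreach ($ssl as $section => $options) {
        foreach ($options as $name => $value) {
            $properties["ssl.$section.$name"] = $value;
        }
    }
    return $properties;
}

$properties = ssl_options_to_properties([
    'ca' => ['location' => '/etc/my_ca.pem'],
    'certificate' => ['location' => '/etc/my_certificate.pem'],
    'key' => [
        'location' => '/etc/my_key.pem',
        'password' => 'aabbcc123',
    ],
]);
// $properties['ssl.key.location'] is '/etc/my_key.pem'
```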
Contributing
If you have ideas for improvement or are experiencing bugs, let me know in the bug tracker or by sending an email to charles-edouard@coste.dev. To avoid intellectual property problems and to be sure I can keep this code under a Free (as in Freedom) licence, code contributions will not be accepted until I find an easy process for a contributor licence agreement.
Licence
Copyright (C) 2019-2021 Charles-Édouard Coste
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with this program. If not, see http://www.gnu.org/licenses/.