coste/kafka-iterator

Lets you consume a Kafka topic with a simple foreach loop

v0.2.1 2021-11-30 14:28 UTC

Last update: 2025-06-29 02:07:21 UTC


README

What is KafkaConsumerIterator.php?

KafkaConsumerIterator.php is an abstraction layer for the Rdkafka extension, written in PHP.

Introduction

Using Rdkafka directly couples your code to an API that is not very stable. This library provides a class that implements PHP's Iterator interface, so consuming a Kafka topic is as easy as a foreach loop and the consumer can be combined with other iterators.

This can simplify your business logic and unit testing, since nothing is easier than replacing one Traversable object with another.
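
As a rough illustration of the testing point above, the sketch below writes business logic against `iterable` rather than against the consumer class, so a test can pass an `\ArrayIterator` instead of a Kafka consumer. The function name `count_non_empty_messages` and the sample payloads are hypothetical and not part of this library.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical business logic: counts the non-empty messages it is given.
 * It accepts any iterable, so it does not know whether the messages
 * come from Kafka or from a plain array.
 */
function count_non_empty_messages(iterable $messages): int
{
    $count = 0;
    foreach ($messages as $message) {
        if ($message !== '' && $message !== null) {
            $count++;
        }
    }
    return $count;
}

// In a unit test, an ArrayIterator stands in for the Kafka consumer.
$fake = new \ArrayIterator(['message 1', '', 'message 2']);
$result = count_non_empty_messages($fake);

echo $result, PHP_EOL; // prints 2
```

Because the function only depends on `iterable`, the same code path should work unchanged when it is handed the real consumer in production.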

Installation

composer require coste/kafka-iterator

Examples

Classic usage

<?php

use Coste\Kafka\ConsumerIterator;

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2',
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
]);

foreach ($consumer as $key => $message) {
    try {

        // 1. Processing a message
        my_complex_process_which_can_fail($message);

    } catch (\RuntimeException $e) {

        // 2. An exception occurred in my process
        put_the_message_somewhere($message);

    } catch (\Exception $e) {

        // 3. If I want, I can still break the loop.
        //    If I restart the loop, this message will be read again
        //    as it was not processed.
        break;
    }

    // 4. Each loop iteration commits the offset
}

Switching iterator

<?php

use Coste\Kafka\ConsumerIterator;

if ($topics) {
    $it = new ConsumerIterator([
        'topics' => $topics,
        // …
    ]);
} else {
    $it = new \ArrayIterator([
        "message 1",
        "message 2",
    ]);
}

foreach ($it as $message) {
    // …
}

Decorated iterator

<?php

use Coste\Kafka\ConsumerIterator;

$it = new ConsumerIterator([
    'topics' => $topics,
    // …
]);

$it = new \CallbackFilterIterator($it, function($current) {
    return !empty($current);
});
    
$it = new \CachingIterator($it);
    
foreach ($it as $message) {
    // …
}
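
The composition idea can be tried without a broker. The following is a minimal sketch that uses an `\ArrayIterator` as a stand-in for the consumer and chains two standard SPL decorators on top of it; the sample payloads and variable names are made up for illustration.

```php
<?php

declare(strict_types=1);

// Stand-in for the consumer: any Iterator works with the SPL decorators.
$source = new \ArrayIterator(['message 1', '', 'message 2', 'message 3']);

// Drop empty payloads, as in the example above.
$filtered = new \CallbackFilterIterator($source, function ($current) {
    return !empty($current);
});

// Stop after the first two messages that pass the filter.
$limited = new \LimitIterator($filtered, 0, 2);

$seen = [];
foreach ($limited as $message) {
    $seen[] = $message;
}

print_r($seen); // contains 'message 1' and 'message 2'
```

Keep in mind that `empty()` also treats the string `'0'` as empty, so a stricter check such as `$current !== ''` may be preferable if such payloads are valid.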

Use SSL security

<?php

use Coste\Kafka\ConsumerIterator;

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2'
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
    'ssl' => [
        'ca' => [
            'location' => '/etc/my_ca.pem',
        ],
        'certificate' => [
            'location' => '/etc/my_certificate.pem',
        ],
        'key' => [
            'location' => '/etc/my_key.pem',

            // add key password, only if needed
            'password' => 'aabbcc123',
        ],
    ],
]);

// …
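
The nested keys of the `ssl` option line up with librdkafka's dotted property names (`ssl.ca.location`, `ssl.certificate.location`, `ssl.key.location`, `ssl.key.password`). Whether this library performs such a mapping internally is an assumption; the helper `flatten_options` below is hypothetical and only illustrates the correspondence.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): flattens a nested
 * options array into dotted keys.
 */
function flatten_options(array $options, string $prefix = ''): array
{
    $flat = [];
    foreach ($options as $key => $value) {
        $name = $prefix === '' ? (string) $key : $prefix . '.' . $key;
        if (is_array($value)) {
            // Recurse into nested arrays, carrying the dotted prefix along.
            $flat += flatten_options($value, $name);
        } else {
            $flat[$name] = $value;
        }
    }
    return $flat;
}

$ssl = [
    'ca' => ['location' => '/etc/my_ca.pem'],
    'certificate' => ['location' => '/etc/my_certificate.pem'],
    'key' => ['location' => '/etc/my_key.pem', 'password' => 'aabbcc123'],
];

$properties = flatten_options(['ssl' => $ssl]);
print_r($properties);
```

Reading the option this way can help when cross-checking certificate paths against the librdkafka configuration reference.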

Contributing

If you have ideas for improvement or are experiencing bugs, let me know in the bug tracker or by email at charles-edouard@coste.dev. To avoid intellectual property problems and to be sure I can keep this code under a Free (as in Freedom) licence, code contributions will not be accepted until I find an easy process for a contributor licence agreement.

Licence

AGPL

Copyright (C) 2019-2021 Charles-Édouard Coste

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program. If not, see http://www.gnu.org/licenses/.