anktx/kafka-client

PHP wrapper for RdKafka

Installs: 22

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 1

Forks: 0

Open Issues: 0

pkg:composer/anktx/kafka-client

0.3.1 2026-01-26 16:32 UTC

This package is auto-updated.

Last update: 2026-01-26 16:34:40 UTC


README

Обёртка над ext-rdkafka для работы с Apache Kafka на PHP. Библиотека предоставляет простой и удобный интерфейс для продюсирования и консьюминга сообщений.

Требования

  • PHP 8.4+
  • ext-rdkafka

Установка

composer require anktx/kafka-client

Быстрый старт

Producer

use Anktx\Kafka\Client\Config\ProducerConfig;
use Anktx\Kafka\Client\Config\Enum\CompressionType;
use Anktx\Kafka\Client\KafkaProducer;
use Anktx\Kafka\Client\KafkaMessage\KafkaProducerMessage;

$producer = new KafkaProducer(
    new ProducerConfig(
        brokers: 'kafka:9092',
        compressionType: CompressionType::snappy,
    )
);

$producer->produce(
    new KafkaProducerMessage(
        topic: 'events',
        body: json_encode(['event' => 'order_created', 'id' => 123]),
        key: 'order-123',
        headers: ['source' => 'api'],
    )
);

$producer->flush();

Consumer

use Anktx\Kafka\Client\Config\ConsumerConfig;
use Anktx\Kafka\Client\Config\Enum\OffsetReset;
use Anktx\Kafka\Client\ConsumeResult\KafkaConsumeTimeout;
use Anktx\Kafka\Client\KafkaConsumer;
use Anktx\Kafka\Client\KafkaMessage\KafkaConsumerMessage;
use Anktx\Kafka\Client\TopicSubscription\TopicSubscription;
use Anktx\Kafka\Client\TopicSubscription\TopicSubscriptionList;

$consumer = new KafkaConsumer(
    new ConsumerConfig(
        brokers: 'kafka:9092',
        groupId: 'order-processor',
        instanceId: 'worker-1',
        offsetReset: OffsetReset::latest,
    )
);

$consumer->subscribe(
    new TopicSubscriptionList(
        new TopicSubscription(topic: 'events'),
    )
);

while (true) {
    $result = $consumer->consume();

    if ($result instanceof KafkaConsumerMessage) {
        echo $result->body . "\n";
        // ... обработка сообщения ...

        $consumer->commit($result);
    }
}

Message Stream

Для более чистого кода используйте генератор:

use Anktx\Kafka\Client\KafkaMessageStream;

$stream = new KafkaMessageStream($consumer);

foreach ($stream->stream() as $message) {
    // Только сообщения, без обработки таймаутов/EOF
    echo $message->body . "\n";
    $consumer->commit($message);
}

Стратегии опроса (Poll Strategies)

При отправке сообщений они попадают в локальную очередь, а затем асинхронно отправляются в Kafka. Метод poll() обслуживает эту очередь — обрабатывает отчёты о доставке и освобождает память. Если не вызывать poll(), очередь может переполниться.

Стратегии определяют, когда вызывать poll():

use Anktx\Kafka\Client\PollStrategy\TimeoutPollStrategy;
use Anktx\Kafka\Client\PollStrategy\ProbabilityPollStrategy;

// Опрос каждые N секунд
$producer = new KafkaProducer(
    $config,
    new TimeoutPollStrategy(pollIntervalSec: 1),
);

// Опрос с вероятностью N (0.0 - 1.0)
$producer = new KafkaProducer(
    $config,
    new ProbabilityPollStrategy(probability: 0.1),
);

Доступные стратегии:

  • NeverPoolStrategy — не вызывать poll() (по умолчанию, подходит для низкой нагрузки)
  • TimeoutPollStrategy — вызывать poll() каждые N секунд
  • ProbabilityPollStrategy — вызывать poll() с вероятностью N (например, 10% вызовов)

Конфигурация

ProducerConfig

$config = new ProducerConfig(
    brokers: string,                    // Обязательно
    queueBufferingMaxKBytes: int,       // По умолчанию: 20480
    batchSize: int,                     // По умолчанию: 102400
    lingerMs: int,                      // По умолчанию: 10
    compressionType: CompressionType,   // По умолчанию: snappy
    isDebug: bool,                      // По умолчанию: false
    logger: LoggerInterface,            // По умолчанию: NullLogger
);

ConsumerConfig

$config = new ConsumerConfig(
    brokers: string,                    // Обязательно
    groupId: string,                    // Обязательно
    instanceId: string,                 // Обязательно
    offsetReset: OffsetReset,           // По умолчанию: earliest
    autoCommitMs: ?int,                 // По умолчанию: null (ручной коммит)
    sessionTimeoutMs: ?int,             // По умолчанию: null
    isDebug: bool,                      // По умолчанию: false
    logger: LoggerInterface,            // По умолчанию: NullLogger
);

Типы возвращаемых значений

Метод consume() возвращает union type:

  • KafkaConsumerMessage — успешно полученное сообщение
  • KafkaConsumeTimeout — таймаут (нет новых сообщений)
  • KafkaPartitionEof — достигнут конец партиции

Пример обработки:

$result = $consumer->consume(1000);

if ($result instanceof KafkaConsumerMessage) {
    // Обработка сообщения
    $consumer->commit($result);
} elseif ($result instanceof KafkaConsumeTimeout) {
    // Нет сообщений, можно продолжить работу
}

Структура проекта

src/
├── Config/                          # Конфигурация
│   ├── ConsumerConfig.php           # Конфигурация консьюмера
│   ├── ProducerConfig.php           # Конфигурация продюсера
│   └── Enum/                        # Перечисления
│       ├── CompressionType.php      # Типы компрессии (snappy, gzip, lz4, zstd)
│       └── OffsetReset.php          # Стратегия сброса оффсета (latest, earliest)
│
├── ConsumeResult/                   # Результаты консьюминга
│   ├── KafkaConsumeTimeout.php      # Таймаут (нет сообщений)
│   └── KafkaPartitionEof.php        # Достигнут конец партиции
│
├── Exception/                       # Исключения
│   ├── Business/                    # Бизнес-логика
│   ├── Kafka/                       # Ошибки Kafka
│   └── Logic/                       # Логические ошибки
│
├── KafkaMessage/                    # Сообщения
│   ├── AbstractMessage.php          # Базовый класс
│   ├── KafkaConsumerMessage.php     # Сообщение консьюмера
│   └── KafkaProducerMessage.php     # Сообщение продюсера
│
├── PollStrategy/                    # Стратегии опроса очереди
│   ├── PollStrategy.php             # Интерфейс стратегии
│   ├── NeverPoolStrategy.php        # Не вызывать poll()
│   ├── ProbabilityPollStrategy.php  # Вызывать с вероятностью N
│   └── TimeoutPollStrategy.php      # Вызывать каждые N секунд
│
├── TopicSubscription/               # Подписки на топики
│   ├── TopicSubscription.php        # Одна подписка
│   └── TopicSubscriptionList.php    # Список подписок
│
├── KafkaConsumer.php                # Главный класс консьюмера
├── KafkaProducer.php                # Главный класс продюсера
└── KafkaMessageStream.php           # Генератор для стриминга сообщений

Обработка исключений

Библиотека использует иерархию исключений:

Exception
├── KafkaException                    # Ошибки Kafka
│   ├── KafkaConnectionException      # Потеряно соединение
│   ├── KafkaConsumerException        # Ошибка консьюмера
│   └── KafkaProducerException        # Ошибка продюсера
├── LogicException                    # Логические ошибки
│   └── NotSubscribedException        # Не подписан на топики
└── BusinessException                 # Бизнес-логика
    ├── EmptySubscriptionsException   # Пустой список подписок
    └── TopicHasNoPartitionException  # Топик не имеет партиций

Пример обработки:

try {
    $producer->produce($message);
    $producer->flush();
} catch (KafkaConnectionException $e) {
    // Потеряно соединение с Kafka
} catch (KafkaProducerException $e) {
    // Ошибка отправки сообщения
}