anktx / kafka-client
PHP wrapper for RdKafka
Installs: 22
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 0
pkg:composer/anktx/kafka-client
Requires
- php: ^8.4
- ext-rdkafka: *
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- infection/infection: *
- phpstan/phpstan: ^2.0
- phpstan/phpstan-strict-rules: ^2.0
- phpunit/phpunit: ^12.0
README
Обёртка над ext-rdkafka для работы с Apache Kafka на PHP. Библиотека предоставляет простой и удобный интерфейс для продюсирования и консьюминга сообщений.
Требования
- PHP 8.4+
- ext-rdkafka
Установка
composer require anktx/kafka-client
Быстрый старт
Producer
use Anktx\Kafka\Client\Config\ProducerConfig; use Anktx\Kafka\Client\Config\Enum\CompressionType; use Anktx\Kafka\Client\KafkaProducer; use Anktx\Kafka\Client\KafkaMessage\KafkaProducerMessage; $producer = new KafkaProducer( new ProducerConfig( brokers: 'kafka:9092', compressionType: CompressionType::snappy, ) ); $producer->produce( new KafkaProducerMessage( topic: 'events', body: json_encode(['event' => 'order_created', 'id' => 123]), key: 'order-123', headers: ['source' => 'api'], ) ); $producer->flush();
Consumer
use Anktx\Kafka\Client\Config\ConsumerConfig; use Anktx\Kafka\Client\Config\Enum\OffsetReset; use Anktx\Kafka\Client\ConsumeResult\KafkaConsumeTimeout; use Anktx\Kafka\Client\KafkaConsumer; use Anktx\Kafka\Client\KafkaMessage\KafkaConsumerMessage; use Anktx\Kafka\Client\TopicSubscription\TopicSubscription; use Anktx\Kafka\Client\TopicSubscription\TopicSubscriptionList; $consumer = new KafkaConsumer( new ConsumerConfig( brokers: 'kafka:9092', groupId: 'order-processor', instanceId: 'worker-1', offsetReset: OffsetReset::latest, ) ); $consumer->subscribe( new TopicSubscriptionList( new TopicSubscription(topic: 'events'), ) ); while (true) { $result = $consumer->consume(); if ($result instanceof KafkaConsumerMessage) { echo $result->body . "\n"; // ... обработка сообщения ... $consumer->commit($result); } }
Message Stream
Для более чистого кода используйте генератор:
use Anktx\Kafka\Client\KafkaMessageStream; $stream = new KafkaMessageStream($consumer); foreach ($stream->stream() as $message) { // Только сообщения, без обработки таймаутов/EOF echo $message->body . "\n"; $consumer->commit($message); }
Стратегии опроса (Poll Strategies)
При отправке сообщений они попадают в локальную очередь, а затем асинхронно отправляются в Kafka. Метод poll() обслуживает эту очередь — обрабатывает отчёты о доставке и освобождает память. Если не вызывать poll(), очередь может переполниться.
Стратегии определяют, когда вызывать poll():
use Anktx\Kafka\Client\PollStrategy\TimeoutPollStrategy; use Anktx\Kafka\Client\PollStrategy\ProbabilityPollStrategy; // Опрос каждые N секунд $producer = new KafkaProducer( $config, new TimeoutPollStrategy(pollIntervalSec: 1), ); // Опрос с вероятностью N (0.0 - 1.0) $producer = new KafkaProducer( $config, new ProbabilityPollStrategy(probability: 0.1), );
Доступные стратегии:
NeverPoolStrategy— не вызыватьpoll()(по умолчанию, подходит для низкой нагрузки)TimeoutPollStrategy— вызыватьpoll()каждые N секундProbabilityPollStrategy— вызыватьpoll()с вероятностью N (например, 10% вызовов)
Конфигурация
ProducerConfig
$config = new ProducerConfig( brokers: string, // Обязательно queueBufferingMaxKBytes: int, // По умолчанию: 20480 batchSize: int, // По умолчанию: 102400 lingerMs: int, // По умолчанию: 10 compressionType: CompressionType, // По умолчанию: snappy isDebug: bool, // По умолчанию: false logger: LoggerInterface, // По умолчанию: NullLogger );
ConsumerConfig
$config = new ConsumerConfig( brokers: string, // Обязательно groupId: string, // Обязательно instanceId: string, // Обязательно offsetReset: OffsetReset, // По умолчанию: earliest autoCommitMs: ?int, // По умолчанию: null (ручной коммит) sessionTimeoutMs: ?int, // По умолчанию: null isDebug: bool, // По умолчанию: false logger: LoggerInterface, // По умолчанию: NullLogger );
Типы возвращаемых значений
Метод consume() возвращает union type:
KafkaConsumerMessage— успешно полученное сообщениеKafkaConsumeTimeout— таймаут (нет новых сообщений)KafkaPartitionEof— достигнут конец партиции
Пример обработки:
$result = $consumer->consume(1000); if ($result instanceof KafkaConsumerMessage) { // Обработка сообщения $consumer->commit($result); } elseif ($result instanceof KafkaConsumeTimeout) { // Нет сообщений, можно продолжить работу }
Структура проекта
src/
├── Config/ # Конфигурация
│ ├── ConsumerConfig.php # Конфигурация консьюмера
│ ├── ProducerConfig.php # Конфигурация продюсера
│ └── Enum/ # Перечисления
│ ├── CompressionType.php # Типы компрессии (snappy, gzip, lz4, zstd)
│ └── OffsetReset.php # Стратегия сброса оффсета (latest, earliest)
│
├── ConsumeResult/ # Результаты консьюминга
│ ├── KafkaConsumeTimeout.php # Таймаут (нет сообщений)
│ └── KafkaPartitionEof.php # Достигнут конец партиции
│
├── Exception/ # Исключения
│ ├── Business/ # Бизнес-логика
│ ├── Kafka/ # Ошибки Kafka
│ └── Logic/ # Логические ошибки
│
├── KafkaMessage/ # Сообщения
│ ├── AbstractMessage.php # Базовый класс
│ ├── KafkaConsumerMessage.php # Сообщение консьюмера
│ └── KafkaProducerMessage.php # Сообщение продюсера
│
├── PollStrategy/ # Стратегии опроса очереди
│ ├── PollStrategy.php # Интерфейс стратегии
│ ├── NeverPoolStrategy.php # Не вызывать poll()
│ ├── ProbabilityPollStrategy.php # Вызывать с вероятностью N
│ └── TimeoutPollStrategy.php # Вызывать каждые N секунд
│
├── TopicSubscription/ # Подписки на топики
│ ├── TopicSubscription.php # Одна подписка
│ └── TopicSubscriptionList.php # Список подписок
│
├── KafkaConsumer.php # Главный класс консьюмера
├── KafkaProducer.php # Главный класс продюсера
└── KafkaMessageStream.php # Генератор для стриминга сообщений
Обработка исключений
Библиотека использует иерархию исключений:
Exception
├── KafkaException # Ошибки Kafka
│ ├── KafkaConnectionException # Потеряно соединение
│ ├── KafkaConsumerException # Ошибка консьюмера
│ └── KafkaProducerException # Ошибка продюсера
├── LogicException # Логические ошибки
│ └── NotSubscribedException # Не подписан на топики
└── BusinessException # Бизнес-логика
├── EmptySubscriptionsException # Пустой список подписок
└── TopicHasNoPartitionException # Топик не имеет партиций
Пример обработки:
try { $producer->produce($message); $producer->flush(); } catch (KafkaConnectionException $e) { // Потеряно соединение с Kafka } catch (KafkaProducerException $e) { // Ошибка отправки сообщения }