liangtao/kafka-php

PHP client for Kafka version 2.7.1

1.0.2 2022-08-18 03:22 UTC

This package is auto-updated.

Last update: 2024-10-18 08:02:34 UTC


README

Build Status Packagist Packagist Packagist GitHub issues GitHub forks GitHub stars GitHub license

Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 2.7.1 以上版本的 Kafka,使用 PHP 异步执行的方式来和kafka broker 交互,由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本

安装环境要求

  • PHP version >= 8.0.12
  • Kafka version >= 2.7.1

使用 Composer 安装

composer require liangtao/kafka-php

添加 composer 依赖 liangtao/kafka-php 到项目的 composer.json 文件中即可,如:

{
	"require": {
		"liangtao/kafka-php": ">=1.0.0"
	}
}

配置

配置参数见 配置

Produce

异步回调方式调用

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
	return array(
		array(
			'topic' => 'test',
			'value' => 'test....message.',
			'key' => 'testkey',
		),
	);
});
$producer->setLogger($logger);
$producer->success(function($result) {
	var_dump($result);
});
$producer->error(function($errorCode) {
	var_dump($errorCode);
});
$producer->send(true);

同步方式调用生产者

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
        $result = $producer->send(array(
                array(
                        'topic' => 'test1',
                        'value' => 'test1....message.',
                        'key' => '',
                ),
        ));
        var_dump($result);
}

Consumer

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(array('test'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

Basic Protocol

基础协议 API 调用方式见 Example