meitu / php-consumergroup
php kafka consumer
Requires (Dev)
- phpunit/phpunit: 5.6.*
This package is not auto-updated.
Last update: 2025-04-23 02:53:35 UTC
README
php-consumergroup is a kafka consumer library with group and rebalance supports.
Requirements
- Apache Kafka 0.7.x, 0.8.x, 0.9.x, 0.10.x
Dependencies
- php-zookeeper
- php_rdkafka (1.0.0 is recommended)
- librdkafka (0.9.1 is recommended)
Performance
78,000+
messages/s for single process
more detail benchmark
Example
- installing this library via composer
<?php
use MTKafka\Consumer;
function call_back_func($msg) {
echo "$msg->payload\n";
}
function handle_error_call_back($msg) {
echo $msg->errstr();
}
$consumer = New Consumer("localhost:2181");
$consumer->setGroupId("group-test");
$consumer->setTopic("topic-test");
$consumer->setOffsetAutoReset(Consumer::SMALLEST);
$consumer->setErrHandler("handle_error_call_back");
try {
$consumer->start("echo_message");
}
catch(Exception $e) {
printf("error: %s\n", $e->getMessage());
}
see example.php
Consumer Options
Consumer::setMaxMessage()
Number, defaults to 32
If partitions > 1, it forces consumers to switch to other partitons when max message is reached, or other partitons will be starved
Consumer::setCommitInterval()
Millisecond, defaults to 500ms
Offset auto commit interval.
Consumer::setWatchInterval()
Millisecond, defaults to 10,000 ms
Time interval to check rebalance. Rebalance is triggered when the number of partition or consumer changes.
Consumer::setConsumeTimeout()
Millisecond, default is 1,000 ms
Kafka request timeout.
Consumer::setClientId()
String, defaults to "default"
Client id is used to identify consumers.
Consumer::setOffsetAutoReset()
smallest|largest
, defaults to smallest
Consumer can choose whether to fetch the oldest or the lastest message when offset isn't present in zookeeper or is out of range.
Consumer::setConf()
Attribute and value are passed in
We can use this function to modify the librdkafka configuration。
more detail about librdkafka configuration.
Exception
- Recoverable exceptption (e.g. request timeout), error handler will be called, and you can log error messages.
- Unrecoverable exception (e.g. kafka/zookeeper is broken), exceptions will be thrown, and you should log message and stop the consumer.
Benchmark
Type | Parmeter |
---|---|
CPU | Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz |
CPU Core | 24 |
Memory | 16G |
Disk | SSD |
Network | 1000Mbit/s |
Os | CentOS release 6.7 |
Benchmark is measured by produring 20,000,000
messages at single partition, and calculate the time it takes to consume those messages.
QPS is 78,000
messages/s when process cpu utility is 100%
.