ccinn / kafka-swoole
Kafka client for php
Type:application
Requires
- php: ^7.1
- ext-json: *
- ext-posix: *
- ext-sockets: *
- ccinn/kafka-swoole-core: dev-master
Requires (Dev)
- swoft/swoole-ide-helper: dev-master
This package is auto-updated.
Last update: 2021-08-24 07:42:54 UTC
README
🌈The first PHP Kafka client to support multiple compression formats
It implements all Kafka protocols, provides both 'HighLevel' and 'LowLevel' client APIs, and uses Swoole to run consumer clients concurrently and scale them flexibly.
If you would like to contribute code and help speed up development, please contact me by email: 471113744@qq.com
Core framework: kafka-swoole-core
Install
by composer
version=dev-master;composer create-project ccinn/kafka-swoole kafka-swoole ${version}
by git
git clone https://github.com/whiteCcinn/kafka-swoole.git && cd kafka-swoole && composer install
docker
docker build -t kafka-swoole:latest .
docker run -it --name kafka-swoole -v "$(pwd)":/data/www kafka-swoole:latest bash
rendering
A member of the consumer group
Topic:caiwenhui  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: caiwenhui  Partition: 0  Leader: 1004  Replicas: 1004  Isr: 1004
Multiple members of the consumer group
Topic:kafka-swoole  PartitionCount:4  ReplicationFactor:2  Configs:
    Topic: kafka-swoole  Partition: 0  Leader: 1003  Replicas: 1003,1002  Isr: 1003,1002
    Topic: kafka-swoole  Partition: 1  Leader: 1004  Replicas: 1004,1003  Isr: 1004,1003
    Topic: kafka-swoole  Partition: 2  Leader: 1001  Replicas: 1001,1004  Isr: 1001,1004
    Topic: kafka-swoole  Partition: 3  Leader: 1002  Replicas: 1002,1001  Isr: 1001,1002
- KAFKA_CLIENT_CONSUMER_NUM=2
- KAFKA_CLIENT_CONSUMER_NUM=4
Command
Produce
php bin/kafka-client kafka.produce [options] [--] <message>
php bin/kafka-client kafka.produce --help
Description:
  Send a message
Usage:
  kafka.produce [options] [--] <message>
Arguments:
  message                      The message you wish to send.
Options:
  -t, --topic[=TOPIC]          Which is the topic you want to send?
  -p, --partition[=PARTITION]  Which is the topic you want to send to partition?
  -k, --key[=KEY]              Which is the topic you want to send to partition by key?
  -h, --help                   Display this help message
  -q, --quiet                  Do not output any message
  -V, --version                Display this application version
      --ansi                   Force ANSI output
      --no-ansi                Disable ANSI output
  -n, --no-interaction         Do not ask any interactive question
  -v|vv|vvv, --verbose         Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
  This command will help you send separate messages to a topic..
DescribeGroups
See consumer group details
Description:
  See consumer group details
Usage:
  kafka.describeGroups [options]
Options:
  -t, --topic=TOPIC      Which topic is subscribed by the consumer group?
  -g, --group=GROUP      Which consumer group?
  -h, --help             Display this help message
  -q, --quiet            Do not output any message
  -V, --version          Display this application version
      --ansi             Force ANSI output
      --no-ansi          Disable ANSI output
  -n, --no-interaction   Do not ask any interactive question
  -v|vv|vvv, --verbose   Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
  See consumer group details...
php bin/kafka-client kafka.describeGroups -t mulog_clean_24 -g kafka-swoole

DescribeGruops-BaseInfo
=======================

 -------------- ------------ -------------- --------------
  groupId        groupState   protocolType   protocolData
 -------------- ------------ -------------- --------------
  kafka-swoole   Stable       consumer       Range
 -------------- ------------ -------------- --------------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-44857c49-b019-439b-90dd-d71112b2c01e   kafka-swoole   /192.167.8.2   mulog_clean_24   0,1
 --------------------------------------------------- -------------- -------------- ---------------- -----------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-5714cd77-a0dd-4d29-aa20-718f9d713908   kafka-swoole   /192.167.8.2   mulog_clean_24   2,3
 --------------------------------------------------- -------------- -------------- ---------------- -----------
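The group above reports the Range strategy, with one member holding partitions 0,1 and the other 2,3. The sketch below shows how a range-style split of one topic can produce that result: members are sorted, each gets an equal contiguous block, and any remainder goes to the first members. It is an illustration only; `assignRange` and the member names are hypothetical and are not taken from kafka-swoole.

```php
<?php
declare(strict_types=1);

/**
 * Range-style assignment for a single topic (illustrative, not the project's code).
 *
 * @param string[] $memberIds      consumer group member ids
 * @param int      $partitionCount number of partitions in the topic
 * @return array<string, int[]>    member id => assigned partition numbers
 */
function assignRange(array $memberIds, int $partitionCount): array
{
    sort($memberIds);                       // deterministic order across members
    $memberCount = count($memberIds);
    $assignment  = [];
    if ($memberCount === 0) {
        return $assignment;
    }

    $base  = intdiv($partitionCount, $memberCount); // partitions every member gets
    $extra = $partitionCount % $memberCount;        // first $extra members get one more
    $next  = 0;

    foreach ($memberIds as $index => $memberId) {
        $size = $base + ($index < $extra ? 1 : 0);
        $assignment[$memberId] = $size > 0 ? range($next, $next + $size - 1) : [];
        $next += $size;
    }

    return $assignment;
}

// Hypothetical member ids; 4 partitions as in the kafka-swoole topic above.
$plan = assignRange(['member-b', 'member-a'], 4);
foreach ($plan as $memberId => $partitions) {
    echo $memberId, ': ', implode(',', $partitions), PHP_EOL;
}
```

With more members than partitions, the trailing members receive an empty list, which is why the consumer count is usually kept at or below the partition count.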
Rpc
Runtime data can be queried in real time through a built-in RPC protocol that uses AF_UNIX sockets for interprocess communication.
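To make the idea of RPC over AF_UNIX concrete, here is a minimal single-process sketch using PHP's `stream_socket_pair`. The newline-delimited JSON messages, the `cmd`/`result` field names, and the handler table are assumptions made for illustration; the actual wire format used by kafka-swoole is not documented here.

```php
<?php
declare(strict_types=1);

// Create a connected pair of AF_UNIX stream sockets inside one process.
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($pair === false) {
    throw new RuntimeException('Unable to create an AF_UNIX socket pair');
}
[$client, $server] = $pair;

// Client side: send one request line (hypothetical format).
fwrite($client, json_encode(['cmd' => 'block_size']) . "\n");

// Server side: read the request and dispatch it to a handler.
$request  = json_decode((string) fgets($server), true);
$handlers = [
    // Hypothetical handler returning a fixed value for the demo.
    'block_size' => function (): int {
        return 254;
    },
];
$cmd    = $request['cmd'] ?? '';
$result = isset($handlers[$cmd]) ? $handlers[$cmd]() : null;
fwrite($server, json_encode(['result' => $result]) . "\n");

// Client side: read the reply.
$response = json_decode((string) fgets($client), true);
echo $response['result'], PHP_EOL;

fclose($client);
fclose($server);
```

In a real deployment the two ends would live in different processes; the pair is used here only so the example runs on its own.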
php bin/kafka-client rpc -h
Description:
Built-in runtime RPC command
Usage:
rpc <type>
Arguments:
type which you want to execute command?
Options:
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
The following are the built-in RPC command options:
kafka_lag
offset_checker
block_size
member_leader
metadata_brokers
metadata_topics
- kafka_lag(Check the total difference between the current offset and the maximum offset in kafka)
php bin/kafka-client rpc kafka_lag
1000
- offset_checker(For each partition of the topic, view the current consumer offset, the maximum offset in Kafka, and the difference between the two)
php bin/kafka-client rpc offset_checker
 -------------- ----------- ---------------- ------------------ -----------------
  topic          partition   current-offset   kafka-max-offset   remaining-count
 -------------- ----------- ---------------- ------------------ -----------------
  kafka-swoole   2           50223            50223              0
  kafka-swoole   3           70353            70353              0
  kafka-swoole   0           52395            52395              0
  kafka-swoole   1           50407            50407              0
 -------------- ----------- ---------------- ------------------ -----------------
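The relationship between the two commands can be sketched as follows: remaining-count is the maximum offset minus the current offset for one partition, and the kafka_lag figure is the sum over all partitions. The helper name `summarizeLag` and the row layout are hypothetical; the numbers are copied from the sample output above.

```php
<?php
declare(strict_types=1);

/**
 * Compute per-partition and total lag from offset rows (illustrative helper).
 *
 * @param array<int, array{topic: string, partition: int, current: int, max: int}> $rows
 * @return array{perPartition: array<string, int>, total: int}
 */
function summarizeLag(array $rows): array
{
    $perPartition = [];
    $total = 0;
    foreach ($rows as $row) {
        // Never report a negative lag if the consumer offset is ahead of a stale max.
        $remaining = max(0, $row['max'] - $row['current']);
        $perPartition[$row['topic'] . ':' . $row['partition']] = $remaining;
        $total += $remaining;
    }

    return ['perPartition' => $perPartition, 'total' => $total];
}

// Rows taken from the offset_checker sample output.
$rows = [
    ['topic' => 'kafka-swoole', 'partition' => 2, 'current' => 50223, 'max' => 50223],
    ['topic' => 'kafka-swoole', 'partition' => 3, 'current' => 70353, 'max' => 70353],
    ['topic' => 'kafka-swoole', 'partition' => 0, 'current' => 52395, 'max' => 52395],
    ['topic' => 'kafka-swoole', 'partition' => 1, 'current' => 50407, 'max' => 50407],
];

$report = summarizeLag($rows);
echo 'total lag: ', $report['total'], PHP_EOL;
```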
- block_size(If you are using a storage medium in indirect mode, this command shows how many messages the storage medium currently holds)
php bin/kafka-client rpc block_size
254
- member_leader(View the Leader of the current consumer group)
php bin/kafka-client rpc member_leader
 ---------------------------------------------------
  consumer-group-leaderId
 ---------------------------------------------------
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------

 ---------------------------------------------------
  consumer-group-membersId
 ---------------------------------------------------
  kafka-swoole-6080eb8e-3bfb-4be0-a923-037bb99a2666
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------
- metadata_brokers(View available brokers for the kafka service)
php bin/kafka-client rpc metadata_brokers
 --------- --------- ------
  node-id   host      port
 --------- --------- ------
  1003      mkafka3   9092
  1004      mkafka4   9092
  1001      mkafka1   9092
  1002      mkafka2   9092
 --------- --------- ------
- metadata_topics(View details of the subscribed topics)
php bin/kafka-client rpc metadata_topics
 -------------- ----------- ----------- --------------- -----------
  topic          partition   leader-id   replica-nodes   isr-nodes
 -------------- ----------- ----------- --------------- -----------
  kafka-swoole   2           1001        1001,1004       1001,1004
  kafka-swoole   1           1004        1004,1003       1004,1003
  kafka-swoole   3           1002        1002,1001       1002,1001
  kafka-swoole   0           1003        1003,1002       1002,1003
 -------------- ----------- ----------- --------------- -----------
Consumer
php bin/kafka-client start
Config
Common
Options
FILE: config/common.yaml
# Your kafka version
kafka.version: 0.9.0.0

# This is for bootstrapping and the producer will only use it for getting metadata
# (topics, partitions and replicas). The socket connections for sending the actual data
# will be established based on the broker information returned in the metadata. The
# format is host1:port1,host2:port2, and the list can be a subset of brokers or
# a VIP pointing to a subset of brokers.
metadata.broker.list: "mkafka1:9092,mkafka2:9092,mkafka3:9092,mkafka4:9092"

# The producer generally refreshes the topic metadata from brokers when there is a failure
# (partition missing, leader not available...). It will also poll regularly (default: every 10min
# so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
# If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
# Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
# a message the metadata is never refreshed
topic.metadata.refresh.interval.ms: 60 * 10 * 1000

# a string that uniquely identifies a set of consumers within the same consumer group
group.id: "kafka-swoole"

# Which you want to operation
# Example: topic1,topic2,topic3,topic4,topic5
topic.names: "kafka-swoole"

# The periodic time the heartbeat request is sent to tell kafka that the client still exists
heartbeat.interval.ms: 10 * 1000

# The maximum allowed time within a group for which no heartbeat is sent
# So this value must be greater than the heartbeat sending cycle:heartbeat.interval.ms
# (default: 30s)
group.keep.session.max.ms: 30 * 1000

# Select a strategy for assigning partitions to consumer streams.
# See ProtocolPartitionAssignmentStrategyEnum::class
# Range:
# RoundRobin:
# Sticky:
partition.assignment.strategy: "Range"

# highLevelApi the frequency in ms that the consumer offsets are committed to zookeeper
auto.commit.interval.ms: 10 * 1000

# smallest : automatically reset the offset to the smallest offset
# largest : automatically reset the offset to the largest offset
auto.offset.reset: largest
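The comments above require group.keep.session.max.ms to be greater than heartbeat.interval.ms. A quick sanity check along those lines might look like the sketch below. How kafka-swoole itself evaluates values such as `10 * 1000` is not shown in this README, so `productToInt` and `heartbeatFitsSession` are hypothetical helpers that simply multiply the factors.

```php
<?php
declare(strict_types=1);

/**
 * Evaluate a "a * b * c" style value into an integer (assumed notation handling).
 */
function productToInt(string $expression): int
{
    $result = 1;
    foreach (explode('*', $expression) as $factor) {
        $factor = trim($factor);
        if (preg_match('/^\d+$/', $factor) !== 1) {
            throw new InvalidArgumentException("Unsupported factor: {$factor}");
        }
        $result *= (int) $factor;
    }

    return $result;
}

/**
 * The session timeout has to be strictly greater than the heartbeat interval.
 */
function heartbeatFitsSession(string $heartbeatMs, string $sessionMs): bool
{
    return productToInt($sessionMs) > productToInt($heartbeatMs);
}

// Values copied from config/common.yaml above.
$heartbeat = '10 * 1000';
$session   = '30 * 1000';

echo productToInt($heartbeat), ' ms heartbeat, ', productToInt($session), ' ms session', PHP_EOL;
echo heartbeatFitsSession($heartbeat, $session) ? 'config ok' : 'session timeout too small', PHP_EOL;
```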
Support compression protocol
- normal (uncompressed)
- gzip
- snappy (If you need snappy compressed data, you will need to install an additional php-ext-snappy extension)
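As a rough illustration of how a codec is selected, Kafka stores the compression codec in the low bits of a message's attributes byte (0 for none, 1 for gzip, 2 for snappy). The sketch below handles the uncompressed and gzip cases with PHP's zlib functions; the constant and function names are hypothetical and snappy is deliberately left out because it needs the extra extension.

```php
<?php
declare(strict_types=1);

const CODEC_MASK   = 0x07; // low bits of the attributes byte carry the codec
const CODEC_NONE   = 0;
const CODEC_GZIP   = 1;
const CODEC_SNAPPY = 2;

/**
 * Compress a message value with the given codec (sketch: none and gzip only).
 */
function compressValue(string $value, int $codec): string
{
    switch ($codec) {
        case CODEC_NONE:
            return $value;
        case CODEC_GZIP:
            $encoded = gzencode($value);
            if ($encoded === false) {
                throw new RuntimeException('gzip compression failed');
            }
            return $encoded;
        default:
            throw new RuntimeException("Codec {$codec} is not handled in this sketch");
    }
}

/**
 * Decompress a message value according to its attributes byte.
 */
function decompressValue(string $data, int $attributes): string
{
    switch ($attributes & CODEC_MASK) {
        case CODEC_NONE:
            return $data;
        case CODEC_GZIP:
            $decoded = gzdecode($data);
            if ($decoded === false) {
                throw new RuntimeException('gzip decompression failed');
            }
            return $decoded;
        default:
            throw new RuntimeException('Codec is not handled in this sketch');
    }
}

$original   = str_repeat('kafka-swoole ', 20);
$compressed = compressValue($original, CODEC_GZIP);
$restored   = decompressValue($compressed, CODEC_GZIP);

echo strlen($original), ' bytes -> ', strlen($compressed), ' bytes', PHP_EOL;
```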
Protocol
- Produce
- Fetch
- ListOffsets
- Metadata
- LeaderAndIsr
- StopReplica
- UpdateMetadata
- ControlledShutdown
- OffsetCommit
- OffsetFetch
- FindCoordinator
- JoinGroup
- Heartbeat
- LeaveGroup
- SyncGroup
- DescribeGroups
- ListGroups
- SaslHandshake
- ApiVersions
- CreateTopics
- DeleteTopics
- DeleteRecords
- InitProducerId
- OffsetForLeaderEpoch
- AddPartitionsToTxn
- AddOffsetsToTxn
- EndTxn
- WriteTxnMarkers
- TxnOffsetCommit
- DescribeAcls
- CreateAcls
- DeleteAcls
- DescribeConfigs
- AlterConfigs
- AlterReplicaLogDirs
- DescribeLogDirs
- SaslAuthenticate
- CreatePartitions
- CreateDelegationToken
- RenewDelegationToken
- ExpireDelegationToken
- DescribeDelegationToken
- DeleteGroups
- ElectPreferredLeaders
- IncrementalAlterConfigs
usage
The example here works at the level of the API protocol rather than the client API: it builds a ListOffsetsRequest, sends it, and unpacks the response.
$protocol = new ListOffsetsRequest();

$partitions = [];
array_push($partitions,
    (new PartitionsListsOffsets())->setPartition(Int32::value(0))
                                  ->setMaxNumOffsets(Int32::value(10))
                                  ->setTimestamp(Int64::value(time()))
);

$topics = [];
array_push($topics,
    (new TopicsListsOffsets())->setTopic(String16::value('caiwenhui'))
                              ->setPartitions($partitions)
);

$protocol->setRequestHeader(
    (new RequestHeader())->setApiVersion(Int16::value(ProtocolVersionEnum::API_VERSION_0))
                         ->setClientId(String16::value('kafka-swoole'))
                         ->setCorrelationId(Int32::value(ProtocolEnum::LIST_OFFSETS))
                         ->setApiKey(Int16::value(ProtocolEnum::LIST_OFFSETS))
);
$protocol->setReplicaId(Int32::value(-1));
$protocol->setTopics($topics);

$payload = $protocol->pack();
$n = $socket->send($payload);
$data = $socket->recv();
$protocol->response->unpack($data);
var_dump($protocol->response, $protocol->response->toArray());

// Here you can see the response protocol of the kafka service
/*
object(Kafka\Protocol\Response\ListOffsetsResponse)#46 (3) {
  ["responses":"Kafka\Protocol\Response\ListOffsetsResponse":private]=>
  array(1) {
    [0]=>
    object(Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets)#68 (2) {
      ["topic":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      object(Kafka\Protocol\Type\String16)#72 (1) {
        ["value":protected]=>
        string(9) "caiwenhui"
      }
      ["partitionResponses":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      array(1) {
        [0]=>
        object(Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets)#71 (3) {
          ["partition":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int32)#79 (1) {
            ["value":protected]=>
            int(0)
          }
          ["errorCode":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int16)#78 (1) {
            ["value":protected]=>
            int(3)
          }
          ["offsets":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader":protected]=>
  object(Kafka\Protocol\Response\Common\ResponseHeader)#58 (1) {
    ["correlationId":"Kafka\Protocol\Response\Common\ResponseHeader":private]=>
    object(Kafka\Protocol\Type\Int32)#64 (1) {
      ["value":protected]=>
      int(2)
    }
  }
  ["size":protected]=>
  object(Kafka\Protocol\Type\Int32)#63 (1) {
    ["value":protected]=>
    int(33)
  }
}
array(3) {
  ["responses"]=>
  array(1) {
    [0]=>
    array(2) {
      ["topic"]=>
      string(9) "caiwenhui"
      ["partitionResponses"]=>
      array(1) {
        [0]=>
        array(3) {
          ["partition"]=>
          int(0)
          ["errorCode"]=>
          int(3)
          ["offsets"]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader"]=>
  array(1) {
    ["correlationId"]=>
    int(2)
  }
  ["size"]=>
  int(33)
}
*/
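For readers curious about what a call like pack() has to put on the wire, the following standalone sketch encodes a Kafka request header by hand with PHP's `pack`: a big-endian int16 API key, int16 API version, int32 correlation id, and a length-prefixed client id, all preceded by an int32 size. It is not the library's implementation; `packRequestHeader` and `frame` are hypothetical names, and the frame here contains only the header, whereas a real ListOffsets request also carries a body. The value 2 is Kafka's API key for ListOffsets and matches the correlationId echoed in the response above.

```php
<?php
declare(strict_types=1);

/**
 * Encode a Kafka request header: int16 api_key, int16 api_version,
 * int32 correlation_id, then the client id as int16 length + bytes.
 */
function packRequestHeader(int $apiKey, int $apiVersion, int $correlationId, string $clientId): string
{
    // 'n' = unsigned 16-bit big-endian, 'N' = unsigned 32-bit big-endian
    return pack('nnNn', $apiKey, $apiVersion, $correlationId, strlen($clientId)) . $clientId;
}

/**
 * Prefix a message with its int32 big-endian size, as Kafka framing requires.
 */
function frame(string $message): string
{
    return pack('N', strlen($message)) . $message;
}

$header = packRequestHeader(2, 0, 2, 'kafka-swoole');
$frame  = frame($header); // header only; a real request appends its body first

// Decode the fixed-width fields again to check the layout.
$fields = unpack('Nsize/napiKey/napiVersion/NcorrelationId/nclientIdLength', $frame);

echo bin2hex($frame), PHP_EOL;
print_r($fields);
```

The same unpack format is a handy way to inspect the first bytes of any captured request while debugging.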
Env
# APP
APP_NAME=kafka-swoole
## zh_CN,en_US
APP_LANGUGE=en_US

# SERVER
# The server just only receive rpc request select data in memory
SERVER_IP=127.0.0.1
SERVER_PORT=9501
SERVER_REACTOR_NUM=1
SERVER_WORKER_NUM=1
SERVER_MAX_REQUEST=50

# KAFKA_CLIENT_CONSUMER_NUM dynamically changes this parameter based on the partition of the subscribed topic
KAFKA_CLIENT_CONSUMER_NUM=2

# KAFKA_CLIENT
# Client Process
# KAFKA_CLIENT_API_MODE:"HIGH_LEVEL" / "LOW_LEVEL"
# KAFKA_CLIENT_CONSUMER_NUM: Must be less than the maximum partition in topic
KAFKA_CLIENT_API_MODE=LOW_LEVEL

# REDIS/FILE/DIRECTLY
# If you choose "Directly" mode, the number of processing logical processes is equal to the minimum number of kafka client processes.
# The KAFKA_CUSTOM_PROCESS_NUM parameter is ignored.
# Make sure your consumption logic consumes as much data as possible, otherwise the rate of consumption will be lower than the rate of production.
# The process generated by KAFKA_CUSTOM_PROCESS_NUM gets messages from the storage medium
KAFKA_MESSAGE_STORAGE=REDIS

# Number of message processing processes
KAFKA_SINKER_PROCESS_NUM=2

# Which is your storage redis config
KAFKA_STORAGE_REDIS=POOL_REDIS_0

# Redis stores the persistent key
KAFKA_STORAGE_REDIS_KEY=${APP_NAME}:storage:redis:messages

# Redis persists the maximum number of messages
KAFKA_STORAGE_REDIS_LIMIT=40000

# Redis Pool
# `POOL_REDIS_NUM` is number,which begin offset is 0
POOL_REDIS_NUM=1
POOL_REDIS_0_MAX_NUMBER=5
POOL_REDIS_0_MAX_IDLE=3
POOL_REDIS_0_MIN_IDLE=0
POOL_REDIS_0_HOST=mredis
POOL_REDIS_0_PORT=60379
POOL_REDIS_0_AUTH=uXUxGIyprkel1nYWhCyoCYAT4CNCUW2mXkVcDfhTqetnYSD7
POOL_REDIS_0_DB=0

# other redis config ...
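The indexed naming scheme (POOL_REDIS_NUM pools, numbered from 0, each with POOL_REDIS_<n>_* keys) can be read into a structured array as sketched below. This only illustrates the naming convention; `redisPoolsFromEnv`, the array keys, and the fallback defaults are assumptions, not the project's loader.

```php
<?php
declare(strict_types=1);

/**
 * Collect POOL_REDIS_<n>_* settings into one array per pool (illustrative).
 *
 * @param array<string, string> $env environment variables, e.g. from getenv()
 * @return array<string, array<string, int|string>> pool name => settings
 */
function redisPoolsFromEnv(array $env): array
{
    $pools = [];
    $count = (int) ($env['POOL_REDIS_NUM'] ?? 0);

    for ($i = 0; $i < $count; $i++) {
        $prefix = "POOL_REDIS_{$i}_";
        $pools["POOL_REDIS_{$i}"] = [
            // Fallback values are assumptions made for this sketch.
            'host'      => (string) ($env[$prefix . 'HOST'] ?? '127.0.0.1'),
            'port'      => (int) ($env[$prefix . 'PORT'] ?? 6379),
            'db'        => (int) ($env[$prefix . 'DB'] ?? 0),
            'maxNumber' => (int) ($env[$prefix . 'MAX_NUMBER'] ?? 1),
            'maxIdle'   => (int) ($env[$prefix . 'MAX_IDLE'] ?? 1),
            'minIdle'   => (int) ($env[$prefix . 'MIN_IDLE'] ?? 0),
        ];
    }

    return $pools;
}

// Sample values mirroring the Env section above (auth omitted on purpose).
$env = [
    'POOL_REDIS_NUM'          => '1',
    'POOL_REDIS_0_MAX_NUMBER' => '5',
    'POOL_REDIS_0_MAX_IDLE'   => '3',
    'POOL_REDIS_0_MIN_IDLE'   => '0',
    'POOL_REDIS_0_HOST'       => 'mredis',
    'POOL_REDIS_0_PORT'       => '60379',
    'POOL_REDIS_0_DB'         => '0',
];

$pools = redisPoolsFromEnv($env);
// KAFKA_STORAGE_REDIS=POOL_REDIS_0 would then select this entry by name.
echo $pools['POOL_REDIS_0']['host'], ':', $pools['POOL_REDIS_0']['port'], PHP_EOL;
```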
Unit-test
Take the project directory as the root directory.
php vendor/bin/phpunit tests/Protocol/
PHPUnit 7.5.16 by Sebastian Bergmann and contributors.

Runtime:       PHP 7.1.28
Configuration: /www5/kafka-swoole/phpunit.xml.dist

......................                                            22 / 22 (100%)

Time: 64 ms, Memory: 6.00 MB

OK (22 tests, 22 assertions)