lireincore / kafka-rest-client
Kafka rest client
Installs: 1 292
Dependents: 0
Suggesters: 0
Security: 0
Stars: 1
Watchers: 0
Forks: 3
Open Issues: 1
Requires
- php: >=7.1
- ext-json: *
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0
- psr/log: ^1.0
This package is auto-updated.
Last update: 2024-11-22 10:02:28 UTC
README
About
Kafka rest client for confluent rest proxy v2
Install
Add the "lireincore/kafka-rest-client": "dev-master"
package to your require
section in the composer.json
file
or
$ php composer.phar require lireincore/kafka-rest-client dev-master
Usage
use Psr\Log\LoggerInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\RequestFactoryInterface; use LireinCore\KafkaRestClient\Client; use LireinCore\KafkaRestClient\Producer; use LireinCore\KafkaRestClient\Consumer; use LireinCore\KafkaRestClient\KafkaRestException; use LireinCore\KafkaRestClient\Request\SendMessagesRequest; use LireinCore\KafkaRestClient\Request\ConsumerCreateRequest; use LireinCore\KafkaRestClient\Request\ConsumerAssignmentRequest; use LireinCore\KafkaRestClient\Request\GetMessagesRequest; //$client implements Psr\Http\Client\ClientInterface //$requestFactory implements Psr\Http\Message\RequestFactoryInterface //$streamFactory implements Psr\Http\Message\StreamFactoryInterface //$logger implements Psr\Log\LoggerInterface $kafkaClient = new Client('rest-host:8082', $client, $requestFactory, $streamFactory, $logger); //produce message $producer = new Producer($kafkaClient); $request = (new SendMessagesRequest('test_topic')) ->addRecord('test value'); $response = $producer->send($request); /***************************************************************/ //consume message $consumer = new Consumer($kafkaClient); $consumerCreateRequest = new ConsumerCreateRequest('test_group'); $consumerCreateResponse = $consumer->create($consumerCreateRequest); $consumerAssignmentRequest = (new ConsumerAssignmentRequest()) ->addPartition('test_topic', 0); $consumer->assign($consumerAssignmentRequest, $consumerCreateResponse); $getMessagesRequest = new GetMessagesRequest(); $messages = $consumer->pool($getMessagesRequest, $consumerCreateResponse); if ($messages) { //...custom process messages //commit last offsets $consumerCommitOffsetsRequest = $consumer->createConsumerCommitOffsetsRequest($messages); try { $consumer->commit($consumerCommitOffsetsRequest, $consumerCreateResponse); } catch (KafkaRestException $ex) { var_dump($ex->getMessage()); } } $consumer->delete($consumerCreateResponse);
License
The MIT License (MIT). Please see License File for more information.