hauptmedia / rdkafka
Installs: 4 086
Dependents: 0
Suggesters: 0
Security: 0
Stars: 8
Watchers: 4
Forks: 6
Open Issues: 0
Requires
- php: >=5.3
- ext-rdkafka: *
This package is auto-updated.
Last update: 2022-12-15 23:59:47 UTC
README
PHP stubs for the php rdkafka extension.
See https://github.com/arnaud-lb/php-rdkafka
Wrapper / Facade objects in the Kafka namespace
You can use the wrapper objects available in the Kafka
namespace for a more PHP-style interface to the RdKafka
objects.
Example for a producer
include_once('vendor/autoload.php');

use Kafka\Configuration\ProducerConfiguration;
use Kafka\Manager;

// Build the producer configuration. The value 0 is passed straight through to
// setRequestRequiredAcks(); presumably this means "do not wait for broker
// acknowledgements" — confirm against the librdkafka configuration reference.
$config = new ProducerConfiguration();
$config = $config->setRequestRequiredAcks(0);

// 'hostname' is a placeholder entry in the list handed to the Manager.
$kafka = new Manager(['hostname']);
$producerTopic = $kafka->createProducerTopicFacade("test", $config);

// Produce 1000 messages to partition 0; each payload is "test" followed by
// the current Unix timestamp.
foreach (range(1, 1000) as $messageNumber) {
    $producerTopic->produce(0 /* partition */, "test" . time());
}
Example for a consumer
Implement the \Kafka\ConsumerInterface interface
and register an instance of your class with the ConsumerTopicFacade object via addConsumer().
include_once('../vendor/autoload.php');

// Import the wrapper classes used below by their short names. Without these
// imports the example fails with "Class not found".
// NOTE(review): ConsumerConfiguration is assumed to live in the same namespace
// as ProducerConfiguration (Kafka\Configuration) — confirm against the package.
use Kafka\Configuration\ConsumerConfiguration;
use Kafka\Manager;

/**
 * Example consumer that prints the payload of every message it is handed.
 */
class MyConsumer implements \Kafka\ConsumerInterface
{
    /**
     * Callback invoked with the details of a consumed message.
     *
     * @param string $topic     Topic name
     * @param int    $partition Partition
     * @param int    $offset    Message offset
     * @param string $key       Optional message key
     * @param string $payload   Message payload
     * @return mixed
     */
    public function consume($topic, $partition, $offset, $key, $payload)
    {
        echo "Received message with payload " . $payload;
    }
}

// Configure the consumer: both values are passed to the setters as-is
// (interval settings in milliseconds, judging by the method names).
$consumerConfiguration = (new ConsumerConfiguration())
    ->setAutoCommitIntervalMs(1000)
    ->setOffsetStoreSyncIntervalMs(60);

// 'hostname' is a placeholder entry in the list handed to the Manager.
$manager = new Manager(['hostname']);
$consumerTopic = $manager->createConsumerTopicFacade("test", $consumerConfiguration);

// Register the consumer, then run one start / consume / stop cycle
// on partition 0.
$consumerTopic->addConsumer(new MyConsumer());

$consumerTopic->consumeStart(0 /* partition */, 0 /* offset */);
$consumerTopic->consume(0 /* partition */, 1000 /* timeout in ms */);
$consumerTopic->consumeStop(0 /* partition */);