seanmorris / ksqlc
The Asynchronous PHP KSQL Connector
Requires
- php: ^7.0
Requires (Dev)
- phpunit/phpunit: ^6
This package is auto-updated.
Last update: 2024-12-25 17:20:51 UTC
README
SeanMorris/Ksqlc
/keɪ ɛs kyu ɛl si/ • The Asynchronous PHP KSQL Connector
Ksqlc provides a PHP interface to Confluent KSQL & Apache Kafka.
Ksqlc is free for distribution, modification, and use under the Apache-2.0 license.
Supports PHP 7.0 - PHP 8.3!
Installation
Install via the composer cli:
$ composer require seanmorris/ksqlc
... or add seanmorris/ksqlc to your composer.json:
"require": { "seanmorris/ksqlc": "dev-master" }
Usage
Open a connection
Grab the URL to your KSQL server's REST endpoint, and use it to create a new Ksqlc object to begin:
<?php
use \SeanMorris\Ksqlc\Ksqlc;
$ksqlc = new Ksqlc('http://your-ksql-server:8088/');
Ksqlc::stream() - Stream Queries Asynchronously
The KSQL server pushes query results to you asynchronously when you use Ksqlc::stream().
Ksqlc returns streaming queries as generators, which can be iterated with foreach. Results stream in until the query's limit is reached, or until you break out of the loop and destroy the reference to the generator.
<?php
$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES');
foreach($stream as $row)
{
    // $row == {"ROWKEY": "XXX", "ROWTIME": "YYY", ...}
    if($row->property === 'something')
    {
        break;
    }
}
// Destroy the reference to end the stream.
unset($stream);
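The break-and-unset pattern above can be tried without a KSQL server. The following is a minimal sketch in plain PHP: `fakeEventStream()` and `readUntil()` are hypothetical names, and the generator is only a stand-in for what `$ksqlc->stream()` returns, not Ksqlc's implementation.

```php
<?php
// Hypothetical stand-in for $ksqlc->stream(...): an endless generator of row objects.
function fakeEventStream()
{
    $i = 0;
    while (true) {
        $row = new stdClass;
        $row->ROWKEY = 'key-' . $i;
        $row->BODY   = ($i === 3) ? 'something' : 'other';
        $i++;
        yield $row;
    }
}

// Iterate until a row with the wanted BODY shows up, then stop reading.
function readUntil($stream, $needle)
{
    $seen = [];
    foreach ($stream as $row) {
        $seen[] = $row->ROWKEY;
        if ($row->BODY === $needle) {
            break; // the generator is simply not advanced any further
        }
    }
    return $seen;
}

$stream = fakeEventStream();
$seen   = readUntil($stream, 'something');

// Dropping the last reference lets PHP destroy the generator.
unset($stream);
```

Because generators are lazy, the endless `while (true)` loop only runs as far as the consumer pulls rows from it.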
Ksqlc::multiplex() - Stream Multiple Queries
You can loop over multiple queries at once with Ksqlc::multiplex(). Each argument to this method is either a query string or a list of arguments to pass to Ksqlc::stream().
<?php
// KSQL string literals take single quotes, so the PHP strings use double quotes.
$queryOne = "SELECT * FROM EVENTS WHERE BODY = 'AAA' EMIT CHANGES LIMIT 20";
$queryTwo = "SELECT * FROM STREAM WHERE BODY = 'BBB' EMIT CHANGES LIMIT 20";
$stream = $ksqlc->multiplex(
    [$queryOne, 'earliest'],
    [$queryTwo, 'earliest']
);
foreach($stream as $row)
{
    /* Stream processing... */
}
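To illustrate the general idea of multiplexing, here is a minimal plain-PHP sketch that interleaves several generators round-robin and finishes once every one of them is exhausted. `roundRobin()` and `limited()` are hypothetical helpers written for this example; the scheduling Ksqlc actually uses may differ.

```php
<?php
// Illustration only: take one row from each generator in turn until all are done.
function roundRobin(array $generators)
{
    while ($generators) {
        foreach ($generators as $key => $generator) {
            if (!$generator->valid()) {
                // This source hit its limit; stop polling it.
                unset($generators[$key]);
                continue;
            }
            yield $generator->current();
            $generator->next();
        }
    }
}

// Hypothetical stand-in for a streaming query with a LIMIT clause.
function limited($label, $limit)
{
    for ($i = 1; $i <= $limit; $i++) {
        yield $label . $i;
    }
}

$rows = [];
foreach (roundRobin([limited('A', 2), limited('B', 3)]) as $row) {
    $rows[] = $row;
}
```

The combined loop keeps running after the shorter source finishes, which mirrors the rule that a multiplexed query ends only when all limits have been reached.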
Limits
Queries with limits will terminate when the given number of rows have been iterated.
Multiplexed queries will terminate when all limits have been reached.
<?php
$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20');
foreach($stream as $row)
{
    /* Stream processing... */
}
Offset Reset
Streaming queries select ONLY new records by default. Pass 'earliest' as the second parameter to Ksqlc::stream() to process all records from the earliest available offset.
<?php
// Process everything, starting from the earliest available record.
$stream = $ksqlc->stream($queryString, 'earliest');
// Process new records only (the default behavior).
$stream = $ksqlc->stream($queryString, 'latest');
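Ksqlc's internals are not shown in this README, so the following is only a sketch of how an offset-reset choice is commonly expressed to the KSQL REST API: as a `ksql.streams.auto.offset.reset` entry under `streamsProperties` in the JSON body of a `/query` request. `buildQueryPayload()` is a hypothetical helper and not part of Ksqlc.

```php
<?php
// Hypothetical helper: build the JSON body for a KSQL REST /query request.
function buildQueryPayload($query, $offsetReset = 'latest')
{
    if (!in_array($offsetReset, ['earliest', 'latest'], true)) {
        throw new InvalidArgumentException('Offset reset must be "earliest" or "latest".');
    }

    return json_encode([
        // Normalize the statement so it ends with exactly one semicolon.
        'ksql' => rtrim($query, "; \n") . ';',
        'streamsProperties' => [
            'ksql.streams.auto.offset.reset' => $offsetReset,
        ],
    ]);
}

$payload = buildQueryPayload('SELECT * FROM EVENT_STREAM EMIT CHANGES', 'earliest');
```

Validating the value up front means a typo such as 'earlist' fails in PHP instead of being sent to the server.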
Full asynchronicity
Passing TRUE as the third parameter of Ksqlc::stream() turns on full asynchronous mode.
In the example below, the foreach loop spins until the query has returned 20 records and completes. Whenever there is no data to process, the stream supplies NULL values instead of rows. This allows you to tend to other, unrelated streams in the same loop, or even break the loop and resume processing later on.
<?php
$queryString = 'SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20';
$stream = $ksqlc->stream($queryString, 'latest', TRUE);
foreach($stream as $row)
{
    // $row is NULL whenever no data is ready yet.
    var_dump($row);
}
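A loop over a fully asynchronous stream usually needs to tell NULL "nothing yet" ticks apart from real rows. The sketch below shows one way to do that in plain PHP; `fakeAsyncStream()` is a hypothetical stand-in for `$ksqlc->stream($queryString, 'latest', TRUE)` and replays a fixed script instead of talking to a server.

```php
<?php
// Hypothetical stand-in: yields NULL when idle and a row object when data arrives.
function fakeAsyncStream()
{
    $script = [null, null, 'first', null, 'second'];
    foreach ($script as $body) {
        if ($body === null) {
            yield null;
            continue;
        }
        $row = new stdClass;
        $row->BODY = $body;
        yield $row;
    }
}

$idleTicks = 0;
$bodies    = [];

foreach (fakeAsyncStream() as $row) {
    if ($row === null) {
        // No data right now: this is the slot for servicing other streams or tasks.
        $idleTicks++;
        continue;
    }
    $bodies[] = $row->BODY;
}
```

Against a live server, a short `usleep()` in the idle branch may be worth considering so the loop does not occupy a CPU core while waiting.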
Ksqlc::run() - Run a KSQL statement
Use this method to do things like create or drop tables and streams. Any statement that isn't a direct SELECT should be passed to Ksqlc::run().
Ksqlc::run() returns an iterable object of results with metadata properties:
<?php
$results = $ksqlc->run('SHOW TABLES');
var_dump($results);
// object SeanMorris\Ksqlc\Result {
//   $type => "tables"
//   $warnings => {}
//   $statementText => "SHOW TABLES"
// }
foreach($results as $table)
{
    var_dump($table);
    // object stdClass {
    //   $type => "TABLE"
    //   $name => "event_table"
    //   $topic => "event_table"
    //   $format => "JSON"
    //   $isWindowed => false
    // }
}
You can also use list destructuring to get the results of multiple queries all at once:
<?php
[$streams, $tables] = $ksqlc->run('SHOW STREAMS', 'SHOW TABLES');
foreach($streams as $stream)
{
    // ...
}
foreach($tables as $table)
{
    // ...
}
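The combination of an iterable result with metadata properties and list destructuring can be sketched without a server. Everything named here is hypothetical: `FakeResult` only imitates the shape of the Result object described above, and `fakeRun()` imitates a call that returns one result per statement.

```php
<?php
// Hypothetical stand-in for a result object: iterable rows plus public metadata.
class FakeResult implements IteratorAggregate
{
    public $type;
    public $statementText;
    private $rows;

    public function __construct($type, $statementText, array $rows)
    {
        $this->type          = $type;
        $this->statementText = $statementText;
        $this->rows          = $rows;
    }

    public function getIterator(): Traversable
    {
        return new ArrayIterator($this->rows);
    }
}

// Hypothetical stand-in for running two statements: one result per statement, in order.
function fakeRun()
{
    return [
        new FakeResult('streams', 'SHOW STREAMS', [(object) ['name' => 'event_stream']]),
        new FakeResult('tables', 'SHOW TABLES', [(object) ['name' => 'event_table']]),
    ];
}

// Array destructuring assigns the results positionally.
[$streams, $tables] = fakeRun();

$tableNames = [];
foreach ($tables as $table) {
    $tableNames[] = $table->name;
}
```

Note that the short `[...] = ...` destructuring syntax needs PHP 7.1 or later; on PHP 7.0, `list($streams, $tables) = ...` does the same job.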
SeanMorris/Ksqlc
Copyright 2020 - 2024 Sean Morris
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- https://raw.githubusercontent.com/seanmorris/ksqlc/master/LICENSE
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.