seanmorris/ksqlc

The Asynchronous PHP KSQL Connector

0.1.1 2024-02-25 14:30 UTC

This package is auto-updated.

Last update: 2024-12-25 17:20:51 UTC


README

avatar

SeanMorris/Ksqlc

/keɪ ɛs kyu ɛl si/ • The Asynchronous PHP KSQL Connector

Ksqlc provides a PHP interface to Confluent KSQL & Apache Kafka.

Ksqlc is free for distribution, modification, and use under the Apache-2.0 license.

Docs | Github | Packagist

seanmorris/ksqlc Apache-2.0 Licence Badge CircleCI Size badge Installs Coverage Min Supported PHP Version

Supports PHP 7.0 - PHP 8.3!

Installation

Install via the composer cli:

$ composer require seanmorris/ksqlc

... or add seanmorris/ksqlc to your composer.json:

	"require": {
		"seanmorris/ksqlc": "dev-master"
	}

Usage

Open a connection

Grab the URL to your KSQL server's REST endpoint, and use it to create a new Ksqlc object to begin:

<?php
use \SeanMorris\Ksqlc\Ksqlc;

$ksqlc = new Ksqlc('http://your-ksql-server:8088/');

Ksqlc::stream() - Stream Queries Asynchronously

KSQLDB will push query results to you asynchronously when you're using Ksqlc::stream().

Ksqlc will return streaming queries as generators. These can be iterated with foreach. Results will stream in until a limit is reached or the programmer breaks the loop and destroys the reference.

<?php
$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES');

foreach($stream as $row)
{
	// $row == {"ROWKEY": "XXX", "ROWTIME": "YYY", ...}
	if($row->property === 'something')
	{
		break;
	}
}

unset($stream);

Ksqlc::multiplex() - Stream Mutliple Queries

You can loop over multiple queries at once with Ksqlc::multiplex(). Each parameter to this method represents either a string query or a list of parameters to send to Ksqlc::stream().

<?php
$queryOne = 'SELECT * FROM EVENTS WHERE BODY = "AAA" EMIT CHANGES LIMIT 20';
$queryTwo = 'SELECT * FROM STREAM WHERE BODY = "BBB" EMIT CHANGES LIMIT 20';

$stream = $ksqlc->multiplex(
	[$queryOne, 'earliest'],
	[$queryTwo, 'earliest']
);

foreach($stream as $row)
{
	/* Stream processing... */
}

Limits

Queries with limits will terminate when the given number of rows have been iterated.

Multiplexed queries will terminate when all limits have been reached.

<?php
$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20');

foreach($stream as $row)
{
	/* Stream processing... */
}

Offset Reset

Streaming queries will ONLY select new records by default. Use the second param to Ksqlc::stream() to process all records from the beginning of time.

<?php
$stream = $ksqlc->stream($queryString, 'earliest'); ## process everything
$stream = $ksqlc->stream($queryString, 'latest');   ## process new records

Full asyncronicity

Passing TRUE to the third parameter of Ksqlc::stream() allows you to turn on full asyncronous mode.

In this example, the foreach loop will spin indefinitely until the query returns 20 records and completes. If there is no data to process, a stream of NULL's will be supplied. This allows you to tend to other, unrelated streams in the same loop, or even break the loop and resume processing later on.

<?php
$query  = 'SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20';
$stream = $ksqlc->stream($queryString, 'latest', TRUE);

foreach($stream as $row)
{
	var_dump($row);
}

Ksqlc::run() - Run a KSQL statment

You'll do things like create or drop tables and streams with this method. Any statement that isnt a direct SELECT should be passed to Ksqlc::run().

Ksqlc::run will return an iterable object of results with metadata properties:

<?php
$results = $ksqlc->run('SHOW TABLES');
var_dump( $results );
// object SeanMorris\Ksqlc\Result {
//  $type          => "tables"
//  $warnings      => {}
//  $statementText => "SHOW TABLES"
// }

foreach($results as $table)
{
	var_dump( $table );
	// object stdClass {
	// 	$type       => "TABLE"
	// 	$name       => "event_table"
	// 	$topic      => "event_table"
	// 	$format     => "JSON"
	// 	$isWindowed => false
	// }
}

You can also use list destructuring to get the results of multiple queries all at once:

<?php
[$streams, $tables] = $ksqlc->run('SHOW STREAMS', 'SHOW TABLES');

foreach($streams as $stream)
{
	// ...
}

foreach($tables as $table)
{
	// ...
}

SeanMorris/Ksqlc

Copyright 2020 - 2024 Sean Morris

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.