istyle-inc/php-ksql

This package is abandoned and no longer maintained. The author suggests using the ytake/php-ksql package instead.

KSQL is the streaming SQL engine for Apache Kafka. REST Client for php

2.0.0 2020-07-16 01:52 UTC

This package is auto-updated.

Last update: 2021-09-25 13:48:54 UTC


README

Apache kafka / Confluent KSQL REST Client for php

Build Status Coverage Status Scrutinizer Code Quality StyleCI

License Latest Version Total Downloads

What is KSQL

KSQL is the streaming SQL engine for Apache Kafka.

What Is KSQL?

Install

required >= PHP 7.1

$ composer require ytake/php-ksql

Usage

Request Preset

class
Ytake\KsqlClient\Query\CommandStatus
Ytake\KsqlClient\Query\Status
Ytake\KsqlClient\Query\ServerInfo
Ytake\KsqlClient\Query\Ksql
Ytake\KsqlClient\Query\Stream (for stream)

Syntax Reference

Get Command Status

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();

Get Statuses

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();

Get KSQL Server Information

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();

Query KSQL

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();

Client for Stream Response

<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row) 
            {
                // stream response consumer
            }
        }    
    )
)->result();