repo2/query-reactor

This package is abandoned and no longer maintained. No replacement package was suggested.

Asynchronous, non-blocking MySQL query executor.

v1.1 2015-02-08 23:41 UTC

This package is not auto-updated.

Last update: 2020-09-15 19:25:34 UTC


README

Query Reactor is a non-blocking MySQL query executor. The framework is simple and fast. All you need to do is implement Query or use GenericQuery.

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$controller = new QueryReactor\Controller\PoolingController([
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
]);

$reactor = new QueryReactor\Reactor($driver, $controller);

$expression = QueryBuilder\select('user', ['id', 'name']);

$query = new QueryReactor\Query\GenericQuery(
    $expression,
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    },
    // on error
    function (\Exception $err) {
        throw $err;
    }
);

$reactor->execQuery($query);

$reactor->await();

Table of contents

  1. Installation
  2. Components
  3. Sharding
  4. Restrictions

Installation

Install it with Composer:

{
    "require": {
        "repo2/query-reactor": "*"
    }
}

Components

The library requires repo2/query-builder.

Driver

The Driver integrates the framework with a low-level DB API. That API must support non-blocking query execution.

Currently the framework implements only a mysqli driver.

Driver usage without Reactor and Controller:

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$expression = QueryBuilder\select('user', ['id', 'name']);

$link = $driver->connect(
    ['host' => 'localhost', 'dbname' => 'test'],
    'root',
    'some_secret_passwd'
);

$driver->query($link, $expression);

// Poll until the link is either readable or reports an error.
do {
    list($read, $error) = $driver->poll([$link]);
    foreach ($error as $failedLink) {
        throw $driver->error($failedLink);
    }
    foreach ($read as $readyLink) {
        $result = $driver->read($readyLink);
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }
} while (!$read && !$error);

Controller

The Controller coordinates driver connections and query execution.

The framework includes PoolingController, which provides basic connection pooling and a query queue.
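
To make the idea of "connection pooling and query queue" concrete, here is a minimal sketch in plain PHP. It is not the library's PoolingController: the class TinyPool and its methods submit and release are hypothetical names invented for illustration, and plain strings stand in for queries. It only shows the general pattern of starting queries while connection slots are free and queueing the rest.

```php
<?php
// Hypothetical sketch of a bounded pool with a waiting queue.
// Not part of repo2/query-reactor.
final class TinyPool
{
    private $limit;     // maximum number of concurrent connections
    private $busy = 0;  // connections currently running a query
    private $queue;     // queries waiting for a free connection

    public function __construct($limit)
    {
        $this->limit = $limit;
        $this->queue = new SplQueue();
    }

    // Returns the query if a slot is free; otherwise queues it and returns null.
    public function submit($query)
    {
        if ($this->busy < $this->limit) {
            $this->busy++;
            return $query;
        }
        $this->queue->enqueue($query);
        return null;
    }

    // Called when a connection finishes its query.
    // Returns the next queued query (the slot is reused) or null (the slot is freed).
    public function release()
    {
        if (!$this->queue->isEmpty()) {
            return $this->queue->dequeue();
        }
        $this->busy--;
        return null;
    }
}

$pool = new TinyPool(2);
$started = [];
foreach (['q1', 'q2', 'q3'] as $q) {
    $next = $pool->submit($q);
    if ($next !== null) {
        $started[] = $next;
    }
}
// One connection finishes, so the queued query can start.
$afterRelease = $pool->release();
```

With a limit of two, the first two queries start immediately and the third waits until release is called.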

Query

The Query provides query definition and result processing.

getExpression

The method returns the query expression.

function Query::getExpression()

returns ExpressionInterface.

resolve

The method processes the query result and can create a subquery.

function Query::resolve(QueryReactor\Result $result)

returns \Iterator, Query or null

Example of GenericQuery:

use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$query = new QueryReactor\Query\GenericQuery(
    // select all users
    QueryBuilder\select('user', ['id', 'name']),
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            // output a user
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            yield new QueryReactor\Query\GenericQuery(
                // update account amount by random value
                QueryBuilder\update('account', ['amount' => mt_rand(10, 100)])
                ->where(
                    QueryBuilder\equal('user_id', $row['id'])
                )
            );
        }
    }
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery($query);
$reactor->await();
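
Because resolve may return an \Iterator, a single Query, or null, a caller has to handle three shapes. The sketch below shows one way that could be normalized into a flat list. It is an assumption about how such handling might look, not the Reactor's actual code: collectSubqueries and resolveWithGenerator are hypothetical helpers, and plain strings stand in for Query objects.

```php
<?php
// Hypothetical helper: turns whatever resolve() returned into a list of subqueries.
function collectSubqueries($resolved)
{
    if ($resolved === null) {
        return [];                                   // nothing to follow up
    }
    if ($resolved instanceof Iterator) {
        return iterator_to_array($resolved, false);  // generator or other iterator
    }
    return [$resolved];                              // a single subquery
}

// Stand-in for a resolve() body that yields one follow-up query per row.
function resolveWithGenerator(array $rows)
{
    foreach ($rows as $row) {
        yield 'update account for user ' . $row['id'];
    }
}

$fromNull = collectSubqueries(null);
$fromSingle = collectSubqueries('single subquery');
$fromGenerator = collectSubqueries(
    resolveWithGenerator([['id' => 1], ['id' => 2]])
);
```

A function containing yield returns a Generator, which implements \Iterator, so the GenericQuery callback above falls into the iterator case.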

reject

The method processes the query error.

function Query::reject(\Exception $error)

returns void

Sharding

The framework supports sharding through ShardingController.

Getting started with sharding takes three steps:

  1. implement ShardedQuery

    use Repo2\QueryBuilder;
    use Repo2\QueryReactor;
    
    class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery
    {
        public static $table = 'user';
    
        public $id;
    
        public function __construct($id)
        {
            $this->id = $id;
        }
    
        public function resolve(QueryReactor\Result $result)
        {
            foreach ($result->traverse() as $row) {
                echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            }
        }
    
        public function reject(\Exception $err)
        {
            throw $err;
        }
    
        public function getExpression()
        {
            return QueryBuilder\select(self::$table, ['id', 'name'])
            ->where(QueryBuilder\equal('id', $this->id));
        }
    
        public function getDistributionName()
        {
            return self::$table;
        }
    
        public function getDistributionId()
        {
            return $this->id;
        }
    }
  2. create your own ShardingService

    use Repo2\QueryReactor;
    
    class SimpleShardingService implements QueryReactor\Sharding\ShardingService
    {
        public static $primary = [
            'host' => 'localhost',
            'username' => 'root',
            'passwd' => '',
            'dbname' => 'test'
        ];
    
        public static $shards = [
            'user' => [
                ['id' => 1, 'dbname' => 'test1'],
                ['id' => 2, 'dbname' => 'test2'],
                ['id' => 3, 'dbname' => 'test3']
            ]
        ];
    
        public function selectGlobal()
        {
            return self::$primary;
        }
    
        public function selectShard($distributionName, $distributionValue)
        {
            $shards = self::$shards[$distributionName];
            $shard = $shards[$distributionValue % count($shards)];
            return $shard + self::$primary;
        }
    }
  3. initialize the controller

    use Repo2\QueryReactor;
    
    $controller = new QueryReactor\Sharding\ShardingController(
        new SimpleShardingService(),
        QueryReactor\Controller\PoolingController::class
    );
    
    $reactor = new QueryReactor\Reactor($driver, $controller);
    $reactor->execQuery(new UserQuery($userId));
    $reactor->await();
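
The shard selection in SimpleShardingService relies on two plain PHP operations: a modulo over the distribution id and the array union operator (+), which keeps keys from the left operand and fills in missing ones from the right. The standalone sketch below walks through that arithmetic with the same data. The function pickShard is a hypothetical name used only here.

```php
<?php
// Same connection data as in SimpleShardingService above.
$primary = [
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test',
];
$shards = [
    ['id' => 1, 'dbname' => 'test1'],
    ['id' => 2, 'dbname' => 'test2'],
    ['id' => 3, 'dbname' => 'test3'],
];

// Hypothetical standalone version of the selectShard() logic.
function pickShard(array $shards, array $primary, $distributionId)
{
    // The modulo maps any integer id onto a valid index of $shards.
    $shard = $shards[$distributionId % count($shards)];
    // Array union: the shard's 'dbname' wins, the remaining keys come from $primary.
    return $shard + $primary;
}

// 7 % 3 = 1, so the second entry (test2) is chosen.
$params = pickShard($shards, $primary, 7);
```

Note that modulo-based distribution reassigns most ids when the number of shards changes, so the shard list should stay stable once data has been written.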

Restrictions

The framework has some restrictions:

  • No prepared statements.
  • No "last insert id" in results.

Source: https://github.com/Repo2/query-reactor