repo2 / query-reactor
Asynchronous & non-blocking MySQL queries executor.
Requires
- php: >=5.4
- psr/log: *
- repo2/query-builder: *
Last update: 2020-09-15 19:25:34 UTC
Query Reactor is a non-blocking MySQL query executor. The framework is simple and fast.
All you need to do is implement Query or use GenericQuery.
use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$controller = new QueryReactor\Controller\PoolingController([
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
]);

$reactor = new QueryReactor\Reactor($driver, $controller);

$expression = QueryBuilder\select('user', ['id', 'name']);

$query = new QueryReactor\Query\GenericQuery(
    $expression,
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    },
    // on error
    function (\Exception $err) {
        throw $err;
    }
);

$reactor->execQuery($query);
$reactor->await();
Installation
Install it with Composer:
{
    "require": {
        "repo2/query-reactor": "*"
    }
}
Components
The library requires repo2/query-builder.
Driver
The Driver provides integration with the low-level DB API.
The API must support non-blocking query execution.
Currently the framework implements only the mysqli driver.
Driver usage without Reactor and Controller:
use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$expression = QueryBuilder\select('user', ['id', 'name']);

$link = $driver->connect(
    ['host' => 'localhost', 'dbname' => 'test'],
    'root',
    'some_secret_passwd'
);

$driver->query($link, $expression);

// poll until the connection reports a result or an error
do {
    list($read, $error) = $driver->poll([$link]);
    foreach ($error as $link) {
        throw $driver->error($link);
    }
    foreach ($read as $link) {
        $result = $driver->read($link);
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }
} while (!$read && !$error);
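For context, the non-blocking support that the Driver section refers to exists in PHP's mysqli extension itself (with the mysqlnd driver). The sketch below uses only the extension's own calls (`MYSQLI_ASYNC`, `mysqli_poll`, `reap_async_query`) and is not taken from MysqliDriver; the helper name `fetchAllAsync` is hypothetical, and it is an assumption that the library's driver works along similar lines.

```php
<?php
// Sketch: run one SELECT through mysqli's asynchronous mode and return the rows.
// fetchAllAsync is a hypothetical helper name, not part of Query Reactor.
function fetchAllAsync(mysqli $link, $sql)
{
    // MYSQLI_ASYNC makes query() return immediately instead of blocking.
    $link->query($sql, MYSQLI_ASYNC);

    do {
        // mysqli_poll() overwrites its array arguments, so rebuild them each pass.
        $read = $error = $reject = [$link];
        $ready = mysqli_poll($read, $error, $reject, 1); // wait up to 1 second
        if ($ready === false) {
            throw new RuntimeException('mysqli_poll failed');
        }
    } while ($ready === 0);

    $rows = [];
    foreach ($read as $conn) {
        // Collect the result of the query started above.
        $result = $conn->reap_async_query();
        if ($result === false) {
            throw new RuntimeException($conn->error);
        }
        if ($result instanceof mysqli_result) {
            while ($row = $result->fetch_assoc()) {
                $rows[] = $row;
            }
            $result->free();
        }
    }
    return $rows;
}
```

With a reachable server, a call such as `fetchAllAsync(new mysqli('localhost', 'root', '', 'test'), 'SELECT id, name FROM user')` would return the rows as associative arrays. The connection values are the placeholders used elsewhere in this README.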
Controller
The Controller coordinates driver connections and query execution.
The framework includes PoolingController.
This controller provides basic logic for connection pooling and query queuing.
Query
The Query provides the query definition and result processing.
getExpression
The method returns the query expression.
function Query::getExpression() returns ExpressionInterface.
resolve
The method processes the query result and can create a subquery.
function Query::resolve(QueryReactor\Result $result) returns \Iterator, Query or null.
Example of GenericQuery:
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$query = new QueryReactor\Query\GenericQuery(
    // select all users
    QueryBuilder\select('user', ['id', 'name']),
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            // output a user
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            // yield a subquery for this user
            yield new QueryReactor\Query\GenericQuery(
                // update account amount by random value
                QueryBuilder\update('account', ['amount' => mt_rand(10, 100)])
                    ->where(
                        QueryBuilder\equal('user_id', $row['id'])
                    )
            );
        }
    }
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery($query);
$reactor->await();
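Because resolve may return an \Iterator, a single Query or null, a caller has to handle all three shapes. The following is a minimal sketch of one way to normalize them into a flat list. It is not the Reactor's actual code: `StubQuery` and `collectSubqueries` are hypothetical names used only for illustration, and `StubQuery` is a plain stand-in rather than the library's Query interface.

```php
<?php
// Hypothetical stand-in for a query object; not the library's Query interface.
class StubQuery
{
    public $name;

    public function __construct($name)
    {
        $this->name = $name;
    }
}

// Normalize a resolve()-style return value into a plain list of subqueries.
function collectSubqueries($resolved)
{
    if ($resolved === null) {
        // nothing to schedule
        return [];
    }
    if ($resolved instanceof \Iterator) {
        // an iterator (or generator) of subqueries; drop the keys
        return iterator_to_array($resolved, false);
    }
    // a single subquery
    return [$resolved];
}

$none = collectSubqueries(null);
$one = collectSubqueries(new StubQuery('a'));
$many = collectSubqueries(new ArrayIterator([new StubQuery('b'), new StubQuery('c')]));

echo count($none), ' ', count($one), ' ', count($many), PHP_EOL; // prints "0 1 2"
```

A generator returned from an "on fulfill" callback, as in the GenericQuery example, is also an \Iterator and would take the second branch.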
reject
The method processes the query error.
function Query::reject(\Exception $error)
returns void
Sharding
The framework supports sharding through ShardingController.
To get started with sharding, follow three steps:
- Implement ShardedQuery:
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery
{
    public static $table = 'user';

    public $id;

    // the user id is passed in when the query is created: new UserQuery($userId)
    public function __construct($id)
    {
        $this->id = $id;
    }

    public function resolve(QueryReactor\Result $result)
    {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }

    public function reject(\Exception $err)
    {
        throw $err;
    }

    public function getExpression()
    {
        return QueryBuilder\select(self::$table, ['id', 'name'])
            ->where(QueryBuilder\equal('id', $this->id));
    }

    public function getDistributionName()
    {
        return self::$table;
    }

    public function getDistributionId()
    {
        return $this->id;
    }
}
- Create your own ShardingService:
use Repo2\QueryReactor;

class SimpleShardingService implements QueryReactor\Sharding\ShardingService
{
    public static $primary = [
        'host' => 'localhost',
        'username' => 'root',
        'passwd' => '',
        'dbname' => 'test'
    ];

    public static $shards = [
        'user' => [
            ['id' => 1, 'dbname' => 'test1'],
            ['id' => 2, 'dbname' => 'test2'],
            ['id' => 3, 'dbname' => 'test3']
        ]
    ];

    public function selectGlobal()
    {
        return self::$primary;
    }

    public function selectShard($distributionName, $distributionValue)
    {
        $shards = self::$shards[$distributionName];
        $shard = $shards[$distributionValue % count($shards)];
        // shard values override the primary connection parameters
        return $shard + self::$primary;
    }
}
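The arithmetic in selectShard can be followed in isolation. The sketch below copies the sample data from SimpleShardingService into plain variables and repeats the same two operations: the modulo that picks a shard, and the array union (`+`) that fills in the remaining connection parameters. The function name `pickShard` is hypothetical and exists only for this illustration.

```php
<?php
// Stand-alone copy of the sample data used by SimpleShardingService.
$primary = [
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test',
];
$shards = [
    ['id' => 1, 'dbname' => 'test1'],
    ['id' => 2, 'dbname' => 'test2'],
    ['id' => 3, 'dbname' => 'test3'],
];

// Same arithmetic as selectShard(): the modulo picks the shard, and the
// array union (+) keeps the left operand's value when a key exists in both.
function pickShard(array $shards, array $primary, $distributionValue)
{
    $shard = $shards[$distributionValue % count($shards)];
    return $shard + $primary;
}

$params = pickShard($shards, $primary, 5); // 5 % 3 === 2, the third entry
echo $params['dbname'], ' @ ', $params['host'], PHP_EOL; // prints "test3 @ localhost"
```

Because the shard array is the left operand, its dbname replaces the primary's, while host, username and passwd are inherited from the primary settings.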
- Initialize the controller:
use Repo2\QueryReactor;

$controller = new QueryReactor\Sharding\ShardingController(
    new SimpleShardingService(),
    QueryReactor\Controller\PoolingController::class
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery(new UserQuery($userId));
$reactor->await();
Restrictions
The framework has some restrictions:
- No prepared statements.
- No "last insert id" in results.