repo2 / query-reactor
Asynchronous & non-blocking MySQL queries executor.
Installs: 65
Dependents: 0
Suggesters: 1
Security: 0
Stars: 51
Watchers: 4
Forks: 2
Open Issues: 0
pkg:composer/repo2/query-reactor
Requires
- php: >=5.4
- psr/log: *
- repo2/query-builder: *
This package is not auto-updated.
Last update: 2020-09-15 19:25:34 UTC
README
Query Reactor is a non-blocking MySQL query executor. The framework is simple and fast.
All you need to do is implement Query or use GenericQuery.
use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$controller = new QueryReactor\Controller\PoolingController([
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
]);

$reactor = new QueryReactor\Reactor($driver, $controller);

$expression = QueryBuilder\select('user', ['id', 'name']);

$query = new QueryReactor\Query\GenericQuery(
    $expression,
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    },
    // on error
    function (\Exception $err) {
        throw $err;
    }
);

$reactor->execQuery($query);
$reactor->await();
Installation
Install it with Composer:
{
    "require": {
        "repo2/query-reactor": "*"
    }
}
Components
The library requires repo2/query-builder.
Driver
The Driver integrates the framework with a low-level DB API.
That API must support non-blocking query execution.
Currently the framework implements the mysqli driver only.
Driver usage without Reactor and Controller:
use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$expression = QueryBuilder\select('user', ['id', 'name']);

$link = $driver->connect(
    ['host' => 'localhost', 'dbname' => 'test'],
    'root',
    'some_secret_passwd'
);

$driver->query($link, $expression);

do {
    list($read, $error) = $driver->poll([$link]);
    foreach ($error as $link) {
        throw $driver->error($link);
    }
    foreach ($read as $link) {
        $result = $driver->read($link);
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }
} while (!$read && !$error);
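For readers unfamiliar with non-blocking queries in mysqli, the following is a minimal sketch of the query, poll, read cycle using only PHP's built-in mysqli calls (MYSQLI_ASYNC, mysqli_poll, reap_async_query). It is an assumption that the driver wraps these calls in a similar way; the helper name runAsyncQuery and its parameters are hypothetical and not part of Query Reactor.

```php
<?php
// Hypothetical helper, not part of Query Reactor: one async round trip with plain mysqli.
function runAsyncQuery(mysqli $link, $sql, $timeoutSec = 1)
{
    // Send the query; with MYSQLI_ASYNC the call returns without waiting for the result.
    $link->query($sql, MYSQLI_ASYNC);

    do {
        // mysqli_poll() modifies its arrays by reference, so rebuild them on every pass.
        $read = $error = $reject = [$link];
        $ready = mysqli_poll($read, $error, $reject, $timeoutSec);
    } while ($ready === 0); // 0 means no connection became ready within the timeout

    if ($ready === false || !$read) {
        throw new RuntimeException('Polling failed or the connection was rejected');
    }

    // Collect the result of the query sent above.
    $result = $link->reap_async_query();
    if ($result === false) {
        throw new RuntimeException($link->error);
    }

    return $result;
}
```

Calling the helper needs a live connection, for example runAsyncQuery(new mysqli('localhost', 'root', '', 'test'), 'SELECT id, name FROM user'). The point of polling is that the script could do other work, or watch several links at once, between calls to mysqli_poll.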
Controller
The Controller provides coordination between driver connection and query execution.
The framework includes PoolingController, which provides basic logic for connection pooling and a query queue.
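The general idea of pairing a connection limit with a queue of waiting queries can be sketched as follows. This is an illustration of the technique only, not the PoolingController source; the class TinyPool and its methods are hypothetical, and plain strings stand in for queries.

```php
<?php
// Hypothetical illustration of pooling with a pending queue; not the library's code.
class TinyPool
{
    private $limit;
    private $busy = 0;
    private $pending;

    public function __construct($limit)
    {
        $this->limit = $limit;
        $this->pending = new SplQueue();
    }

    // Returns true if a connection is free and the query can start now,
    // false if the query had to be queued.
    public function submit($query)
    {
        if ($this->busy < $this->limit) {
            $this->busy++;
            return true;
        }
        $this->pending->enqueue($query);
        return false;
    }

    // Called when a query finishes. Returns the next queued query to run on the
    // same connection, or null if nothing is waiting and the connection goes idle.
    public function release()
    {
        if (!$this->pending->isEmpty()) {
            return $this->pending->dequeue();
        }
        $this->busy--;
        return null;
    }
}

$pool = new TinyPool(2);
var_dump($pool->submit('q1')); // bool(true)
var_dump($pool->submit('q2')); // bool(true)
var_dump($pool->submit('q3')); // bool(false), queued
var_dump($pool->release());    // string(2) "q3"
```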
Query
The Query provides query definition and result processing.
getExpression
The method returns the query expression.
function Query::getExpression() returns ExpressionInterface.
resolve
The method processes the query result and can create a subquery.
function Query::resolve(QueryReactor\Result $result) returns \Iterator, Query or null.
Example of GenericQuery:
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$query = new QueryReactor\Query\GenericQuery(
    // select all users
    QueryBuilder\select('user', ['id', 'name']),
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            // output a user
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            // yield a subquery for every user
            yield new QueryReactor\Query\GenericQuery(
                // update account amount by random value
                QueryBuilder\update('account', ['amount' => mt_rand(10, 100)])
                    ->where(
                        QueryBuilder\equal('user_id', $row['id'])
                    )
            );
        }
    }
);

// $driver and $controller are created as in the first example
$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery($query);
$reactor->await();
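The fulfill callback above satisfies the "\Iterator" return type because any PHP function that contains yield returns a Generator, and Generator implements \Iterator. The following self-contained sketch shows this with plain PHP; the function onFulfill and the SQL strings standing in for subqueries are hypothetical.

```php
<?php
// Hypothetical stand-in for a fulfill callback; strings stand in for subquery objects.
function onFulfill(array $rows)
{
    foreach ($rows as $row) {
        yield 'UPDATE account SET amount = 0 WHERE user_id = ' . (int) $row['id'];
    }
}

// Calling the function only creates the generator; the body has not run yet.
$subqueries = onFulfill([['id' => 1], ['id' => 2]]);
var_dump($subqueries instanceof \Iterator); // bool(true)

// The body runs step by step as the caller iterates.
foreach ($subqueries as $sql) {
    echo $sql, PHP_EOL;
}
```

One consequence worth keeping in mind: because the generator body is lazy, any output or side effect inside the callback happens only when the consumer iterates the returned value.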
reject
The method processes the query error.
function Query::reject(\Exception $error) returns void.
Sharding
The framework supports sharding through ShardingController.
Getting started with sharding takes three steps:
- implement ShardedQuery

use Repo2\QueryBuilder;
use Repo2\QueryReactor;

class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery
{
    public static $table = 'user';

    public $id;

    // the constructor is needed by "new UserQuery($userId)" in the last step
    public function __construct($id)
    {
        $this->id = $id;
    }

    public function resolve(QueryReactor\Result $result)
    {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }

    public function reject(\Exception $err)
    {
        throw $err;
    }

    public function getExpression()
    {
        return QueryBuilder\select(self::$table, ['id', 'name'])
            ->where(QueryBuilder\equal('id', $this->id));
    }

    public function getDistributionName()
    {
        return self::$table;
    }

    public function getDistributionId()
    {
        return $this->id;
    }
}
- create your own ShardingService

use Repo2\QueryReactor;

class SimpleShardingService implements QueryReactor\Sharding\ShardingService
{
    public static $primary = [
        'host' => 'localhost',
        'username' => 'root',
        'passwd' => '',
        'dbname' => 'test'
    ];

    public static $shards = [
        'user' => [
            ['id' => 1, 'dbname' => 'test1'],
            ['id' => 2, 'dbname' => 'test2'],
            ['id' => 3, 'dbname' => 'test3']
        ]
    ];

    public function selectGlobal()
    {
        return self::$primary;
    }

    public function selectShard($distributionName, $distributionValue)
    {
        $shards = self::$shards[$distributionName];
        $shard = $shards[$distributionValue % count($shards)];
        return $shard + self::$primary;
    }
}
- init the controller

use Repo2\QueryReactor;

$controller = new QueryReactor\Sharding\ShardingController(
    new SimpleShardingService(),
    QueryReactor\Controller\PoolingController::class
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery(new UserQuery($userId));
$reactor->await();
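To make the shard selection in SimpleShardingService concrete, here is a self-contained sketch of the same arithmetic outside the framework. The values are copied from the class above; the function name pickShard is hypothetical. It shows two details: the modulo picks an array index (not the shard's 'id' field), and the array union operator "+" keeps the shard's 'dbname' while filling in the remaining connection parameters from the primary.

```php
<?php
// Values copied from SimpleShardingService; pickShard() is a hypothetical helper.
$primary = ['host' => 'localhost', 'username' => 'root', 'passwd' => '', 'dbname' => 'test'];
$shards = [
    ['id' => 1, 'dbname' => 'test1'],
    ['id' => 2, 'dbname' => 'test2'],
    ['id' => 3, 'dbname' => 'test3'],
];

function pickShard(array $shards, array $primary, $distributionValue)
{
    // The modulo result is used as an array index into the shard list.
    $shard = $shards[$distributionValue % count($shards)];
    // Array union: keys already present on the left (id, dbname) are kept,
    // missing keys (host, username, passwd) are taken from the primary.
    return $shard + $primary;
}

foreach ([1, 2, 3, 4] as $userId) {
    $params = pickShard($shards, $primary, $userId);
    echo $userId, ' -> ', $params['dbname'], PHP_EOL;
}
// 1 -> test2
// 2 -> test3
// 3 -> test1
// 4 -> test2
```

With this scheme, adding or removing a shard changes count($shards) and therefore remaps most existing ids, so a service like this suits a fixed shard count.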
Restrictions
The framework has some restrictions:
- No prepared statements.
- No "last insert id" in results.