
Agnostic model to efficiently query and scroll any kind of data (SQL, search engine, HTTP API, CSV, ...) and push it anywhere with an ETL

0.9.14 2020-08-11 15:10 UTC


Simple multi-database library to search content on different engines and aggregate those results into document-oriented structures.

The library defines the class MKCG\Model\DBAL\QueryEngine to build documents using different Drivers.

It also defines a simple ETL to be able to easily and efficiently synchronize content between different datasources.

Engine API

The QueryEngine API defines two methods: query() and scroll().

Each one can fetch and build documents using the provided \MKCG\Model\Model with the appropriate \MKCG\Model\DBAL\QueryCriteria. However, the scroll() method returns a \Generator and internally performs multiple batches to efficiently scroll through big collections.


$model = Schema\User::make('default', 'user');

$criteria = (new QueryCriteria())
        ->addFilter('status', FilterInterface::FILTER_IN, [ 2 , 3 , 5 , 7 ])
        ->addFilter('registered_at', FilterInterface::FILTER_GREATER_THAN_EQUAL, '2000-01-01')
        ->addSort('firstname', 'ASC')
        ->addSort('lastname', 'ASC')
        ->addFilter('title', FilterInterface::FILTER_FULLTEXT_MATCH, 'ab');

$users = $engine->query($model, $criteria);

echo json_encode($users->getContent(), JSON_PRETTY_PRINT) . "\n";
echo "\nFound : " . $users->getCount() . " users\n";

$iterator = $engine->scroll($model, $criteria);

foreach ($iterator as $user) {
    echo json_encode($user, JSON_PRETTY_PRINT) . "\n";
}
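
The batching idea behind scroll() can be pictured with a minimal, self-contained sketch. This is not the library's implementation: scrollInBatches and $fetchBatch are hypothetical names, and the in-memory array only stands in for a real datasource.

```php
<?php

/**
 * Hypothetical helper (not part of the library): lazily yields items
 * by requesting fixed-size batches until the source is exhausted.
 *
 * @param callable $fetchBatch function(int $offset, int $limit): array
 */
function scrollInBatches(callable $fetchBatch, int $batchSize): \Generator
{
    if ($batchSize < 1) {
        throw new \InvalidArgumentException('Batch size must be at least 1');
    }

    $offset = 0;

    do {
        $batch = $fetchBatch($offset, $batchSize);

        foreach ($batch as $item) {
            yield $item;
        }

        $offset += count($batch);
        // A short (or empty) batch means there is nothing left to fetch.
    } while (count($batch) === $batchSize);
}

// In-memory stand-in for a datasource holding 7 rows.
$rows = range(1, 7);
$calls = 0;

$fetchBatch = function (int $offset, int $limit) use ($rows, &$calls): array {
    $calls++;
    return array_slice($rows, $offset, $limit);
};

$scrolled = iterator_to_array(scrollInBatches($fetchBatch, 3), false);
```

Because the function is a generator, no batch is fetched until the caller starts iterating, so memory usage stays bounded by the batch size rather than by the size of the collection.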


Drivers definition

Each Driver is responsible for performing queries on a single datasource (database, HTTP API, local files, ...) and must:

  • implement the MKCG\Model\DBAL\Drivers\DriverInterface
  • be registered inside the QueryEngine


use MKCG\Model\DBAL\QueryEngine;
use MKCG\Model\DBAL\Drivers;

$mongoClient = new MongoDB\Client('mongodb://root:password@mongodb');

$redisClient = new \Predis\Client([
    'scheme' => 'tcp',
    'host' => 'redisearch',
    'port' => 6379
]);

$sqlConnection = \Doctrine\DBAL\DriverManager::getConnection([
    'user' => 'root',
    'password' => 'root',
    'host' => 'mysql',
    'driver' => 'pdo_mysql',
]);

$engine = (new QueryEngine('mysql'))
    ->registerDriver(new Drivers\Doctrine($sqlConnection), 'mysql')
    ->registerDriver(new Drivers\CsvReader($fixturePath), 'csv')
    ->registerDriver(new Drivers\RssReader(new Adapters\Guzzle), 'rss')
    ->registerDriver(new Drivers\SitemapReader(new Adapters\Guzzle), 'sitemap')
    ->registerDriver(new Drivers\Http(new Adapters\Guzzle), 'http')
    ->registerDriver(new Drivers\HttpRobot(new Adapters\Guzzle), 'http_robot')
    ->registerDriver(new Drivers\MongoDB($mongoClient), 'mongodb');
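
One way to picture the registration step is a map from a name to a driver instance, with a fallback to a default name. The sketch below is only an illustration under assumptions: SketchDriver, ArrayDriver, SketchEngine and the search() method are invented for this example, and treating the QueryEngine constructor argument as the default driver name is a guess, not something this document confirms.

```php
<?php

// Hypothetical stand-in for a driver contract; the methods of the real
// DriverInterface are not shown in this document.
interface SketchDriver
{
    public function search(string $term): array;
}

final class ArrayDriver implements SketchDriver
{
    private array $rows;

    public function __construct(array $rows)
    {
        $this->rows = $rows;
    }

    public function search(string $term): array
    {
        // Keep rows containing the term; array_values reindexes the result.
        return array_values(array_filter(
            $this->rows,
            fn (string $row): bool => strpos($row, $term) !== false
        ));
    }
}

final class SketchEngine
{
    /** @var array<string, SketchDriver> */
    private array $drivers = [];
    private string $defaultDriver;

    public function __construct(string $defaultDriver)
    {
        $this->defaultDriver = $defaultDriver;
    }

    // Returns $this so that registrations can be chained.
    public function registerDriver(SketchDriver $driver, string $name): self
    {
        $this->drivers[$name] = $driver;
        return $this;
    }

    public function search(string $term, ?string $driverName = null): array
    {
        $name = $driverName ?? $this->defaultDriver;

        if (!isset($this->drivers[$name])) {
            throw new \InvalidArgumentException("Unknown driver: {$name}");
        }

        return $this->drivers[$name]->search($term);
    }
}

$sketchEngine = (new SketchEngine('users'))
    ->registerDriver(new ArrayDriver(['alice', 'bob', 'alicia']), 'users')
    ->registerDriver(new ArrayDriver(['apple', 'banana']), 'fruits');

$fromDefault = $sketchEngine->search('ali');        // uses the default driver
$fromNamed = $sketchEngine->search('an', 'fruits'); // uses a named driver
```

Keeping drivers behind a shared interface is what would let a single engine mix SQL, CSV and HTTP sources without the calling code knowing which one answers.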

Runtime behaviors


Features supported by driver

Query criteria options

HTTP-based drivers:

  • HTTP
  • HttpRobot
  • RssReader
  • SitemapReader
  • Elasticsearch

Result-based filterable drivers:

  • CsvReader
  • RssReader
  • SitemapReader

When both url_generator and url are provided, then only url_generator is used.
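
The precedence rule above can be sketched as follows. resolveUrls is a hypothetical helper, not the library's code, and it assumes that url_generator is a callable returning an iterable of URLs, which this document does not state.

```php
<?php

/**
 * Hypothetical helper (not the library's code): resolves the URLs to fetch
 * from an options array, giving 'url_generator' priority over 'url'.
 * Assumes 'url_generator' is a callable returning an iterable of URLs.
 */
function resolveUrls(array $options): array
{
    if (isset($options['url_generator'])) {
        $urls = [];

        foreach ($options['url_generator']() as $url) {
            $urls[] = $url;
        }

        return $urls;
    }

    if (isset($options['url'])) {
        return [$options['url']];
    }

    return [];
}

$onlyUrl = resolveUrls(['url' => 'https://example.com/feed']);

// 'url' is ignored here because 'url_generator' is also provided.
$both = resolveUrls([
    'url' => 'https://example.com/feed',
    'url_generator' => function (): \Generator {
        yield 'https://example.com/page/1';
        yield 'https://example.com/page/2';
    },
]);
```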


Constants are defined by the interface MKCG\Model\DBAL\FilterInterface

Filters supported by driver

CUSTOM filter type

Custom filters can be applied by providing a callable to the QueryCriteria instance :

(new QueryCriteria())
        ->addCallableFilter(function(Query $query, ...$arguments) {
            // do something
        });

The first argument of the callable SHOULD always be the Query instance. Other arguments might change depending on the driver.

Some drivers apply filters to the fetched results and expect the callable to return false when a result does not match. Internally, the following drivers apply array_filter to the fetched results:

  • CsvReader
  • RssReader
  • SitemapReader
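
The result-based filtering described above can be sketched with plain array_filter. applyCallableFilters is a hypothetical helper, not the library's code, and the callables here receive only the row: the Query argument is left out because its class is not shown in this document.

```php
<?php

/**
 * Hypothetical helper (not the library's code): keeps only the rows for
 * which every callable filter returns something other than false.
 */
function applyCallableFilters(array $rows, array $filters): array
{
    foreach ($filters as $filter) {
        $rows = array_filter($rows, function ($row) use ($filter) {
            return $filter($row) !== false;
        });
    }

    // array_filter preserves keys, so reindex before returning.
    return array_values($rows);
}

$rows = [
    ['title' => 'Alpha', 'status' => 2],
    ['title' => 'Beta', 'status' => 4],
    ['title' => 'Gamma', 'status' => 3],
];

$kept = applyCallableFilters($rows, [
    fn (array $row) => in_array($row['status'], [2, 3, 5, 7], true),
    fn (array $row) => strlen($row['title']) === 5,
]);
```

Only an explicit false rejects a row in this sketch, which mirrors the "expect a false return value" contract stated above.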

callable arguments by Driver


A dead-simple ETL is defined as a single class \MKCG\Model\ETL. It can be used in combination with the QueryEngine scroll API to transform and then push content to different loaders.


function pipelineEtl(QueryEngine $engine)
{
    $model = Schema\Product::make('default', 'products');
    $criteria = (new QueryCriteria())
            ->addFilter('sku.color', FilterInterface::FILTER_IN, ['aqua', 'purple']);

    $iterator = $engine->scroll($model, $criteria, 100);

    $pushed = ETL::extract($iterator, 1000, 500)
        // keep only the fields we need
        ->transform(function($item) {
            return [
                'id' => $item['_id'],
                'sku' => $item['sku']
            ];
        })
        // add the number of SKUs to each item
        ->transform(function($item) {
            return $item + [
                'sku_count' => count($item['sku'] ?? [])
            ];
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 1 - Loading %d elements\n", count($bulk));
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 2 - Loading %d elements\n", count($bulk));
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 3 - Loading %d elements\n", count($bulk));
        });

    echo sprintf("[ETL] Pushed %d elements\n", $pushed);
}
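
To make the extract / transform / load flow concrete, here is a self-contained sketch of such a pipeline. It is not the library's \MKCG\Model\ETL class: SketchEtl, its single bulk-size parameter and its run() method are hypothetical and chosen only for illustration.

```php
<?php

/**
 * Hypothetical mini ETL (not the library's class): applies each transformer
 * to every item, groups the results into fixed-size bulks and hands each
 * bulk to every loader.
 */
final class SketchEtl
{
    private iterable $source;
    private int $bulkSize;
    private array $transformers = [];
    private array $loaders = [];

    private function __construct(iterable $source, int $bulkSize)
    {
        $this->source = $source;
        $this->bulkSize = $bulkSize;
    }

    public static function extract(iterable $source, int $bulkSize): self
    {
        if ($bulkSize < 1) {
            throw new \InvalidArgumentException('Bulk size must be at least 1');
        }

        return new self($source, $bulkSize);
    }

    public function transform(callable $transformer): self
    {
        $this->transformers[] = $transformer;
        return $this;
    }

    public function load(callable $loader): self
    {
        $this->loaders[] = $loader;
        return $this;
    }

    // Runs the pipeline and returns the number of items pushed.
    public function run(): int
    {
        $pushed = 0;
        $bulk = [];

        foreach ($this->source as $item) {
            foreach ($this->transformers as $transformer) {
                $item = $transformer($item);
            }

            $bulk[] = $item;

            if (count($bulk) === $this->bulkSize) {
                $pushed += $this->flush($bulk);
                $bulk = [];
            }
        }

        // Push the last, possibly incomplete, bulk.
        return $pushed + $this->flush($bulk);
    }

    private function flush(array $bulk): int
    {
        if ($bulk === []) {
            return 0;
        }

        foreach ($this->loaders as $loader) {
            $loader($bulk);
        }

        return count($bulk);
    }
}

$bulkSizes = [];

$pushedCount = SketchEtl::extract(new \ArrayIterator(range(1, 5)), 2)
    ->transform(fn (int $n): array => ['id' => $n])
    ->transform(fn (array $item): array => $item + ['double' => $item['id'] * 2])
    ->load(function (array $bulk) use (&$bulkSizes): void {
        $bulkSizes[] = count($bulk);
    })
    ->run();
```

Since the source is any iterable, a generator such as the one returned by a scroll-style API can be plugged in directly, and items are transformed one at a time while loaders only ever see one bulk.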


Aggregations supported

Test and examples

No tests are provided yet, although some will be written using Behat for the release of version 1.0.0. However, a fully functional example is provided in examples/; it builds documents using different kinds of Drivers.

From ./examples:

docker-compose up --build -d
docker exec -it php_query_model sh -c "cd /home/php-query-model/examples && composer install"
docker exec -it php_query_model sh -c "php /home/php-query-model/examples/index.php"

By default, this runs only two functions (located in index.php); the others are commented out:

// searchProducts($engine);
// searchGithubRobot($engine);
// searchSitemaps($engine);
// searchPackages($engine);
// searchUsers($engine);
// searchHackerNews($engine);

The pipelineEtl function uses the Engine scroll API to iterate over a list of Product documents stored in MongoDB and applies different transformations before pushing the content to three loaders using the ETL component.

The searchOrder function uses the Engine scroll API to:

  • scan a CSV file containing ecommerce Order records
    • then inject their corresponding Product stored in MongoDB
    • then inject their corresponding customers stored as User in MySQL
      • with their first two defined Address also stored in MySQL
      • and all their Post stored in MySQL
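
The "inject" steps listed above amount to attaching related records to each document by key. The sketch below shows that idea on in-memory arrays; injectRelated and all field names are hypothetical, and it says nothing about how the library actually performs the lookups across datasources.

```php
<?php

/**
 * Hypothetical helper (not the library's code): attaches to each document
 * the related record whose key matches the document's foreign key.
 */
function injectRelated(
    array $documents,
    array $related,
    string $foreignKey,
    string $relatedKey,
    string $field
): array {
    // Index the related records by key for constant-time lookups.
    $index = array_column($related, null, $relatedKey);

    foreach ($documents as &$document) {
        $document[$field] = $index[$document[$foreignKey]] ?? null;
    }
    unset($document); // break the reference left by foreach

    return $documents;
}

// Stand-ins for rows read from a CSV file, MongoDB and MySQL.
$orders = [
    ['id' => 1, 'product_id' => 'p1', 'customer_id' => 10],
    ['id' => 2, 'product_id' => 'p2', 'customer_id' => 11],
];
$products = [
    ['_id' => 'p1', 'name' => 'Lamp'],
    ['_id' => 'p2', 'name' => 'Desk'],
];
$customers = [
    ['id' => 10, 'firstname' => 'Ada'],
];

$orders = injectRelated($orders, $products, 'product_id', '_id', 'product');
$orders = injectRelated($orders, $customers, 'customer_id', 'id', 'customer');
```

Indexing the related records once per batch keeps the cost at one lookup query per relation instead of one per document.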

You might want to uncomment the other search functions to execute HTTP queries and fetch :


Expected features

Work In progress


Drivers "nice to have"

Database Drivers

  • Algolia
  • ArangoDB
  • Cassandra
  • \Illuminate\Eloquent (library used by Laravel)
  • Neo4J
  • PostgreSQL
  • ScyllaDB
  • Solr


  • Kafka
  • MySQL binlog
  • RabbitMQ


  • AWS S3
  • File system
  • OpenIO


  • AWS
  • OVH


  • Cloudinary
  • Sendinblue

Social Network

  • Facebook
  • LinkedIn
  • Twitter


Feel free to open a merge request for any suggestion or to contribute to this project.