
Maketok Data-Migration engine

0.3.1 2018-09-06 14:19 UTC

This package is auto-updated.

Last update: 2024-10-29 04:52:57 UTC


Build Status Scrutinizer Code Quality Code Coverage

This package is aimed at migrating large chunks of data across different resources.

Some of the influences:

It is well suited to planning and executing a complex import or export. It can also be used as a transport tool to migrate your data from one structure to another. The unit (worker) structure allows it to parse a complex import file and extract a multi-level data structure from it.

The same is true for the reverse process: a multi-level data structure can be written into a single file.


Use Composer to include it in your project:

composer require maketok/datamigration


There are a few examples in the tests/integration/QueueWorkflowTest integration test. Here are the typical use cases.


Import customers and addresses from a flat CSV file.

use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\CreateTmpFiles;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

// Customer unit named 'customers'. The entries below pair (presumably) output
// column names with either an ExpressionLanguage string or a callable.
// NOTE(review): this snippet is truncated and will not parse as-is. Missing pieces:
//  - the call that receives the mapping array, and its opening "[", before 'id';
//  - the comma after 'map.customer_id';
//  - the closing "}," of the 'name' closure;
//  - the closing "]);" of the array.
// Restore them from the upstream README before running.
// NOTE(review): the 'email' expression is empty. Judging by trim($map->email) in the
// contribution below, it was probably 'map.email'. Confirm.
$customerUnit = new Unit('customers');
    'id' => 'map.customer_id'
    // ExpressionLanguage is used to interpret string expressions
    'email' => '',
    // closure or any other callable is also acceptable
    'name' => function (MapInterface $map) {
        return $map['name'];
    'age' => 'map.age',
/*
 * The is_entity condition determines whether
 *  the unit should treat the current row as an entity.
 * Some utility functions are available in
 *  Maketok\DataMigration\Expression\HelperExpressionsProvider
 */
// NOTE(review): the condition's argument was lost, so "trim( is not empty" is not a valid
// expression. Given the 'email-id' hashmap lookup below, it was probably
// "trim(map.email) is not empty". Confirm against the upstream README.
$customerUnit->setIsEntityCondition("trim( is not empty");
/*
 * Contributions are the way for a unit to
 *  add data to the general pool for every other unit to use.
 * This is the logic for assigning customer_id:
 * first it checks whether the email exists in the pre-compiled hashmap.
 * If it does not, it calls the frozen increment for the "new_customer_id" key
 *  and assigns the last increment id if the key does not exist yet.
 * frozenIncr differs from incr in that it is incremented only once
 *  the is_entity condition resolves for the current row,
 * so it's perfect for incrementing "parent" entities.
 */
// Contribution for the customer unit. It sets $map['customer_id'] from the 'email-id'
// hashmap (keyed by the trimmed email) when the email is already known. Otherwise it
// falls back to $map->frozenIncr(...).
// NOTE(review): truncated. The frozenIncr(...) arguments are missing, as are the
// closing braces of the else-branch and the closure, and the ")" / ";" of addContribution(.
$customerUnit->addContribution(function (
    MapInterface $map,
    ResourceHelperInterface $resource,
    array $hashmaps
    ) {
        if (isset($hashmaps['email-id'][trim($map->email)])) {
            $map['customer_id'] = $hashmaps['email-id'][trim($map->email)];
        } else {
            $map['customer_id'] = $map->frozenIncr(

// Address unit named 'addresses'. Its parent_id comes from the customer_id that the
// customer unit's contribution writes into the map.
// NOTE(review): truncated like the customer unit. The call receiving the mapping array
// is missing, along with its "[" and "]);". The 'city'/'zip' expressions are empty
// (probably 'map.city' / 'map.zip'; confirm).
// NOTE(review): the 'id' value nests single quotes inside a single-quoted PHP string,
// which is a parse error. Wrap it in double quotes, e.g.
// "map.incr('address_id', resource.getLastIncrementId('addresses'))", and add the
// missing trailing comma.
$addressUnit = new Unit('addresses');
    'id' => 'map.incr('address_id', resource.getLastIncrementId('addresses'))'
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
    'parent_id' => 'map.customer_id',
// Both units go into one bag, which every workflow action is given.
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $addressUnit]);

/*
 * Last but not least, since we're using a CSV file, we need a Shaper
 * instance to shape up our flat file before feeding it to the CreateTmpFiles action.
 */
// CSV input opened for reading ('r'), shaped by the Nulls processor.
// The shaper gets the same $languageAdapter that CreateTmpFiles receives below.
// The original $this->getLanguageAdapter() only works inside an object context
// (e.g. a test case) and fails in a standalone script.
// NOTE(review): $fname, $config, $result, $resource, $helperResource and
// $languageAdapter are assumed to be set up by surrounding code. Csv and ArrayMap
// also need `use` imports, whose namespaces this example does not show.
$input = new Csv($fname, 'r', new Nulls($bag, new ArrayMap(), $languageAdapter));

// Import pipeline, in queue order: CreateTmpFiles -> Load -> Move.
// NOTE(review): the example never actually runs $workflow. Presumably an execute
// call follows; confirm the method name against QueueWorkflow.
$workflow = new QueueWorkflow($config, $result);
$workflow->add(new CreateTmpFiles($bag, $config, $languageAdapter,
    $input, new ArrayMap(), $helperResource));
$workflow->add(new Load($bag, $config, $resource));
$workflow->add(new Move($bag, $config, $resource));


We have 3 DB tables for customers and their addresses.




We want to get the following output:


use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\AssembleInput;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

// Customer unit for the export (reverse) direction.
// NOTE(review): this snippet is heavily truncated and will not parse as-is.
//  - The call receiving the forward mapping array, and its "[" / "]);", are missing.
//  - 'map.customer_id' lacks a trailing comma.
//  - The 'email' expressions and the argument of trim( were lost. They were probably
//    map.email; confirm.
$customerUnit = new Unit('customers');
    'id' => 'map.customer_id'
    'email' => '',
    'age' => 'map.age',
$customerUnit->setIsEntityCondition("trim( is not empty");
    // NOTE(review): remains of an expression that looks up the email in the 'email-id'
    // hashmap. Its head (what it is assigned to) and its fallback branch are missing.
    (isset(hashmaps['email-id'][trim(]) ?
        hashmaps['email-id'][trim(] :
  // NOTE(review): the entries below belong to a second array whose opening call is
  // missing. Presumably it is the reverse-direction mapping/connection; confirm upstream.
  'customer_id' => 'id',
    'email' => '',
    'age' => 'map.age',

// customer_data unit: rows tied to a customer via parent_id = map.customer_id. firstname
// and lastname come from map fields, and the reverse side joins them back into 'name'.
// NOTE(review): truncated. The mapping call wrapper and its "[" / "]);" are missing,
// as are the commas after the 'id' and 'parent_id' entries.
// NOTE(review): the 'id' value nests single quotes inside a single-quoted PHP string,
// which is a parse error. Wrap it in double quotes, e.g.
// "map.incr('customer_data_id', resource.getLastIncrementId('customer_data'))".
$customerDataUnit = new Unit('customer_data');
    'id' => 'map.incr('customer_data_id', resource.getLastIncrementId('customer_data'))'
    'parent_id' => 'map.customer_id'
    'firstname' => 'map.firstname',
    'lastname' => 'map.lastname',
    // NOTE(review): fragments of expressions that split a full name on ' ' into
    // map.complexName and take elements [0] / [1] when there are at least two parts.
    // The keys they were assigned to, and the first ternary's fallback, are missing.
    explode(' ',
    (count(map.complexName) >= 2 && isset(map.complexName[0]) ? map.complexName[0] :
    (count(map.complexName) >= 2 && isset(map.complexName[1]) ? map.complexName[1] : '')
    // Reverse side: '~' is ExpressionLanguage string concatenation.
    'customer_id' => 'parent_id',
    'name' => 'map.firstname ~ " " ~ map.lastname',

// Address unit for the export direction. The first group of entries is the
// forward mapping; the second (after the duplicate 'customer_id' key) appears to be the
// reverse side. Its opening call is missing, as is the forward group's; confirm upstream.
// NOTE(review): the 'id' value nests single quotes inside a single-quoted PHP string,
// which is a parse error. Wrap it in double quotes.
// NOTE(review): getLastIncrementId('address') here conflicts with 'addresses' in the
// import example and in this unit's name. It is likely a typo; confirm the table name.
$addressUnit = new Unit('addresses');
    'id' => 'map.incr('address_id', resource.getLastIncrementId('address'))'
    'customer_id' => 'map.customer_id',
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
    'customer_id' => 'customer_id',
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
// All three units go into one bag, which every export action is given.
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $customerDataUnit, $addressUnit]);

// Output CSV opened for writing ('w'), shaped by the Nulls processor.
// The shaper gets the same $languageAdapter that AssembleInput receives below.
// The original $this->getLanguageAdapter() requires an object context and fails in a
// standalone script.
// NOTE(review): $fname, $config, $resource and $languageAdapter are assumed to be set
// up by surrounding code. Csv, ArrayMap, Result, ReverseMove and Dump are not among this
// example's `use` imports; add them (namespaces not shown here).
$input = new Csv($fname, 'w', new Nulls($bag, new ArrayMap(), $languageAdapter));

// Export pipeline, in queue order: ReverseMove -> Dump -> AssembleInput.
// The workflow now uses the same $config as every action. The original mixed
// $this->config and $config.
// NOTE(review): the example never actually runs $workflow; confirm the execute call.
$result = new Result();
$workflow = new QueueWorkflow($config, $result);
$workflow->add(new ReverseMove($bag, $config, $resource));
$workflow->add(new Dump($bag, $config, $resource));
$workflow->add(new AssembleInput($bag, $config, $languageAdapter, $input, new ArrayMap()));