
Maketok Data-Migration engine

0.3.1 2018-09-06 14:19 UTC

This package is auto-updated.

Last update: 2025-03-01 00:13:53 UTC


Build Status Scrutinizer Code Quality Code Coverage

This package is aimed at migrating large chunks of data across different resources.

Some of the influences:

It's perfect to plan and execute a complex import or export. It also can be used as a transport tool to migrate your data from one structure to another. The unit (workers) structure allows it to parse a complex import file and extract a multi-level data structure from it.

The same's true for reverse process as well: the multi-level data structure can be inserted into a single file.


Use composer to include it in your project:

composer require maketok/datamigration


There are few examples in tests/integration/QueueWorkflowTest integration test. Here's the typical uses.


Import customers and addresses from flat CSV file.

email name age street city zip Bart Robbinson 32 20 Chestnut Terrace New York 07003 Kale D 15 123 Manson LA 90023
111 Dale str. Chicago 60333
use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\CreateTmpFiles;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

$customerUnit = new Unit('customers');
    'id' => 'map.customer_id'
    // ExpressionLanguage is used to interpret string expressions
    'email' => '',
    // closure or any other callable is also acceptable
    'name' => function (MapInterface $map) {
        return $map['name'];
    'age' => 'map.age',
 * the is_entity condition resolves whether
 *  unit should consider current row as the entity
 * some utility functions are available in
 *  Maketok\DataMigration\Expression\HelperExpressionsProvider
$customerUnit->setIsEntityCondition("trim( is not empty");
 * the contributions is the way for unit to
 *  add some data into general pool for every other unit to use
 * This is the logic for assigning customer_id
 * First it checks if it exists in the pre-compiled Hashmap
 * If it does not, it's calling for frozen increment for "new_customer_id" key
 *  and assign the last increment id if it's non existent
 * The frozenIncr is different from incr in that it's incremented only once
 *  is_entity condition resolves for current row
 * So it's perfect for incrementing "parent" entities
$customerUnit->addContribution(function (
    MapInterface $map,
    ResourceHelperInterface $resource,
    array $hashmaps
    ) {
        if (isset($hashmaps['email-id'][trim($map->email)])) {
            $map['customer_id'] = $hashmaps['email-id'][trim($map->email)];
        } else {
            $map['customer_id'] = $map->frozenIncr(

$addressUnit = new Unit('addresses');
    'id' => 'map.incr('address_id', resource.getLastIncrementId('addresses'))'
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
    'parent_id' => 'map.customer_id',
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $addressUnit]);

 * Last but not least, since we're using CSV file, we need a Shaper
 * instance to shape up our flat file before feeding it to CreateTmpFiles action
$input = new Csv($fname, 'r', new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter()));

$workflow = new QueueWorkflow($config, $result);
$workflow->add(new CreateTmpFiles($bag, $config, $languageAdapter,
    $input, new ArrayMap(), $helperResource));
$workflow->add(new Load($bag, $config, $resource));
$workflow->add(new Move($bag, $config, $resource));


We have 3 DB tables for customers and their addresses.


id email age
1 32
2 15


id parent_id firstname lastname
1 1 Bart Robinson
2 2 Kale Dager


id customer_id street city zip
1 1 20 Chestnut Terrace New York 07003
2 1 123 Manson LA 90023
3 2 111 Dale str. Chicago 60333

We want to get next output:


email name age street city zip Bart Robbinson 32 20 Chestnut Terrace New York 07003 Kale D 15 123 Manson LA 90023
111 Dale str. Chicago 60333
use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\AssembleInput;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

$customerUnit = new Unit('customers');
    'id' => 'map.customer_id'
    'email' => '',
    'age' => 'map.age',
$customerUnit->setIsEntityCondition("trim( is not empty");
    (isset(hashmaps['email-id'][trim(]) ?
        hashmaps['email-id'][trim(] :
  'customer_id' => 'id',
    'email' => '',
    'age' => 'map.age',

$customerDataUnit = new Unit('customer_data');
    'id' => 'map.incr('customer_data_id', resource.getLastIncrementId('customer_data'))'
    'parent_id' => 'map.customer_id'
    'firstname' => 'map.firstname',
    'lastname' => 'map.lastname',
    explode(' ',
    (count(map.complexName) >= 2 && isset(map.complexName[0]) ? map.complexName[0] :
    (count(map.complexName) >= 2 && isset(map.complexName[1]) ? map.complexName[1] : '')
    'customer_id' => 'parent_id',
    'name' => 'map.firstname ~ " " ~ map.lastname',

$addressUnit = new Unit('addresses');
    'id' => 'map.incr('address_id', resource.getLastIncrementId('address'))'
    'customer_id' => 'map.customer_id',
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
    'customer_id' => 'customer_id',
    'street' => 'map.street',
    'city' => '',
    'zip' => '',
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $customerDataUnit, $addressUnit]);

$input = new Csv($fname, 'w', new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter()));

$result = new Result();
$workflow = new QueueWorkflow($this->config, $result);
$workflow->add(new ReverseMove($bag, $config, $resource));
$workflow->add(new Dump($bag, $config, $resource));
$workflow->add(new AssembleInput($bag, $config, $languageAdapter, $input, new ArrayMap()));