maketok / datamigration
Maketok Data-Migration engine
Installs: 27 729
Dependents: 0
Suggesters: 0
Security: 0
Stars: 5
Watchers: 3
Forks: 3
Open Issues: 1
Requires
- php: >=5.4.0
- doctrine/dbal: 2.*
- fzaninotto/faker: ~1
- symfony/expression-language: 2.*
Requires (Dev)
- mikey179/vfsstream: ~1.0
- phpunit/dbunit: ~1
- phpunit/phpunit: 4.*
README
This package is aimed at migrating large chunks of data across different resources.
Some of the influences:
It's perfect to plan and execute a complex import or export. It also can be used as a transport tool to migrate your data from one structure to another. The unit (workers) structure allows it to parse a complex import file and extract a multi-level data structure from it.
The same is true for the reverse process as well: a multi-level data structure can be written into a single file.
Installation
Use composer to include it in your project:
composer require maketok/datamigration
Examples
There are a few examples in the tests/integration/QueueWorkflowTest integration test.
Here are the typical uses.
Import
Import customers and addresses from a flat CSV file.
use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\CreateTmpFiles;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

/*
 * NOTE(review): Csv and ArrayMap are used below but are not imported in this
 * snippet; add the matching "use" statements for your installed version.
 * $fname, $config, $result, $resource, $helperResource, $languageAdapter and
 * $this->getLanguageAdapter() are assumed to be provided by the surrounding
 * code (the snippet looks like an excerpt from a test case) - confirm.
 */

$customerUnit = new Unit('customers');
$customerUnit->setTable('customers');
$customerUnit->setMapping([
    // ExpressionLanguage is used to interpret string expressions
    'id' => 'map.customer_id',
    'email' => 'map.email',
    // closure or any other callable is also acceptable
    'name' => function (MapInterface $map) {
        return $map['name'];
    },
    'age' => 'map.age',
]);

/*
 * The is_entity condition decides whether the unit should treat
 * the current row as an entity.
 * Some utility functions are available in
 * Maketok\DataMigration\Expression\HelperExpressionsProvider
 */
$customerUnit->setIsEntityCondition("trim(map.email) is not empty");

/*
 * Contributions are the way for a unit to add data
 * to the general pool for every other unit to use.
 *
 * This is the logic for assigning customer_id:
 * first it checks whether the id exists in the pre-compiled hashmap.
 * If it does not, it asks for a frozen increment of the "new_customer_id" key,
 * seeding it with the last increment id if the key does not exist yet.
 * frozenIncr differs from incr in that it is incremented only once the
 * is_entity condition resolves for the current row,
 * so it is perfect for incrementing "parent" entities.
 */
$customerUnit->addContribution(function (
    MapInterface $map,
    ResourceHelperInterface $resource,
    array $hashmaps
) {
    $email = trim($map->email);
    if (isset($hashmaps['email-id'][$email])) {
        $map['customer_id'] = $hashmaps['email-id'][$email];
    } else {
        $map['customer_id'] = $map->frozenIncr(
            'new_customer_id',
            $resource->getLastIncrementId('customers')
        );
    }
});

$addressUnit = new Unit('addresses');
$addressUnit->setTable('addresses');
$addressUnit->setMapping([
    // double quotes outside: the expression itself contains single-quoted strings
    'id' => "map.incr('address_id', resource.getLastIncrementId('addresses'))",
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
    'parent_id' => 'map.customer_id',
]);
$addressUnit->setParent($customerUnit);

$bag = new SimpleBag();
$bag->addSet([$customerUnit, $addressUnit]);

/*
 * Last but not least, since we're using a CSV file, we need a Shaper
 * instance to shape up our flat file before feeding it to the CreateTmpFiles action
 */
$input = new Csv($fname, 'r', new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter()));

$workflow = new QueueWorkflow($config, $result);
$workflow->add(new CreateTmpFiles($bag, $config, $languageAdapter, $input, new ArrayMap(), $helperResource));
$workflow->add(new Load($bag, $config, $resource));
$workflow->add(new Move($bag, $config, $resource));
$workflow->execute();
Export
We have 3 DB tables for customers and their addresses.
customer
customer_data
address
We want to get the following output:
customers.csv
use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\AssembleInput;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

/*
 * NOTE(review): ReverseMove, Dump, Result, Csv and ArrayMap are used below but
 * are not imported in this snippet; add the matching "use" statements for your
 * installed version. $fname, $config, $this->config, $resource,
 * $languageAdapter and $this->getLanguageAdapter() are assumed to be provided
 * by the surrounding code - confirm.
 *
 * Long expressions are split with "." concatenation purely for readability;
 * the resulting expression strings are unchanged.
 */

// --- customer: one row per customer in the "customer" table ---
$customerUnit = new Unit('customers');
$customerUnit->setTable('customer');
$customerUnit->setMapping([
    'id' => 'map.customer_id',
    'email' => 'map.email',
    'age' => 'map.age',
]);
$customerUnit->setIsEntityCondition("trim(map.email) is not empty");
// Same customer_id assignment as a closure would do, written as an expression:
// take the id from the 'email-id' hashmap when present, otherwise use a frozen increment.
$customerUnit->addContribution(
    "map.offsetSet( 'customer_id', "
    . "(isset(hashmaps['email-id'][trim(map.email)]) ? hashmaps['email-id'][trim(map.email)] : "
    . "map.frozenIncr( 'new_customer_id', resource.getLastIncrementId('customer') ) ) )"
);
$customerUnit->setReversedConnection([
    'customer_id' => 'id',
]);
$customerUnit->setReversedMapping([
    'email' => 'map.email',
    'age' => 'map.age',
]);

// --- customer_data: sibling of customer, holds first/last name ---
$customerDataUnit = new Unit('customer_data');
$customerDataUnit->setTable('customer_data');
$customerDataUnit->setMapping([
    // double quotes outside: the expression itself contains single-quoted strings
    'id' => "map.incr('customer_data_id', resource.getLastIncrementId('customer_data'))",
    'parent_id' => 'map.customer_id',
    'firstname' => 'map.firstname',
    'lastname' => 'map.lastname',
]);
// Split the flat "name" value on spaces, then derive firstname / lastname from the parts.
$customerDataUnit->addContribution("map.offsetSet( 'complexName', explode(' ', map.name) )");
$customerDataUnit->addContribution(
    "map.offsetSet( 'firstname', "
    . "(count(map.complexName) >= 2 && isset(map.complexName[0]) ? map.complexName[0] : map.name) )"
);
$customerDataUnit->addContribution(
    "map.offsetSet( 'lastname', "
    . "(count(map.complexName) >= 2 && isset(map.complexName[1]) ? map.complexName[1] : '') )"
);
$customerDataUnit->setReversedConnection([
    'customer_id' => 'parent_id',
]);
// On export the two name columns are joined back into a single "name" value.
$customerDataUnit->setReversedMapping([
    'name' => 'map.firstname ~ " " ~ map.lastname',
]);
$customerUnit->addSibling($customerDataUnit);

// --- address: child of customer ---
$addressUnit = new Unit('addresses');
$addressUnit->setTable('address');
$addressUnit->setMapping([
    'id' => "map.incr('address_id', resource.getLastIncrementId('address'))",
    'customer_id' => 'map.customer_id',
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
]);
$addressUnit->setReversedConnection([
    'customer_id' => 'customer_id',
]);
$addressUnit->setReversedMapping([
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
]);
$addressUnit->setParent($customerUnit);

$bag = new SimpleBag();
$bag->addSet([$customerUnit, $customerDataUnit, $addressUnit]);

// The CSV is opened for writing here: the export assembles rows into it.
$input = new Csv($fname, 'w', new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter()));
$result = new Result();

$workflow = new QueueWorkflow($this->config, $result);
$workflow->add(new ReverseMove($bag, $config, $resource));
$workflow->add(new Dump($bag, $config, $resource));
$workflow->add(new AssembleInput($bag, $config, $languageAdapter, $input, new ArrayMap()));
$workflow->execute();