andach / laravel-extract-and-transform
This is my package extract-and-transform
Andreas Christodolou
pkg:composer/andach/laravel-extract-and-transform
Requires
- php: ^8.2
- illuminate/contracts: ^11.0||^12.0
- spatie/laravel-package-tools: ^1.16
Requires (Dev)
- laravel/pint: ^1.14
- nunomaduro/collision: ^8.8
- orchestra/testbench: ^10.8
- pestphp/pest: ^4.0
- pestphp/pest-plugin-arch: ^4.0
- pestphp/pest-plugin-laravel: ^4.0
README
This is a package to extract, copy, and transform data from various sources, including CSV files and SQL tables. It provides a simple, programmatic API for connecting to data sources, inspecting their datasets, and synchronizing them to local database tables.
The package is built with robustness in mind, featuring:
- Schema Versioning: Safely manage changes to your source data or local mappings over time. The package can detect schema drift and prevent data corruption.
- Transactional Syncs: Guarantees that a sync operation either completes fully or fails completely, preventing partially-synced, inconsistent data.
- Resilient Connections: Automatically retries connections to external sources with exponential backoff, making syncs resilient to temporary network issues.
- Flexible Schema Mapping: Full control over how source columns are named, typed, or excluded in your local database.
- Extensible Strategies: A strategy pattern for handling different sync scenarios.
Installation
You can install the package via composer:
```bash
composer require andach/laravel-extract-and-transform
```
You can publish and run the migrations with:
```bash
php artisan vendor:publish --provider="Andach\ExtractAndTransform\ExtractAndTransformServiceProvider"
php artisan migrate
```
Usage
The following example demonstrates the complete workflow: defining a source, creating a versioned sync profile, running a sync, and handling a schema change by creating a new version.
Simple CSV Synchronisation
This example shows a connection to a CSV file. Source columns are automatically detected and created in the local table unless mapColumns() is specified.
```php
use Andach\ExtractAndTransform\Facades\ExtractAndTransform;

$path = storage_path('app/products.csv');

$source = ExtractAndTransform::createSource('My Source', 'csv', ['path' => $path]);

$run = $source->sync($path)
    ->withStrategy('full_refresh')
    ->mapColumns(['id' => 'remote_id', 'name' => 'product_name']) // Optional
    ->toTable('products_v1') // Optional
    ->run();
```
SQL Database Synchronisation
This example demonstrates connecting to an external MySQL database and syncing two different tables (users and orders) from that single source.
```php
use Andach\ExtractAndTransform\Facades\ExtractAndTransform;

// 1. Create the Source (connection) once
$source = ExtractAndTransform::createSource('Legacy DB', 'sql', [
    'driver'   => 'mysql',
    'host'     => '192.168.1.50',
    'database' => 'legacy_app',
    'username' => 'readonly_user',
    'password' => 'secret',
]);

// 2. Sync the 'users' table
$source->sync('users')
    ->withStrategy('full_refresh')
    ->toTable('legacy_users')
    ->run();

// 3. Sync the 'orders' table
$source->sync('orders')
    ->withStrategy('full_refresh')
    ->mapColumns(['order_id' => 'id', 'total_amount' => 'amount'])
    ->toTable('legacy_orders')
    ->run();
```
Data Transformations
The package includes a powerful, database-agnostic DSL for transforming your data after it has been extracted. This allows you to perform complex operations like joins (lookups), string manipulation, and math entirely within your database, ensuring high performance.
Basic Usage
Use the transform() method to define a transformation pipeline. The Expr class provides a fluent API for building expressions.
```php
use Andach\ExtractAndTransform\Facades\ExtractAndTransform;
use Andach\ExtractAndTransform\Transform\Expr;

ExtractAndTransform::transform('Clean Products')
    ->from('raw_products')
    ->select([
        // Simple rename
        'sku' => 'remote_id',

        // Concatenation
        'full_name' => Expr::concat(Expr::col('brand'), ' ', Expr::col('name')),

        // Static mapping (CASE statement)
        'is_active' => Expr::map('status', ['live' => 1])->default(0),

        // Math operations
        'tax_amount' => Expr::col('price')->multiply(0.2),

        // Lookups (LEFT JOINs)
        'category_name' => Expr::lookup('categories', 'cat_id', 'id', 'name'),
    ])
    ->toTable('clean_products')
    ->run();
```
Chaining Transformations
You can chain multiple operations together to build complex logic.
```php
use Andach\ExtractAndTransform\Facades\ExtractAndTransform;
use Andach\ExtractAndTransform\Transform\Expr;

ExtractAndTransform::transform('Advanced Transform')
    ->from('raw_products')
    ->select([
        // Chain math: (price * 1.2) + 5
        'final_price' => Expr::col('price')->multiply(1.2)->add(5),

        // Chain string functions: LOWER(REPLACE(name, ' ', '-'))
        'slug' => Expr::col('name')->replace(' ', '-')->lower(),

        // Combine concatenation and string functions
        'report_name' => Expr::concat(Expr::col('brand'), ': ', Expr::col('name'))->upper(),
    ])
    ->toTable('advanced_products')
    ->run();
```
Available Expressions
- Expr::col(string $column): Selects a column.
- Expr::concat(...$parts): Concatenates columns and strings.
- Expr::map(string $column, array $mapping): Creates a CASE WHEN statement.
- Expr::lookup(string $table, string $localKey, string $foreignKey, string $targetColumn): Performs a LEFT JOIN to fetch a value from another table.
Chainable Methods
- Numeric: add(), subtract(), multiply(), divide()
- String: upper(), lower(), trim(), replace($search, $replace)
Data-Driven Transformations (GUI Support)
The transformation engine is designed to be fully serializable, making it easy to build a GUI on top of it. You can save the transformation configuration as a JSON object in the database and execute it later without writing any PHP code.
Example JSON Configuration: This JSON corresponds to a transformation that lowercases a reference code and calculates a total.
```json
{
"selects": {
"ref": {
"type": "string_function",
"function": "LOWER",
"column": {
"type": "column",
"column": "order_ref"
},
"arguments": []
},
"total": {
"type": "math",
"operator": "+",
"left": {
"type": "column",
"column": "subtotal"
},
"right": {
"type": "column",
"column": "tax"
}
},
"is_paid": {
"type": "map",
"column": "status",
"mapping": { "paid": 1 },
"default": 0
}
}
}
```
Executing the Saved Transformation:
```php
use Andach\ExtractAndTransform\Models\Transformation;
use Andach\ExtractAndTransform\Services\TransformationService;

// 1. Load the transformation model (which contains the JSON config)
$transformation = Transformation::where('name', 'My Saved Transform')->first();

// 2. Run it
app(TransformationService::class)->run($transformation);
```
The service automatically rehydrates the JSON configuration into executable expressions.
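To persist a configuration like this in the first place, you can store it on the Transformation model. The sketch below is a guess at that shape: the create() call and the config attribute name are assumptions, so check the package's migrations for the real column names.

```php
use Andach\ExtractAndTransform\Models\Transformation;

// Hypothetical sketch: the 'config' attribute name is an assumption.
Transformation::create([
    'name'   => 'My Saved Transform',
    'config' => [
        'selects' => [
            'ref' => [
                'type'      => 'string_function',
                'function'  => 'LOWER',
                'column'    => ['type' => 'column', 'column' => 'order_ref'],
                'arguments' => [],
            ],
        ],
    ],
]);
```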
Core Concepts
Understanding the distinction between a Source and a Sync is key to using this package effectively.
1. Source (The Connection)
A Source represents the configuration required to connect to an external system.
- Examples: A MySQL database connection, a Shopify API credential, or a path to a CSV file.
- Role: It knows how to connect, but not what to fetch.
2. Sync (The Dataset)
A Sync represents a specific dataset retrieved from a Source.
- Examples: The users table from a database, the orders endpoint from an API, or the contents of a CSV file.
- Role: It holds the state (mappings, schema versions, run history) and the strategy for that specific dataset.
Sync Strategies
The package supports multiple strategies for synchronizing data, allowing you to choose the best approach for your specific use case.
- full_refresh: The simplest strategy. It truncates the local table and re-imports all data from the source. Best for small datasets or when the source does not support incremental updates.
- watermark: Efficient for large, append-only or mutable datasets. It uses a "watermark" column (e.g., updated_at or id) to fetch only rows that have changed or been added since the last sync (see the sketch after this list).
  - Modes: Supports append_only (inserts new rows) and upsert (updates existing rows based on primary key).
- content_hash: Useful when the source lacks a reliable updated_at column. It calculates a hash of the row's content to detect changes. It can also detect and handle deletions (soft deletes) by comparing source and local hashes.
- id_diff: Fetches all IDs from the source and compares them with local IDs. It then fetches full rows only for new IDs. Useful for detecting new records and deletions when fetching the full dataset is too expensive, but fetching a list of IDs is cheap.
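As an illustration of the watermark strategy, the sketch below extends the SQL example above. Only withStrategy('full_refresh') appears elsewhere in this README, so the options array (watermark column and mode) is an assumed API for illustration; consult the package source for the actual configuration methods.

```php
// Hypothetical: incremental sync of 'orders' using the watermark strategy.
// The options array ('column', 'mode') is an assumed API for illustration.
$source->sync('orders')
    ->withStrategy('watermark', ['column' => 'updated_at', 'mode' => 'upsert'])
    ->toTable('legacy_orders')
    ->run();
```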
Metadata Columns
The package automatically adds several reserved metadata columns to your local tables to manage synchronization state and history. You should not use these names for your own mapped columns.
- __id: The local primary key (BigInteger).
- __source_id: The original ID from the source (nullable).
- __content_hash: A hash of the row's content, used by the content_hash strategy.
- __is_deleted: A boolean flag indicating whether the row has been deleted in the source (used by the content_hash and id_diff strategies; queried in the sketch below).
- __last_synced_at: The timestamp when the row was last synced/updated in the local table.
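When querying a synced table directly, you will usually want to exclude rows flagged as deleted. A minimal sketch using Laravel's query builder, against the products_v1 table from the CSV example:

```php
use Illuminate\Support\Facades\DB;

// Fetch only rows that still exist in the source.
$activeProducts = DB::table('products_v1')
    ->where('__is_deleted', false)
    ->get();
```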
Simple vs. Complex Workflows
- Simple (1-to-1): For a single CSV file, the Source and Sync might seem redundant. You define the Source (file path) and Sync (file content) together.
- Complex (1-to-Many): For a Database, you define the Source once (the connection). You then define multiple Syncs—one for each table you want to extract. This keeps your credentials centralized while allowing independent management of each table's sync schedule and schema.
Handling Schema Changes
When you change your column mapping (e.g., adding a new column), the package automatically detects this change and creates a new schema version.
Automatic Table Naming (Recommended)
If you do not explicitly specify a table name using toTable(), the package will automatically handle the table creation for you.
- Version 1: Creates table csv_my_source_products_v1
- Version 2 (schema change): Automatically creates csv_my_source_products_v2
This ensures that your sync never fails due to missing columns, and you retain the old data in the v1 table.
Explicit Table Naming
If you explicitly specify a table name using toTable('products_v1'), you must be careful when changing the schema.
If you add a column to mapColumns but keep toTable('products_v1'), the sync will fail with a Column not found error because the package will try to insert the new column into the existing products_v1 table (which doesn't have it).
Correct Approach: When changing the schema, you should either:
- Remove toTable() to let the package auto-generate the next version name.
- Update the table name manually (e.g., change to toTable('products_v2'), as in the sketch below).
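Putting that together, a schema change with explicit table naming might look like the following sketch. It reuses the $source and $path from the CSV example; the added price mapping is purely illustrative.

```php
// The mapping gained a 'price' column, so the explicit table name is bumped
// from products_v1 to products_v2 to avoid a "Column not found" error.
$run = $source->sync($path)
    ->withStrategy('full_refresh')
    ->mapColumns(['id' => 'remote_id', 'name' => 'product_name', 'price' => 'unit_price'])
    ->toTable('products_v2')
    ->run();
```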
Testing
```bash
docker compose exec -e XDEBUG_MODE=coverage app ./vendor/bin/phpunit --coverage-html phpunit_reports
```