ivanfuhr / ingestor
Requires
- php: ^8.2
- ext-pdo: *
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.95
- phpstan/phpstan: ^2.2
- phpunit/phpunit: ^11.5
- rector/rector: ^2.4
README
Ingestor
Ingestor is a PHP library for safe, auditable data imports with isolated staging, atomic release, and an extensible pipeline.
Data enters through a source driver, is transformed into mutations by a definition, is applied in an isolated stage by a persistence driver, and is only then promoted to production, safely and atomically.
Requires PHP 8.2+ and the PDO extension.
Installation
Get started by requiring the package with Composer:
composer require ivanfuhr/ingestor
Quick Start
```php
use Ivanfuhr\Ingestor\Ingestor;
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

// $pdo is an existing PDO connection to PostgreSQL.
$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);

$import = $ingestor
    ->for(CustomerImport::class)
    ->from('/path/to/customers.csv')
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        // inspect validation or persistence failures
    }

    $import->rollback();

    return;
}

$import->release();
```
Table of Contents
- Architecture
- Definitions & Schema
- Context
- Validation
- Persistence Failures
- Hooks
- Metrics
- Testing Utilities
- PostgreSQL Driver
- CSV Driver
- Development
- Community
- License
Architecture
Ingestor separates four responsibilities:
```
Source
  ↓
Source Driver
  ↓
Iterable<RowContext>
  ↓
Definition (prepare → validate → map)
  ↓
Dataset (mutations)
  ↓
Persistence Driver
  ↓
Stage (isolated)
  ↓
Release (atomic promotion)
```
| Driver | Responsibility | Implementations |
|---|---|---|
| Source | Turns a source into input rows | CsvDriver |
| Persistence | Creates staging, persists mutations, and releases | PostgresDriver |
Drivers are injected at construction time. The import pipeline never needs to know how data is read or written.
```php
$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);
```
Why: Keeps reading, transformation, and persistence independent, so each piece can be swapped or tested in isolation.
Definitions & Schema
A Definition describes an import. It declares structure via Schema and transforms each row into write intentions via Dataset.
```php
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Dataset\Dataset;
use Ivanfuhr\Ingestor\Schema\Schema;
use Ivanfuhr\Ingestor\Stage\EmptyStage;
use Ivanfuhr\Ingestor\Stage\PrefilledStage;
use Ivanfuhr\Ingestor\Conflict\UpdateOnConflict;

final class CustomerImport implements Definition
{
    // Declares the datasets, their stage strategy, and conflict handling.
    public function schema(): Schema
    {
        return Schema::make()
            ->dataset('customers')
                ->using(PrefilledStage::class)
                ->onConflict(UpdateOnConflict::by('document'))
            ->dataset('addresses')
                ->using(EmptyStage::class);
    }

    // Turns one source row into mutations across both datasets.
    public function map(array $row, Context $context): Dataset
    {
        return Dataset::make()
            ->insert('customers', [
                'document' => $row['cpf'],
                'name' => $row['name'],
            ])
            ->insert('addresses', [
                'document' => $row['cpf'],
                'city' => $row['city'],
            ]);
    }
}
```
Stage Strategies
| Strategy | Behavior |
|---|---|
| EmptyStage | Dataset starts empty |
| PrefilledStage | Dataset starts with a copy of existing data (ideal for incremental updates) |
Conflict Strategies
Declared in the Schema and translated by the persistence driver:
```php
UpdateOnConflict::by('document');
IgnoreOnConflict::by('document');
ReplaceOnConflict::by('document');
FailOnConflict::by('document');
```
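As a rough illustration of what "translated by the persistence driver" could look like on PostgreSQL, the sketch below builds a parameterized INSERT with an ON CONFLICT clause for three of the strategies. The function `buildInsertSql` and its strategy strings are hypothetical and not part of Ingestor's API; the SQL that PostgresDriver actually emits may differ. ReplaceOnConflict is left out because this README does not describe its semantics.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Ingestor): builds a parameterized
 * PostgreSQL INSERT for one row, with a conflict clause chosen by $strategy.
 * Assumes column names are also valid PDO placeholder names.
 *
 * @param list<string> $columns
 */
function buildInsertSql(string $table, array $columns, string $conflictColumn, string $strategy): string
{
    // Double-quote identifiers and escape embedded quotes.
    $quote = static fn (string $name): string => '"' . str_replace('"', '""', $name) . '"';

    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES (%s)',
        $quote($table),
        implode(', ', array_map($quote, $columns)),
        implode(', ', array_map(static fn (string $c): string => ':' . $c, $columns)),
    );
    $target = $quote($conflictColumn);

    switch ($strategy) {
        case 'fail':
            // No clause: a duplicate key raises a unique violation.
            return $sql;
        case 'ignore':
            return $sql . " ON CONFLICT ({$target}) DO NOTHING";
        case 'update':
            // Overwrite every non-key column with the incoming value.
            $assignments = [];
            foreach ($columns as $column) {
                if ($column !== $conflictColumn) {
                    $assignments[] = sprintf('%s = EXCLUDED.%s', $quote($column), $quote($column));
                }
            }
            if ($assignments === []) {
                // Nothing besides the key to update, so behave like "ignore".
                return $sql . " ON CONFLICT ({$target}) DO NOTHING";
            }
            return $sql . " ON CONFLICT ({$target}) DO UPDATE SET " . implode(', ', $assignments);
        default:
            throw new InvalidArgumentException("Unknown strategy: {$strategy}");
    }
}

echo buildInsertSql('customers', ['document', 'name'], 'document', 'update'), PHP_EOL;
```

Keeping the strategy as data in the Schema and generating SQL in one place is what lets a definition stay free of database syntax.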
A Stage is an isolated ingestion environment. Nothing touches production until release() is called.
```
Import
└── Stage
    ├── customers (staging table)
    └── addresses (staging table)
```
Why: One row can produce zero, one, or many mutations across multiple datasets, without coupling business logic to SQL.
Context
Shared storage available throughout an import. Use it to preload ID maps, caches, and reference data so map() stays pure and fast.
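```php
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\Preparable;
use Ivanfuhr\Ingestor\Dataset\Dataset;

// schema() is omitted here for brevity.
final class OrderImport implements Definition, Preparable
{
    // Runs once before rows are processed: load the document => id map.
    // Customer is an application model (the pluck() call is Eloquent-style).
    public function prepare(Context $context): void
    {
        $context->put('customers', Customer::pluck('id', 'document')->all());
    }

    public function map(array $row, Context $context): Dataset
    {
        $customers = $context->get('customers');

        return Dataset::make()->insert('orders', [
            'customer_id' => $customers[$row['document']] ?? null,
            'total' => $row['total'],
        ]);
    }
}
```
Why: Avoids N+1 queries during import. I/O belongs in prepare(); map() should be a pure Row + Context → Dataset transformation.
Validation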
Row validation is optional and runs before mapping. Implement ValidatesRows on your definition:
```php
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\ValidatesRows;
use Ivanfuhr\Ingestor\Validation\Failure;

// schema() and map() are omitted here for brevity.
final class CustomerImport implements Definition, ValidatesRows
{
    public function validate(array $row, Context $context): iterable
    {
        if (empty($row['document'])) {
            yield Failure::error('document')
                ->message('Document is required.');
        }

        if (empty($row['phone'])) {
            yield Failure::warning('phone')
                ->message('Phone number is empty.');
        }
    }
}
```
| Severity | Behavior |
|---|---|
| ERROR | Row is skipped: not mapped or persisted |
| WARNING | Recorded, but the row continues through the pipeline |
Failures are available after import:
```php
$import->failures();
$import->hasFailures();
```
Why: Invalid rows are caught early, before any database writes, with full reporting for audits and reprocessing.
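The severity rules in the table above can be pictured with a small standalone sketch. Everything in it (`Severity`, `RowFailure`, `partitionRows`) is a hypothetical stand-in written for illustration; it is not Ingestor's internal code, and it only assumes the behavior stated above: every failure is recorded, and only an error keeps a row out of the pipeline.

```php
<?php

declare(strict_types=1);

// Illustrative only: these names are hypothetical, not Ingestor classes.
enum Severity
{
    case Error;
    case Warning;
}

final class RowFailure
{
    public function __construct(
        public readonly int $line,
        public readonly Severity $severity,
        public readonly string $field,
        public readonly string $message,
    ) {
    }
}

/**
 * Splits rows into those that may continue and the failures found.
 *
 * @param iterable<int, array<string, mixed>> $rows keyed by source line number
 * @param callable $validate receives (row, line) and returns an iterable of RowFailure
 * @return array{accepted: array<int, array<string, mixed>>, failures: list<RowFailure>}
 */
function partitionRows(iterable $rows, callable $validate): array
{
    $accepted = [];
    $failures = [];

    foreach ($rows as $line => $row) {
        $hasError = false;
        foreach ($validate($row, $line) as $failure) {
            $failures[] = $failure; // every failure is recorded
            if ($failure->severity === Severity::Error) {
                $hasError = true; // an error excludes the row
            }
        }
        if (!$hasError) {
            $accepted[$line] = $row; // warnings do not block the row
        }
    }

    return ['accepted' => $accepted, 'failures' => $failures];
}

$validate = static function (array $row, int $line): iterable {
    if (empty($row['document'])) {
        yield new RowFailure($line, Severity::Error, 'document', 'Document is required.');
    }
    if (empty($row['phone'])) {
        yield new RowFailure($line, Severity::Warning, 'phone', 'Phone number is empty.');
    }
};

$result = partitionRows([
    2 => ['document' => '123', 'phone' => ''],
    3 => ['document' => '', 'phone' => '555-0100'],
], $validate);

printf("%d accepted, %d failures\n", count($result['accepted']), count($result['failures']));
```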
Persistence Failures
Database errors (NOT NULL, FOREIGN KEY, UNIQUE, etc.) are exposed through the same Failure mechanism, with additional context:
- line(): original source line number
- dataset(): affected dataset
- data(): row data
- cause(): underlying exception
Failures do not trigger an automatic rollback. You decide between release() and rollback().
```php
$import = $ingestor
    ->for(CustomerImport::class)
    ->from($file)
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        dump([
            'line' => $failure->line(),
            'dataset' => $failure->dataset(),
            'message' => $failure->message(),
            'data' => $failure->data(),
        ]);
    }

    $import->rollback();

    return;
}

$import->release();
```
SQL Failure Modes
PostgresDriver supports configurable failure diagnosis:
```php
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

new PostgresDriver($pdo, chunkSize: 500, failureMode: SqlFailureMode::Diagnostic);
```
| Mode | Priority |
|---|---|
| Fast | Throughput: records a batch failure when a bulk INSERT fails |
| Diagnostic | Traceability: subdivides the batch to isolate the failing row |
Why: Every mutation inherits its source row context, so persistence errors remain traceable even at scale.
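To make "subdivides the batch" concrete, here is a minimal standalone sketch of one way to do it: try the whole batch, and on failure split it in half and retry each half until single failing rows are isolated. The function `insertWithDiagnosis` and the fake inserter are hypothetical and are not the driver's actual implementation. The sketch also assumes a failed batch writes nothing (for example because each attempt runs inside a savepoint), otherwise retried rows would be inserted twice.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of diagnostic batch handling (not Ingestor's code).
 *
 * @param array<int, array<string, mixed>> $rows keyed by source line number
 * @param callable $insertBatch receives a batch; must throw on failure and write nothing
 * @return array<int, Throwable> failures keyed by source line number
 */
function insertWithDiagnosis(array $rows, callable $insertBatch): array
{
    if ($rows === []) {
        return [];
    }

    try {
        $insertBatch($rows);

        return []; // whole batch accepted
    } catch (Throwable $e) {
        if (count($rows) === 1) {
            // A single row that still fails is the culprit.
            return [array_key_first($rows) => $e];
        }
    }

    // Split while preserving the line-number keys, then retry each half.
    $half = intdiv(count($rows), 2);
    $left = array_slice($rows, 0, $half, true);
    $right = array_slice($rows, $half, null, true);

    return insertWithDiagnosis($left, $insertBatch) + insertWithDiagnosis($right, $insertBatch);
}

// Fake all-or-nothing inserter standing in for a bulk INSERT.
$stored = [];
$fakeInsert = static function (array $batch) use (&$stored): void {
    foreach ($batch as $row) {
        if ($row['name'] === null) {
            throw new RuntimeException('NOT NULL violation on "name"');
        }
    }
    $stored += $batch;
};

$failures = insertWithDiagnosis([
    2 => ['name' => 'Ada'],
    3 => ['name' => 'Bob'],
    4 => ['name' => null],
    5 => ['name' => 'Cy'],
], $fakeInsert);

foreach ($failures as $line => $error) {
    printf("line %d: %s\n", $line, $error->getMessage());
}
```

The trade-off matches the table: a clean batch costs one attempt in either mode, while a batch with a bad row costs extra round trips in exchange for a failure tied to one source line.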
Hooks
High-level lifecycle hooks for auditing, metrics, notifications, and external integrations. They run a fixed number of times regardless of row volume.
```
beforeImport()
  ↓
prepare()
  ↓
validate() → map() → persist()
  ↓
afterImport()
  ↓
release()
  ↓
beforeRelease() → promote stage → afterRelease()
```
| Interface | When | Typical use |
|---|---|---|
| BeforeImport | Before import starts | Timers, logging, audit trail |
| AfterImport | After all rows processed, before release | Metrics, reports, notifications |
| BeforeRelease | Immediately before promotion | Final checks, manual approval |
| AfterRelease | After promotion | Cache invalidation, external sync |
BeforeRelease can block publication:
```php
use Ivanfuhr\Ingestor\Exception\CannotRelease;

// Method on a definition that implements BeforeRelease.
public function beforeRelease(ImportedImport $import): void
{
    if ($import->hasFailures()) {
        throw CannotRelease::because('Import contains unresolved failures.');
    }
}
```
Why: Integrate with the outside world without per-row callbacks that would destroy throughput.
Metrics
Read-only metrics are collected during import. They are available whether you release or roll back.
```php
$metrics = $import->metrics();

$metrics->startedAt();
$metrics->finishedAt();
$metrics->duration();
$metrics->rows();         // rows processed
$metrics->importedRows(); // rows imported successfully
$metrics->failedRows();   // rows with failures
$metrics->mutations();    // mutations produced

foreach ($metrics->datasets() as $dataset) {
    $dataset->name();
    $dataset->mutations();
    $dataset->persisted();
    $dataset->failures();
}
```
Failures answer what and why. Metrics answer how much and how long.
Why: Every import becomes observable (performance, throughput, and per-dataset breakdowns) without affecting the pipeline.
Testing Utilities
Test definitions in isolation: no database, no CSV files, no external infrastructure.
Asserting the Schema
```php
use Ivanfuhr\Ingestor\Ingestor;
use Ivanfuhr\Ingestor\Stage\PrefilledStage;

Ingestor::test(CustomerImport::class)
    ->assertDataset('customers')
    ->assertStage(PrefilledStage::class)
    ->assertUpdateOnConflict('document');
```
Asserting map()
```php
Ingestor::test(CustomerImport::class)
    ->withContext(['customers' => ['12345678901' => 1]])
    ->map(['cpf' => '12345678901', 'name' => 'Ada', 'city' => 'SP'])
    ->assertInserted('customers', [
        'document' => '12345678901',
        'name' => 'Ada',
    ])
    ->assertDatasetCount('addresses', 1);
```
Asserting Validation
```php
Ingestor::test(CustomerImport::class)
    ->map(['document' => null])
    ->assertFailure(field: 'document', message: 'Document is required.')
    ->assertFailureCount(1);
```
Asserting the Full Pipeline
```php
Ingestor::test(CustomerImport::class)
    ->fromRows([
        ['cpf' => '1', 'name' => 'Ada', 'city' => 'SP'],
        ['cpf' => '2', 'name' => 'Bob', 'city' => 'RJ'],
    ])
    ->import()
    ->assertRows(2)
    ->assertImportedRows(2)
    ->assertFailedRows(0)
    ->assertMutations(4);
```
Why: Definitions should be fully testable with fast, deterministic tests, so they are safe to refactor without spinning up infrastructure.
PostgreSQL Driver
PostgresDriver creates staging tables, inserts data in configurable batches, and atomically promotes staging to production.
```php
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

$driver = new PostgresDriver(
    pdo: $pdo,
    chunkSize: 500,
    failureMode: SqlFailureMode::Fast,
);
```
The driver introspects production tables to build matching staging tables and applies conflict strategies from the Schema via ON CONFLICT.
Why: Staging + atomic swap gives you a safe rollback window before data ever reaches production.
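For readers unfamiliar with the staging pattern, the sketch below prints one plausible sequence of PostgreSQL statements for creating a staging table and later swapping it into place. It is an assumption about the general technique, not a description of what PostgresDriver runs: the helper names, the `_staging` and `_retired` suffixes, and the rename-based swap are all hypothetical. It relies only on two PostgreSQL facts: `CREATE TABLE ... (LIKE ... INCLUDING ALL)` copies a table's structure, and DDL such as `ALTER TABLE ... RENAME` is transactional.

```php
<?php

declare(strict_types=1);

/** Quote a PostgreSQL identifier, escaping embedded double quotes. */
function quoteIdent(string $name): string
{
    return '"' . str_replace('"', '""', $name) . '"';
}

/**
 * Hypothetical: statements that create a staging table shaped like production,
 * optionally prefilled with the current production rows.
 *
 * @return list<string>
 */
function stagingStatements(string $table, string $stagingTable, bool $prefill): array
{
    $statements = [
        sprintf('CREATE TABLE %s (LIKE %s INCLUDING ALL)', quoteIdent($stagingTable), quoteIdent($table)),
    ];
    if ($prefill) {
        $statements[] = sprintf('INSERT INTO %s SELECT * FROM %s', quoteIdent($stagingTable), quoteIdent($table));
    }

    return $statements;
}

/**
 * Hypothetical: statements that swap staging into place in one transaction,
 * so readers see either the old table or the new one, never a mix.
 *
 * @return list<string>
 */
function promotionStatements(string $table, string $stagingTable): array
{
    $retired = $table . '_retired';

    return [
        'BEGIN',
        sprintf('ALTER TABLE %s RENAME TO %s', quoteIdent($table), quoteIdent($retired)),
        sprintf('ALTER TABLE %s RENAME TO %s', quoteIdent($stagingTable), quoteIdent($table)),
        sprintf('DROP TABLE %s', quoteIdent($retired)),
        'COMMIT',
    ];
}

$all = [
    ...stagingStatements('customers', 'customers_staging', true),
    ...promotionStatements('customers', 'customers_staging'),
];

foreach ($all as $sql) {
    echo $sql, ';', PHP_EOL;
}
```

Until the promotion transaction commits, discarding the import is just a matter of dropping the staging table, which is the rollback window described above.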
CSV Driver
CsvDriver reads CSV files with a header row and yields RowContext objects with line numbers and associative data.
```php
use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

$ingestor = Ingestor::make($persistence, new CsvDriver());
```
Why: Line numbers flow through the entire pipeline, enabling precise failure reporting back to the source file.
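The idea of carrying line numbers alongside row data can be sketched with plain PHP. The generator below is a hypothetical stand-in, not CsvDriver itself: it yields associative arrays keyed by their 1-based line number rather than RowContext objects, and it assumes no quoted field spans multiple lines.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical reader (not Ingestor's CsvDriver): yields one associative
 * array per data row, keyed by its 1-based line number in the file.
 *
 * @param resource $handle an open, readable stream
 * @return Generator<int, array<string, string>>
 */
function readCsvRows($handle): Generator
{
    // Separator, enclosure and escape are passed explicitly; '' disables
    // PHP's non-standard escape handling.
    $header = fgetcsv($handle, null, ',', '"', '');
    if ($header === false || $header === [null]) {
        return; // empty input: nothing to yield
    }

    $line = 1; // the header occupies line 1
    while (($fields = fgetcsv($handle, null, ',', '"', '')) !== false) {
        $line++;
        if ($fields === [null]) {
            continue; // blank line: counted, but not yielded
        }
        if (count($fields) !== count($header)) {
            throw new RuntimeException(sprintf(
                'Line %d: expected %d columns, got %d',
                $line,
                count($header),
                count($fields),
            ));
        }

        yield $line => array_combine($header, $fields);
    }
}

// Demo on an in-memory stream, so no file is needed.
$handle = fopen('php://memory', 'w+');
fwrite($handle, "cpf,name,city\n1,Ada,SP\n\n2,Bob,RJ\n");
rewind($handle);

foreach (readCsvRows($handle) as $line => $row) {
    printf("line %d: %s (%s)\n", $line, $row['name'], $row['city']);
}

fclose($handle);
```

Because the line number travels with each row, a later validation or persistence failure can point back to the exact line in the source file.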
Development
```bash
composer test      # PHPUnit
composer lint      # PHP-CS-Fixer (check)
composer lint:fix  # PHP-CS-Fixer (fix)
composer phpstan   # Static analysis
composer rector    # Automated refactoring
```
Community
License
Ingestor was created by Ivan Führ under the MIT license.