wizcodepl / laravel-pipe
Stage-based pipeline framework for batch ETL of record streams in Laravel — drivers, resolvers, per-stage stats.
Requires
- php: ^8.2
- illuminate/console: ^11.0|^12.0
- illuminate/support: ^11.0|^12.0
Requires (Dev)
- laravel/pint: ^1.18
- orchestra/testbench: ^9.0|^10.0
- phpstan/phpstan: ^2.0
- phpunit/phpunit: ^11.0
README
If you have multiple product suppliers...
Each one ships their catalog in a different shape. Supplier A serves a JSON
REST API with translations under name.{locale}. Supplier B emits an XML feed with
parameters embedded as <table><tr><td> HTML inside the description. Supplier
C hands you a CSV with questionable encoding. Supplier D needs OAuth + an MD5
hash signed with a timestamp to mint a token that expires every hour.
Every supplier has "color" — but A returns it structurally
(features.color = "Black"), B has it inside the product name
("Acme Slim Tee L Black"), C buries it inside HTML
(<tr><td>color</td><td>Onyx Mist</td></tr>), and D doesn't return color
at all (you have to parse it from the model name).
The data is inconsistent. The same matte-black t-shirt will be:
- supplier A: "Black"
- supplier B: "BLACK"
- supplier C: "Jet" (single-word marketing color)
- supplier D: "Onyx Black" (manufacturer marketing color you want to collapse into your canonical "Black")
Some data is just missing. One supplier exposes material in the spec
table. Another only ships a model name ("Merino Crew Neck" → you infer wool).
A third returns nothing — you have to validate that the product fails the
ProductType requirements and skip it.
The APIs are fragile — random 5xx responses, image-hosting timeouts, and new product categories you don't know about yet. Your sync either silently drops records or blows up mid-run leaving half-written state in the database.
This package is for that problem.
What you get
┌─ Driver ──────────── pulls supplier data
│ loadItems() (HTTP, file, queue — generator with lazy fetch)
│
▼
PipelineResolver ──────────── picks a pipeline per record
│ resolve() (returning null = skip + unrouted counter)
│
▼
Pipeline ─────────────── runs stages in declared order
│ stages() (list of class names)
│
▼
Stages ───────────────── per-stage transformation
│ static run(Item) (each reads from Item, writes to Item)
│ - $item->fail($msg, $context) when a stage can't do its job
│ - throw / fail → caught, logged, remaining stages for THIS
│ record skip; the next record starts clean
│
▼
PipelineRun ────────────── per-stage stats
entered / succeeded / failed / duration
+ processed / unrouted total
Core principles:
- The driver is lazy: loadItems() is a generator, so --limit=10 stops fetching after 10 records.
- One Item per record: each record gets its own mutable bag, so stages can't step on each other.
- Stage fails? The remaining stages for THAT record are skipped, the next record starts clean.
- PipelineRun tells you EXACTLY where the problem is: ValidateRequired Failed=300 → 300 records dropped because material was missing. StageFailureException::context is forwarded to onStageError so you log the exact SKU + the missing fields.
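The laziness principle can be illustrated with a plain PHP generator. This is a minimal sketch, not the package's code: `FakeSupplierApi`, the free function `loadItems()` and `takeLimit()` are hypothetical stand-ins for a real driver and for whatever the package does internally when it applies `--limit`.

```php
<?php

/** Hypothetical paginated supplier API that counts how many pages were requested. */
final class FakeSupplierApi
{
    public int $requests = 0;

    /** Pretend every page holds 5 records. */
    public function page(int $page): array
    {
        $this->requests++;

        return array_map(
            fn (int $i): array => ['Sku' => sprintf('SKU-%03d', ($page - 1) * 5 + $i)],
            range(1, 5),
        );
    }
}

/** Lazy source: a page is requested only when the consumer asks for more records. */
function loadItems(FakeSupplierApi $api): Generator
{
    for ($page = 1; $page <= 100; $page++) {
        foreach ($api->page($page) as $record) {
            yield $record;
        }
    }
}

/** Consume at most $limit records (null = no limit), then stop pulling. */
function takeLimit(iterable $items, ?int $limit): array
{
    if ($limit !== null && $limit <= 0) {
        return [];
    }

    $taken = [];
    foreach ($items as $item) {
        $taken[] = $item;
        if ($limit !== null && count($taken) >= $limit) {
            break; // checked right after taking, so the generator is never resumed
        }
    }

    return $taken;
}

$api = new FakeSupplierApi();
$records = takeLimit(loadItems($api), 10);
```

With 5 records per page and a limit of 10, only the first two pages are requested. Checking the limit before taking the next record instead would resume the generator once more and trigger a third request.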
Quick start
composer require wizcodepl/laravel-pipe
Optionally publish the config file (only needed if you want to register
drivers in config/laravel-pipe.php instead of using the
package defaults):
php artisan vendor:publish --tag=laravel-pipe-config
Skeleton generator for a new supplier:
php artisan make:pipe Acme --pipeline=Apparel
This creates:
app/Pipes/Acme/
├── AcmeDriver.php ← loadItems() with TODO
├── AcmePipelineResolver.php ← resolve() with TODO
└── ApparelPipeline/
└── ApparelPipeline.php ← stages() with TODO
Fill in the three TODOs, drop your stage classes into
ApparelPipeline/Stages/, and register the driver in
config/laravel-pipe.php:
'drivers' => [
    'acme' => App\Pipes\Acme\AcmeDriver::class,
],
Run:
php artisan pipe:run acme --limit=10
Output:
Processed: 10 • Unrouted: 0 • Duration: 4.21s
+----------------------------------------+---------+-----------+--------+----------+
| Stage | Entered | Succeeded | Failed | Duration |
+----------------------------------------+---------+-----------+--------+----------+
| Acme\Stages\MapBaseData | 10 | 10 | 0 | 0.012s |
| Acme\Stages\NormalizeAttributes | 10 | 10 | 0 | 0.034s |
| Acme\Stages\ResolveColor | 10 | 10 | 0 | 0.008s |
| Acme\Stages\ValidateRequired | 10 | 7 | 3 | 0.021s | ← 3 dropped
| Acme\Stages\PersistProduct | 7 | 7 | 0 | 0.842s |
| Acme\Stages\PersistDescription | 7 | 7 | 0 | 0.045s |
| Acme\Stages\SyncImages | 7 | 7 | 0 | 1.234s |
+----------------------------------------+---------+-----------+--------+----------+
Seven products landed in the database. Three failed at ValidateRequired —
your sync_quality log channel will tell you per-SKU what was missing
thanks to StageFailureException::context.
A stage is a tiny class
class MapBaseData
{
    public static function run(Item $item): void
    {
        $raw = $item->get('rawData');

        $item->set('baseData', [
            'sku' => $raw['Sku'],
            'price' => $raw['Price'],
            'brand' => $raw['Brand'],
        ]);
    }
}
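Stages in an apparel pipeline typically come in three shapes: stages that map or resolve supplier data onto the Item, a validation stage, and stages that persist the result: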
ResolveProductType.php → item.productType (e.g. shirts, jackets, dresses)
MapBaseData.php → item.baseData (sku, price, brand, weight)
NormalizeAttributes.php → item.attributes (raw spec from supplier)
ResolveColor.php → item.attributes.color (canonical enum label)
ResolveSize.php → item.attributes.size (XS/S/M/L/XL/XXL)
ResolveMaterial.php → item.attributes.material (Cotton/Wool/Polyester)
ResolvePattern.php → item.attributes.pattern (Solid/Striped/Floral)
ValidateRequired.php → item.fail() if required attrs missing
PersistProduct.php → item.product, item.variant
PersistColorOption.php → ProductOption attach + value sync
PersistSizeOption.php → ProductOption attach + value sync
PersistDescription.php → write description to DB
SyncImages.php → download + attach media
Item — the mutable bag
Every record gets its own Item. Stages read and write through it:
$item = new Item(['rawData' => $apiRecord]);

$item->get('rawData');           // read (with optional default)
$item->set('baseData', [...]);   // write
$item->has('product');           // boolean (true even when value is null)
$item->fail('Missing brand', [   // throws StageFailureException with context
    'sku' => $sku,
]);
The driver injects PipelineRun under the reserved key _run. Don't touch
that from inside stages — the package owns it.
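The semantics described above (optional default on read, `has()` returning true for a stored null, `fail()` throwing with context) can be sketched with a stand-in class. This is not the package's source: `SketchItem` and `SketchStageFailure` are hypothetical names, and returning a stored null from `get()` instead of the default is an assumption.

```php
<?php

/** Hypothetical stand-in for the package's StageFailureException. */
final class SketchStageFailure extends RuntimeException
{
    public function __construct(string $message, public readonly array $context = [])
    {
        parent::__construct($message);
    }
}

/** Hypothetical stand-in for Item: a per-record mutable bag. */
final class SketchItem
{
    public function __construct(private array $data = [])
    {
    }

    public function get(string $key, mixed $default = null): mixed
    {
        // Assumption: the default applies only when the key is absent.
        return array_key_exists($key, $this->data) ? $this->data[$key] : $default;
    }

    public function set(string $key, mixed $value): void
    {
        $this->data[$key] = $value;
    }

    public function has(string $key): bool
    {
        // array_key_exists (not isset), so a key holding null still counts as present.
        return array_key_exists($key, $this->data);
    }

    public function fail(string $message, array $context = []): never
    {
        throw new SketchStageFailure($message, $context);
    }
}

$item = new SketchItem(['rawData' => ['Sku' => 'A-1'], 'product' => null]);

$caught = null;
try {
    $item->fail('Missing brand', ['sku' => 'A-1']);
} catch (SketchStageFailure $e) {
    $caught = $e->context; // the context travels with the exception
}
```

Declaring `fail()` with a `never` return type lets static analysis see that code after the call is unreachable, which matches how the stage examples use it as a guard.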
Failing a stage
Item::fail($message, $context = []) throws a StageFailureException. The
abstract pipeline catches it, increments the stage's failed counter, calls
onStageError($stage, $exception, $item), and stops the chain for that
record.
class ResolveColor
{
    public static function run(Item $item): void
    {
        $attrs = $item->get('attributes', []);
        $manufacturer = (string) ($attrs['manufacturer_color'] ?? '');

        $color = self::match($manufacturer);
        if ($color === null) {
            $item->fail('Could not resolve color', [
                'manufacturer_color' => $manufacturer,
            ]);
        }

        $attrs['color'] = $color->label();
        $item->set('attributes', $attrs);
    }
}
Override onStageError in your concrete pipeline to wire it to your logger:
class ApparelPipeline extends AbstractPipeline
{
    protected function onStageError(string $stage, \Throwable $e, Item $item): void
    {
        Log::channel('sync_quality')->error('acme | stage failed', [
            'stage' => $stage,
            'sku' => ($item->get('baseData') ?? [])['sku'] ?? null,
            'message' => $e->getMessage(),
            ...($e instanceof StageFailureException ? $e->context : []),
        ]);
    }
}
The ...$e->context spread merges per-stage diagnostics (manufacturer values,
the unparseable raw string, the missing field list, etc.) into the log record.
Inconsistent data — Resolve* stages
The classic problem: two suppliers ship different strings for the same color.
Supplier A: features.color = "Black"
Supplier B: features.color = "BLACK"
Supplier C: manufacturer_color = "Onyx Black" (from spec table)
Supplier D: product.name = "Crew Tee Black" (only inside the name)
Solution: a ResolveColor stage that maps every variant to the canonical
Color::Black enum, with a substring fallback through a 50+ entry dictionary.
For attributes with a stable domain (size, material, pattern) use PHP enums
with a label() method. For the long-tail of color names, substring matching
with locale-specific fallbacks works well.
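One possible shape for the matching logic, as a sketch under stated assumptions: the `Color` enum, its cases, the `resolveColor()` function and the six-entry dictionary are all illustrative, not the package's or any supplier's real data. It tries an exact match on the enum's backing value first and falls back to a substring scan.

```php
<?php

/** Hypothetical canonical color enum; the cases and labels are illustrative. */
enum Color: string
{
    case Black = 'black';
    case White = 'white';
    case Navy = 'navy';

    public function label(): string
    {
        return ucfirst($this->value);
    }
}

/** Resolve a free-form supplier string to a canonical color, or null if unknown. */
function resolveColor(string $raw): ?Color
{
    $needle = strtolower(trim($raw));
    if ($needle === '') {
        return null;
    }

    // 1. Exact match on the backing value ("BLACK" -> Color::Black).
    $exact = Color::tryFrom($needle);
    if ($exact !== null) {
        return $exact;
    }

    // 2. Substring fallback; the first matching keyword wins.
    $dictionary = [
        'navy' => Color::Navy,
        'onyx' => Color::Black,
        'jet' => Color::Black,
        'black' => Color::Black,
        'ivory' => Color::White,
        'white' => Color::White,
    ];
    foreach ($dictionary as $keyword => $color) {
        if (str_contains($needle, $keyword)) {
            return $color;
        }
    }

    return null;
}
```

Plain substring matching can produce false positives (for example "jet" inside an unrelated word), so a production dictionary may want whole-word matching or a stricter keyword order. Returning null leaves the decision to fail the record with the calling stage.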
Validating required data
A ValidateRequired stage typically reads item.productType, checks which
attributes that ProductType requires, and calls $item->fail(...) with the
missing field list when anything is absent. The pipeline aborts for that
record; the driver moves to the next one.
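The check itself can be a small pure function. The sketch below assumes a hypothetical `REQUIRED_ATTRIBUTES` map and a `missingAttributes()` helper; neither is part of the package, and treating empty strings as missing is an assumption.

```php
<?php

/** Hypothetical requirements map: product type => attributes that must be present. */
const REQUIRED_ATTRIBUTES = [
    'shirts' => ['color', 'size', 'material'],
    'jackets' => ['color', 'size'],
];

/** Return the required attributes that are absent, null, or empty strings. */
function missingAttributes(string $productType, array $attributes): array
{
    // Assumption: an unknown product type has no requirements.
    $required = REQUIRED_ATTRIBUTES[$productType] ?? [];

    return array_values(array_filter(
        $required,
        fn (string $name): bool => ($attributes[$name] ?? '') === '',
    ));
}

$missing = missingAttributes('shirts', ['color' => 'Black', 'size' => 'L']);

// Inside a stage, a non-empty list is where the failure would be raised, e.g.
// $item->fail('Missing required attributes', ['missing' => $missing]);
```

Passing the list of missing fields as failure context is what makes the per-SKU log entries actionable.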
Per-stage invariant
Entered = Succeeded + Failed
Every record that entered a stage finishes in exactly one bucket. Failures
on Resolve* stages usually mean unparseable input; failures on Persist*
usually mean a DB constraint; failures on SyncImages usually mean the CDN
was unreachable. The table after each sync tells you which one, no guessing.
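The invariant is easy to assert in a test or a post-run check. In this sketch the stats array shape (`entered`, `succeeded`, `failed` keys per stage) is an assumption modelled on the table columns, and `stagesBreakingInvariant()` is a hypothetical helper.

```php
<?php

/**
 * Return the names of stages whose counters break Entered = Succeeded + Failed.
 * Assumed shape: ['StageName' => ['entered' => int, 'succeeded' => int, 'failed' => int]].
 */
function stagesBreakingInvariant(array $stages): array
{
    $broken = [];
    foreach ($stages as $name => $stats) {
        if ($stats['entered'] !== $stats['succeeded'] + $stats['failed']) {
            $broken[] = $name;
        }
    }

    return $broken;
}

// Numbers taken from the sample run in the Quick start output.
$stages = [
    'ValidateRequired' => ['entered' => 10, 'succeeded' => 7, 'failed' => 3],
    'PersistProduct' => ['entered' => 7, 'succeeded' => 7, 'failed' => 0],
];

$broken = stagesBreakingInvariant($stages);
```

The sample also shows the chaining effect: the 7 records that succeeded in ValidateRequired are exactly the 7 that enter PersistProduct.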
How the engine works (in one paragraph)
AbstractPipeline::run() iterates stages() in order, wraps each in a
try/catch (\Throwable), and on failure calls onStageError($stage, $e, $item) and stops the chain for that record. AbstractDriver::run(?int $limit) iterates loadItems() (a generator — --limit=10 actually stops
the HTTP fetch after 10 records), routes each Item through the resolver,
runs the chosen pipeline, and returns a PipelineRun with stats.
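The stage loop described above can be sketched in a few lines. This is an illustration of the technique, not the package's implementation: `runStages()`, the two stage classes, and the use of `ArrayObject` as the record bag are hypothetical, and duration tracking and the resolver step are left out.

```php
<?php

/** Two hypothetical stages; each exposes a static run() like the stages above. */
final class RequireBrand
{
    public static function run(ArrayObject $item): void
    {
        if (($item['brand'] ?? '') === '') {
            throw new RuntimeException('Missing brand');
        }
    }
}

final class UppercaseSku
{
    public static function run(ArrayObject $item): void
    {
        $item['sku'] = strtoupper($item['sku']);
    }
}

/** Run stages in order for one record; stop at the first failure. */
function runStages(array $stages, ArrayObject $item, array &$stats, callable $onStageError): bool
{
    foreach ($stages as $stage) {
        $stats[$stage] ??= ['entered' => 0, 'succeeded' => 0, 'failed' => 0];
        $stats[$stage]['entered']++;

        try {
            $stage::run($item);
            $stats[$stage]['succeeded']++;
        } catch (Throwable $e) {
            $stats[$stage]['failed']++;
            $onStageError($stage, $e, $item);

            return false; // remaining stages are skipped for this record only
        }
    }

    return true;
}

$stats = [];
$errors = [];
$records = [
    ['sku' => 'a-1', 'brand' => 'Acme'],
    ['sku' => 'a-2'], // no brand: fails at RequireBrand
    ['sku' => 'a-3', 'brand' => 'Acme'],
];

foreach ($records as $record) {
    runStages(
        [RequireBrand::class, UppercaseSku::class],
        new ArrayObject($record),
        $stats,
        function (string $stage, Throwable $e, ArrayObject $item) use (&$errors): void {
            $errors[] = "{$stage}: {$e->getMessage()} ({$item['sku']})";
        },
    );
}
```

Because each record gets a fresh bag and the loop returns on the first failure, the second record never reaches UppercaseSku while the third one runs through both stages untouched.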
Commands
php artisan make:pipe {Name} [--pipeline=Main] [--force]
Generator: scaffolds three files in app/Pipes/{Name}/.
php artisan pipe:run {driver} [--limit=N]
Run a sync. {driver} is the key from config/laravel-pipe.php.
Output: per-stage stats table + Processed/Unrouted/Duration headline.
Failure records
Every stage failure is captured on PipelineRun (stage, message, context
from Item::fail()). Read them with $run->getFailures() and wire them
into whatever you already use — logs, alerts, audit tables.
To enrich each record (e.g. attach the SKU from item.baseData), override
AbstractPipeline::buildFailureRecord() in your concrete pipeline:
protected function buildFailureRecord(string $stage, \Throwable $e, Item $item): array
{
    return [
        ...parent::buildFailureRecord($stage, $e, $item),
        'sku' => ($item->get('baseData') ?? [])['sku'] ?? null,
    ];
}
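Once the failure records are available, a short summary is often enough for an alert. The sketch below assumes each record is an array with `stage`, `message` and `context` keys; the exact shape returned by `getFailures()` is not shown here, so treat that as an assumption. `summarizeFailures()` is a hypothetical helper.

```php
<?php

/**
 * Count failures per "stage: message" pair, most frequent first.
 * Assumed record shape: ['stage' => string, 'message' => string, 'context' => array].
 */
function summarizeFailures(array $failures): array
{
    $counts = [];
    foreach ($failures as $failure) {
        $key = $failure['stage'] . ': ' . $failure['message'];
        $counts[$key] = ($counts[$key] ?? 0) + 1;
    }

    arsort($counts); // highest count first, keys preserved

    return $counts;
}

$summary = summarizeFailures([
    ['stage' => 'ResolveColor', 'message' => 'Could not resolve color', 'context' => []],
    ['stage' => 'ValidateRequired', 'message' => 'Missing required attributes', 'context' => []],
    ['stage' => 'ValidateRequired', 'message' => 'Missing required attributes', 'context' => []],
]);
```

In a real run the input would come from `$run->getFailures()`, and the summary could be sent to whichever log channel or alerting tool is already in place.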
End-to-end test
Mock your API client or hit the live one — the package doesn't get in the way.
use App\Pipes\Acme\AcmeDriver;
use Illuminate\Foundation\Testing\RefreshDatabase;
use PHPUnit\Framework\Attributes\Group;
use Tests\TestCase;
// Also import your Product model from wherever it lives in your application.

#[Group('e2e')]
class AcmeSupplierSyncTest extends TestCase
{
    use RefreshDatabase;

    public function test_sync_creates_products_from_real_api(): void
    {
        $run = app(AcmeDriver::class)->run(limit: 10);

        // Strict: 0 failures means every sample record made it through
        foreach ($run->getStages() as $stage => $stats) {
            $this->assertSame(
                0,
                $stats['failed'],
                "Stage {$stage} had {$stats['failed']} failures (entered={$stats['entered']})",
            );
        }

        $this->assertGreaterThan(0, Product::count());

        // Idempotency: a second run must not create duplicates
        $first = Product::count();
        app(AcmeDriver::class)->run(limit: 10);
        $this->assertSame($first, Product::count());
    }
}
Skip from CI when network/credentials are unavailable:
vendor/bin/phpunit --exclude-group=e2e
Requirements
- PHP 8.2+
- Laravel 11 / 12 (only required for RunCommand, MakePipeCommand and PipeServiceProvider, i.e. the artisan layer)
The core (Item, AbstractPipeline, AbstractDriver, PipelineRun,
StageFailureException) is plain PHP 8.2. Laravel is only needed for the
optional artisan + service-provider layer.
About Wizcode
Wizcode is an e-commerce agency specialised in Lunar. We design and ship B2B, B2C, and marketplace platforms on the Laravel + Lunar stack — from custom checkouts and supplier syncs to multi-channel pricing, PIM workflows, and headless storefronts.
Our open-source contributions to the Lunar ecosystem:
- wizcodepl/lunar-product-schemas — migration-style schema builder for Lunar product types and attributes.
- wizcodepl/laravel-pipe — stage-based pipeline framework for batch ETL of supplier feeds (used in production for catalog ingestion).
Contact us: https://wizcode.pl
License
MIT