wizcodepl / lunar-supplier-normalizer
Pipeline contracts and driver framework for normalizing supplier product feeds into Lunar.
Package info
github.com/wizcodepl/lunar-supplier-normalizer
pkg:composer/wizcodepl/lunar-supplier-normalizer
Requires
- php: ^8.2
- illuminate/console: ^11.0|^12.0
- illuminate/support: ^11.0|^12.0
Requires (Dev)
- laravel/pint: ^1.18
- orchestra/testbench: ^9.0|^10.0
- phpstan/phpstan: ^2.0
- phpunit/phpunit: ^11.0
README
Pipeline framework for normalizing multi-supplier product feeds into Lunar.
If you have multiple product suppliers...
Each one ships their catalog in a different shape. Supplier A serves a JSON
REST API with translations under name.pl. Supplier B emits an XML feed with
parameters embedded as <table><tr><td> HTML inside the description. Supplier
C hands you a CSV with questionable encoding. Supplier D needs OAuth + an MD5
hash signed with a timestamp to mint a token that expires every hour.
Every supplier has "color" — but A returns it structurally
(features.color = "Black"), B has it inside the product name
("Acme Slim Tee L Black"), C buries it inside HTML
(<tr><td>Kolor</td><td>Czarny połysk</td></tr>), and D doesn't return color
at all (you have to parse it from the model name).
The data is inconsistent. The same matte-black t-shirt will be:
- supplier A: "Black"
- supplier B: "Czarny"
- supplier C: "BLACK"
- supplier D: "Onyx Black" (manufacturer marketing color you want to collapse into your canonical "Black")
Some data is just missing. One supplier exposes material in the spec
table. Another only ships a model name ("Merino Crew Neck" → you infer wool).
A third returns nothing — you have to validate that the product fails the
ProductType requirements and skip it.
The APIs are fragile — random 5xx responses, image-hosting timeouts, and new product categories you don't know about yet. Your sync either silently drops records or blows up mid-run leaving half-written state in the database.
This package is for that problem.
What you get
┌─ Driver ──────────────── pulls supplier data
│    loadStates()          (HTTP, file, queue: generator with lazy fetch)
│
▼
PipelineResolver ───────── picks a pipeline per record
│    resolve()             (returning null = skip + unrouted counter)
│
▼
Pipeline ───────────────── runs stages in declared order
│    stages()              (list of class names)
│
▼
Stages ─────────────────── per-stage transformation
│    static run(State)     (each reads from State, writes to State)
│      - $state->fail($msg, $context) when a stage can't do its job
│      - throw / fail → caught, logged, remaining stages for THIS
│        record skip; the next record starts clean
│
▼
PipelineRun ────────────── per-stage stats
     entered / succeeded / failed / duration
     + processed / unrouted total
Core principles:
- The driver is lazy: loadStates() is a generator, so --limit=10 stops fetching after 10 records.
- One State per record: each record gets its own mutable bag, so stages can't step on each other.
- Stage fails? The remaining stages for THAT record are skipped, the next record starts clean.
- PipelineRun tells you EXACTLY where the problem is: ValidateRequired Failed=300 → 300 records dropped because material was missing. StageFailureException::context is forwarded to onStageError so you log the exact SKU + the missing fields.
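The lazy-driver principle can be illustrated with a plain PHP generator. This is a minimal sketch, not the package's driver: FakeSupplierApi, loadRecords() and take() are hypothetical names, and the page size of five is an arbitrary assumption.

```php
<?php
declare(strict_types=1);

/** Hypothetical paginated supplier API that counts how many pages were requested. */
final class FakeSupplierApi
{
    public int $calls = 0;

    /** Returns five fake records for the given 1-based page number. */
    public function page(int $page): array
    {
        $this->calls++;
        $offset = ($page - 1) * 5;

        return array_map(
            static fn (int $i): array => ['Sku' => 'SKU-' . ($offset + $i)],
            range(1, 5),
        );
    }
}

/** Yields one record at a time; a page is requested only when the consumer asks for more. */
function loadRecords(FakeSupplierApi $api, int $lastPage = 100): Generator
{
    for ($page = 1; $page <= $lastPage; $page++) {
        foreach ($api->page($page) as $record) {
            yield $record;
        }
    }
}

/** Pulls at most $limit records (all of them when $limit is null). */
function take(iterable $records, ?int $limit): array
{
    $taken = [];
    if ($limit !== null && $limit <= 0) {
        return $taken;
    }

    foreach ($records as $record) {
        $taken[] = $record;
        if ($limit !== null && count($taken) >= $limit) {
            break; // the generator is never resumed, so no further page is fetched
        }
    }

    return $taken;
}

$api = new FakeSupplierApi();
$records = take(loadRecords($api), 10);

echo count($records) . ' records, ' . $api->calls . " page requests\n";
```

With a limit of 10 and five records per page, only two of the 100 available pages are requested. An eager implementation that built the full array first would have requested all of them.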
Quick start
composer require wizcodepl/lunar-supplier-normalizer
Optionally publish the config file (only needed if you want to register
drivers in config/lunar-supplier-normalizer.php instead of using the
package defaults):
php artisan vendor:publish --tag=lunar-supplier-normalizer-config
Skeleton generator for a new supplier:
php artisan make:supplier-normalizer Acme --pipeline=Apparel
This creates:
app/SupplierPipelines/Acme/
├── AcmeDriver.php              ← loadStates() with TODO
├── AcmePipelineResolver.php    ← resolve() with TODO
└── ApparelPipeline/
    └── ApparelPipeline.php     ← stages() with TODO
Fill in the three TODOs, drop your stage classes into
ApparelPipeline/Stages/, and register the driver in
config/lunar-supplier-normalizer.php:
'drivers' => [
    'acme' => App\SupplierPipelines\Acme\AcmeDriver::class,
],
Run:
php artisan normalizer:sync acme --limit=10
Output:
Processed: 10 • Unrouted: 0 • Duration: 4.21s
+----------------------------------------+---------+-----------+--------+----------+
| Stage | Entered | Succeeded | Failed | Duration |
+----------------------------------------+---------+-----------+--------+----------+
| Acme\Stages\MapBaseData | 10 | 10 | 0 | 0.012s |
| Acme\Stages\NormalizeAttributes | 10 | 10 | 0 | 0.034s |
| Acme\Stages\ResolveColor | 10 | 10 | 0 | 0.008s |
| Acme\Stages\ValidateRequired | 10 | 7 | 3 | 0.021s | ← 3 dropped
| Acme\Stages\PersistProduct | 7 | 7 | 0 | 0.842s |
| Acme\Stages\PersistDescription | 7 | 7 | 0 | 0.045s |
| Acme\Stages\SyncImages | 7 | 7 | 0 | 1.234s |
+----------------------------------------+---------+-----------+--------+----------+
Seven products landed in the database. Three failed at ValidateRequired —
your sync_quality log channel will tell you per-SKU what was missing
thanks to StageFailureException::context.
A stage is a tiny class
class MapBaseData
{
    public static function run(State $state): void
    {
        $raw = $state->get('rawData');

        $state->set('baseData', [
            'sku' => $raw['Sku'],
            'price' => $raw['Price'],
            'brand' => $raw['Brand'],
        ]);
    }
}
Stages in an apparel pipeline typically fall into three groups: transform (Resolve*, Map*, Normalize*), validate, and persist (Persist*, Sync*):
ResolveProductType.php → state.productType (e.g. shirts, jackets, dresses)
MapBaseData.php → state.baseData (sku, price, brand, weight)
NormalizeAttributes.php → state.attributes (raw spec from supplier)
ResolveColor.php → state.attributes.color (canonical enum label)
ResolveSize.php → state.attributes.size (XS/S/M/L/XL/XXL)
ResolveMaterial.php → state.attributes.material (Cotton/Wool/Polyester)
ResolvePattern.php → state.attributes.pattern (Solid/Striped/Floral)
ValidateRequired.php → state.fail() if required attrs missing
PersistProduct.php → state.product, state.variant
PersistColorOption.php → ProductOption attach + value sync
PersistSizeOption.php → ProductOption attach + value sync
PersistDescription.php → write description to DB
SyncImages.php → download + attach media
State — the mutable bag
Every record gets its own State. Stages read and write through it:
$state = new State(['rawData' => $apiRecord]);

$state->get('rawData');          // read (with optional default)
$state->set('baseData', [...]);  // write
$state->has('product');          // boolean (true even when value is null)
$state->fail('Missing brand', [  // throws StageFailureException with context
    'sku' => $sku,
]);
The driver injects PipelineRun under the reserved key _run. Don't touch
that from inside stages — the package owns it.
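To make the semantics above concrete, here is a minimal stand-in for such a bag. It is a sketch under assumptions, not the package's State class: SketchState and SketchStageFailure are hypothetical names, and the choice of array_key_exists() is only one way to get "has() is true even when the value is null". Whether the real get() returns the default for a stored null is not stated in this README; the sketch returns the stored null.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for StageFailureException; only the public context is taken from the README. */
final class SketchStageFailure extends RuntimeException
{
    public function __construct(string $message, public readonly array $context = [])
    {
        parent::__construct($message);
    }
}

/** Hypothetical stand-in for State: a per-record key/value bag. */
final class SketchState
{
    public function __construct(private array $data = [])
    {
    }

    /** Returns the stored value, or $default when the key was never set. */
    public function get(string $key, mixed $default = null): mixed
    {
        return array_key_exists($key, $this->data) ? $this->data[$key] : $default;
    }

    public function set(string $key, mixed $value): void
    {
        $this->data[$key] = $value;
    }

    /** True for any key that was set, including keys holding null. */
    public function has(string $key): bool
    {
        return array_key_exists($key, $this->data);
    }

    /** Always throws, so callers can treat the code after fail() as unreachable. */
    public function fail(string $message, array $context = []): never
    {
        throw new SketchStageFailure($message, $context);
    }
}

$state = new SketchState(['rawData' => ['Sku' => 'A1'], 'product' => null]);

var_dump($state->has('product'));       // key exists although the value is null
var_dump($state->has('variant'));       // key was never set
var_dump($state->get('variant', 'n/a')); // falls back to the default
```

Declaring fail() with the never return type lets static analysers such as PHPStan understand that code following a fail() call only runs when the check passed.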
Failing a stage
State::fail($message, $context = []) throws a StageFailureException. The
abstract pipeline catches it, increments the stage's failed counter, calls
onStageError($stage, $exception, $state), and stops the chain for that
record.
class ResolveColor
{
    public static function run(State $state): void
    {
        $attrs = $state->get('attributes', []);
        $manufacturer = (string) ($attrs['manufacturer_color'] ?? '');

        $color = self::match($manufacturer);

        if ($color === null) {
            $state->fail('Could not resolve color', [
                'manufacturer_color' => $manufacturer,
            ]);
        }

        $attrs['color'] = $color->label();
        $state->set('attributes', $attrs);
    }
}
Override onStageError in your concrete pipeline to wire it to your logger:
class ApparelPipeline extends AbstractPipeline
{
    protected function onStageError(string $stage, \Throwable $e, State $state): void
    {
        Log::channel('sync_quality')->error('acme | stage failed', [
            'stage' => $stage,
            'sku' => ($state->get('baseData') ?? [])['sku'] ?? null,
            'message' => $e->getMessage(),
            ...($e instanceof StageFailureException ? $e->context : []),
        ]);
    }
}
The ...$e->context spread merges per-stage diagnostics (manufacturer values,
the unparseable raw string, the missing field list, etc.) into the log record.
Inconsistent data — Resolve* stages
The classic problem: two suppliers ship different strings for the same color.
Supplier A: features.color.pl = "Black"
Supplier B: features.kolor = "Czarny"
Supplier C: manufacturer_color = "Onyx Black" (from spec table)
Supplier D: product.name = "Crew Tee Black" (only inside the name)
Solution: a ResolveColor stage that maps every variant to the canonical
Color::Black enum, with a substring fallback through a 50+ entry dictionary.
For attributes with a stable domain (size, material, pattern) use PHP enums
with a label() method. For the long-tail of color names, substring matching
with locale-specific fallbacks works well.
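The matching approach described above might look roughly like this. It is a sketch only: the Color enum, its three cases, the resolveColor() function and the seven-entry dictionary are all illustrative assumptions (a real dictionary would be far longer), and the package does not ship any of them.

```php
<?php
declare(strict_types=1);

/** Hypothetical canonical color enum; cases and labels are illustrative. */
enum Color: string
{
    case Black = 'black';
    case White = 'white';
    case Navy = 'navy';

    public function label(): string
    {
        return ucfirst($this->value);
    }
}

/**
 * Maps a raw supplier string to a canonical Color, or null when nothing matches.
 * Tries an exact dictionary hit first, then falls back to substring matching.
 */
function resolveColor(string $raw): ?Color
{
    // Lowercase ASCII terms, English and Polish. Order matters for the substring
    // pass: the first term found inside the input wins.
    $dictionary = [
        'granatowy' => Color::Navy,
        'czarny' => Color::Black,
        'bialy' => Color::White,
        'black' => Color::Black,
        'white' => Color::White,
        'onyx' => Color::Black,
        'navy' => Color::Navy,
    ];

    // strtolower() only folds ASCII letters; use mb_strtolower() if the
    // dictionary ever contains non-ASCII terms.
    $needle = strtolower(trim($raw));
    if ($needle === '') {
        return null;
    }

    if (isset($dictionary[$needle])) {
        return $dictionary[$needle];
    }

    foreach ($dictionary as $term => $color) {
        if (str_contains($needle, $term)) {
            return $color;
        }
    }

    return null;
}

foreach (['Black', 'Czarny połysk', 'Onyx Black', 'Crew Tee Black', 'Fuchsia'] as $raw) {
    echo $raw . ' => ' . (resolveColor($raw)?->label() ?? 'unresolved') . "\n";
}
```

Returning null instead of guessing keeps the decision in the stage: the caller can turn an unresolved value into a $state->fail(...) with the raw string in the context, so unknown colors show up in the stats instead of being silently mislabeled.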
Validating required data
A ValidateRequired stage typically reads state.productType, checks which
attributes that ProductType requires, and calls $state->fail(...) with the
missing field list when anything is absent. The pipeline aborts for that
record; the driver moves to the next one.
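The check itself can be a small pure function that the stage wraps. The sketch below is an assumption about how one could structure it: REQUIRED_ATTRIBUTES, its contents and missingAttributes() are hypothetical, and in a real project the requirement list would more likely come from the ProductType than from a constant.

```php
<?php
declare(strict_types=1);

/** Hypothetical requirement map: product type => attributes that must be present. */
const REQUIRED_ATTRIBUTES = [
    'shirts' => ['color', 'size', 'material'],
    'jackets' => ['color', 'size'],
];

/**
 * Returns the names of required attributes that are absent, null, or empty strings.
 * Unknown product types have no requirements in this sketch.
 */
function missingAttributes(string $productType, array $attributes): array
{
    $required = REQUIRED_ATTRIBUTES[$productType] ?? [];

    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !isset($attributes[$name]) || $attributes[$name] === '',
    ));
}

$missing = missingAttributes('shirts', ['color' => 'Black', 'size' => 'L']);

// Inside a stage, a non-empty result would become something like:
// $state->fail('Missing required attributes', ['missing' => $missing]);
echo 'Missing: ' . implode(', ', $missing) . "\n";
```

Passing the list of missing names as failure context is what lets the error handler log exactly which fields a given SKU lacked.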
Per-stage invariant
Entered = Succeeded + Failed
Every record that entered a stage finishes in exactly one bucket. Failures
on Resolve* stages typically mean unparseable input; failures on Persist*
typically mean a violated DB constraint; failures on SyncImages typically
mean the CDN was unreachable. The table after each sync tells you which
one, no guessing.
How the engine works (in one paragraph)
AbstractPipeline::run() iterates stages() in order, wraps each in a
try/catch (\Throwable), and on failure calls
onStageError($stage, $e, $state) and stops the chain for that record.
AbstractDriver::run(?int $limit) iterates loadStates() (a generator, so
--limit=10 actually stops the HTTP fetch after 10 records), routes each
State through the resolver, runs the chosen pipeline, and returns a
PipelineRun with stats.
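The pipeline half of that paragraph can be sketched in a few lines of plain PHP. This is not the package's AbstractPipeline: runStages() is a hypothetical function, stages are closures instead of class names, the record is a plain array instead of a State, and duration tracking and resolver routing are left out.

```php
<?php
declare(strict_types=1);

/**
 * Runs the stages in order for one record and updates per-stage counters.
 * Returns true when every stage succeeded.
 */
function runStages(array $stages, array &$record, array &$stats, callable $onStageError): bool
{
    foreach ($stages as $name => $stage) {
        $stats[$name] ??= ['entered' => 0, 'succeeded' => 0, 'failed' => 0];
        $stats[$name]['entered']++;

        try {
            $stage($record);
            $stats[$name]['succeeded']++;
        } catch (\Throwable $e) {
            $stats[$name]['failed']++;
            $onStageError($name, $e, $record);

            return false; // remaining stages are skipped for this record only
        }
    }

    return true;
}

$stages = [
    'MapBaseData' => function (array &$r): void {
        $r['sku'] = $r['raw']['Sku'];
    },
    'ValidateRequired' => function (array &$r): void {
        if (($r['raw']['Material'] ?? '') === '') {
            throw new RuntimeException('Missing material');
        }
    },
    'PersistProduct' => function (array &$r): void {
        $r['persisted'] = true;
    },
];

$stats = [];
$errors = [];
$records = [
    ['raw' => ['Sku' => 'A1', 'Material' => 'Wool']],
    ['raw' => ['Sku' => 'A2']],
    ['raw' => ['Sku' => 'A3', 'Material' => 'Cotton']],
];

foreach ($records as $record) {
    runStages($stages, $record, $stats, function (string $stage, \Throwable $e, array $r) use (&$errors): void {
        $errors[] = "{$stage}: {$e->getMessage()} (sku={$r['sku']})";
    });
}

print_r($stats);
```

Because a stage increments exactly one of succeeded or failed after incrementing entered, Entered = Succeeded + Failed holds for every stage. The Entered count of a later stage drops by the number of failures upstream, which is the pattern visible in the sample output table.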
Commands
php artisan make:supplier-normalizer {Name} [--pipeline=Main] [--force]
Generator: scaffolds three files in app/SupplierPipelines/{Name}/.
php artisan normalizer:sync {driver} [--limit=N]
Run a sync. {driver} is the key from config/lunar-supplier-normalizer.php.
Output: per-stage stats table + Processed/Unrouted/Duration headline.
End-to-end test
Mock your API client or hit the live one — the package doesn't get in the way.
#[Group('e2e')]
class AcmeSupplierSyncTest extends TestCase
{
    use RefreshDatabase;

    public function test_sync_creates_products_from_real_api(): void
    {
        $run = app(AcmeDriver::class)->run(limit: 10);

        // Strict: 0 failures means every sample record made it through
        foreach ($run->getStages() as $stage => $stats) {
            $this->assertSame(
                0,
                $stats['failed'],
                "Stage {$stage} had {$stats['failed']} failures (entered={$stats['entered']})",
            );
        }

        $this->assertGreaterThan(0, Product::count());

        // Idempotency
        $first = Product::count();
        app(AcmeDriver::class)->run(limit: 10);
        $this->assertSame($first, Product::count());
    }
}
Skip from CI when network/credentials are unavailable:
vendor/bin/phpunit --exclude-group=e2e.
Requirements
- PHP 8.2+
- Laravel 11/12 (only required for SyncCommand + MakeSupplierNormalizerCommand + ServiceProvider)
- Lunar 1.3+ (only if you persist products into Lunar; the package itself is Lunar-agnostic, and persistence happens inside your own stages)
The core (State, AbstractPipeline, AbstractDriver, PipelineRun,
StageFailureException) is plain PHP 8.2. Laravel is needed only for the
optional artisan + service-provider layer.
License
MIT