wizcodepl/lunar-supplier-normalizer

Pipeline contracts and driver framework for normalizing supplier product feeds into Lunar.

Package info

github.com/wizcodepl/lunar-supplier-normalizer

pkg:composer/wizcodepl/lunar-supplier-normalizer

v0.1.0 2026-04-30 00:14 UTC

README

Pipeline framework for normalizing multi-supplier product feeds into Lunar.

If you have multiple product suppliers...

Each one ships its catalog in a different shape. Supplier A serves a JSON REST API with translations under name.pl. Supplier B emits an XML feed with parameters embedded as <table><tr><td> HTML inside the description. Supplier C hands you a CSV with questionable encoding. Supplier D requires OAuth plus a timestamped MD5 signature to mint a token that expires every hour.

Every supplier has "color", but A returns it structurally (features.color = "Black"), B has it inside the product name ("Acme Slim Tee L Black"), C buries it inside HTML (<tr><td>Kolor</td><td>Czarny połysk</td></tr>, Polish for "Color" / "Glossy black"), and D doesn't return color at all (you have to parse it from the model name).

The data is inconsistent. The same matte-black t-shirt will be:

  • supplier A: "Black"
  • supplier B: "Czarny" (Polish for "Black")
  • supplier C: "BLACK"
  • supplier D: "Onyx Black" (manufacturer marketing color you want to collapse into your canonical "Black")

Some data is just missing. One supplier exposes material in the spec table. Another only ships a model name ("Merino Crew Neck" → you infer wool). A third returns nothing, so you have to detect that the product does not meet its ProductType requirements and skip it.

The APIs are fragile — random 5xx responses, image-hosting timeouts, and new product categories you don't know about yet. Your sync either silently drops records or blows up mid-run leaving half-written state in the database.

This package is for that problem.

What you get

                ┌─ Driver ──────────── pulls supplier data
                │  loadStates()        (HTTP, file, queue — generator with lazy fetch)
                │
                ▼
         PipelineResolver ──────────── picks a pipeline per record
                │  resolve()           (returning null = skip + unrouted counter)
                │
                ▼
            Pipeline ─────────────── runs stages in declared order
                │  stages()            (list of class names)
                │
                ▼
             Stages ───────────────── per-stage transformation
                │  static run(State)   (each reads from State, writes to State)
                │  - $state->fail($msg, $context) when a stage can't do its job
                │  - throw / fail → caught, logged, remaining stages for THIS
                │    record are skipped; the next record starts clean
                │
                ▼
          PipelineRun ────────────── per-stage stats
                                       entered / succeeded / failed / duration
                                       + processed / unrouted total

Core principles:

  • The driver is lazy — loadStates() is a generator, --limit=10 stops fetching after 10 records.
  • One State per record — each record gets its own mutable bag, stages can't step on each other.
  • Stage fails? The remaining stages for THAT record are skipped, the next record starts clean.
  • PipelineRun shows exactly where records are lost: ValidateRequired Failed=300 means 300 records were dropped at that stage (for example, because material was missing). StageFailureException::context is forwarded to onStageError, so you can log the exact SKU and the missing fields.
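The first bullet relies on PHP generators. Code after a `yield` runs only when the consumer asks for the next value. The minimal sketch below assumes a hypothetical paginated supplier API that returns five records per page. `FakeSupplierApi`, `loadRecords()` and `takeLimit()` are illustrative names, not part of the package. Taking 10 records issues only two page requests:

```php
<?php

declare(strict_types=1);

/** Hypothetical paginated supplier API; counts how many page requests were made. */
final class FakeSupplierApi
{
    public int $requests = 0;

    /** @return list<array{Sku: string}> five fake records per page, empty after $lastPage */
    public function page(int $page, int $lastPage = 100): array
    {
        $this->requests++;
        if ($page > $lastPage) {
            return [];
        }
        $records = [];
        for ($i = 1; $i <= 5; $i++) {
            $records[] = ['Sku' => "SKU-{$page}-{$i}"];
        }
        return $records;
    }
}

/** Generator in the spirit of loadStates(): a page is requested only when the consumer needs more records. */
function loadRecords(FakeSupplierApi $api): Generator
{
    for ($page = 1; ($records = $api->page($page)) !== []; $page++) {
        yield from $records;
    }
}

/** Takes at most $limit records and stops, mirroring --limit=N. */
function takeLimit(iterable $source, int $limit): array
{
    $taken = [];
    if ($limit <= 0) {
        return $taken;
    }
    foreach ($source as $record) {
        $taken[] = $record;
        if (count($taken) === $limit) {
            break; // the generator is never resumed, so later pages are never requested
        }
    }
    return $taken;
}

$api = new FakeSupplierApi();
$records = takeLimit(loadRecords($api), 10);
echo count($records) . " records, {$api->requests} page requests\n"; // 10 records, 2 page requests
```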

Quick start

composer require wizcodepl/lunar-supplier-normalizer

Optionally publish the config file (only needed if you want to register drivers in config/lunar-supplier-normalizer.php instead of using the package defaults):

php artisan vendor:publish --tag=lunar-supplier-normalizer-config

Skeleton generator for a new supplier:

php artisan make:supplier-normalizer Acme --pipeline=Apparel

This creates:

app/SupplierPipelines/Acme/
├── AcmeDriver.php                ← loadStates() with TODO
├── AcmePipelineResolver.php      ← resolve() with TODO
└── ApparelPipeline/
    └── ApparelPipeline.php       ← stages() with TODO

Fill in the three TODOs, drop your stage classes into ApparelPipeline/Stages/, and register the driver in config/lunar-supplier-normalizer.php:

'drivers' => [
    'acme' => App\SupplierPipelines\Acme\AcmeDriver::class,
],

Run:

php artisan normalizer:sync acme --limit=10

Output:

Processed: 10  •  Unrouted: 0  •  Duration: 4.21s

+----------------------------------------+---------+-----------+--------+----------+
| Stage                                  | Entered | Succeeded | Failed | Duration |
+----------------------------------------+---------+-----------+--------+----------+
| Acme\Stages\MapBaseData                | 10      | 10        | 0      | 0.012s   |
| Acme\Stages\NormalizeAttributes        | 10      | 10        | 0      | 0.034s   |
| Acme\Stages\ResolveColor               | 10      | 10        | 0      | 0.008s   |
| Acme\Stages\ValidateRequired           | 10      | 7         | 3      | 0.021s   |  ← 3 dropped
| Acme\Stages\PersistProduct             | 7       | 7         | 0      | 0.842s   |
| Acme\Stages\PersistDescription         | 7       | 7         | 0      | 0.045s   |
| Acme\Stages\SyncImages                 | 7       | 7         | 0      | 1.234s   |
+----------------------------------------+---------+-----------+--------+----------+

Seven products landed in the database. Three failed at ValidateRequired. StageFailureException::context is forwarded to onStageError, so your log (the sync_quality channel in the onStageError example below) records what was missing for each SKU.

A stage is a tiny class

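// State comes from this package; import it from the package namespace.
class MapBaseData
{
    public static function run(State $state): void
    {
        // The raw supplier record the driver put into this State
        $raw = $state->get('rawData');

        // Copy the fields every product needs into a supplier-agnostic shape
        $state->set('baseData', [
            'sku' => $raw['Sku'],
            'price' => $raw['Price'],
            'brand' => $raw['Brand'],
        ]);
    }
}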

An apparel pipeline typically chains three kinds of stages (map/resolve, validate, persist) in roughly this order:

ResolveProductType.php   → state.productType  (e.g. shirts, jackets, dresses)
MapBaseData.php          → state.baseData     (sku, price, brand, weight)
NormalizeAttributes.php  → state.attributes   (raw spec from supplier)
ResolveColor.php         → state.attributes.color           (canonical enum label)
ResolveSize.php          → state.attributes.size            (XS/S/M/L/XL/XXL)
ResolveMaterial.php      → state.attributes.material        (Cotton/Wool/Polyester)
ResolvePattern.php       → state.attributes.pattern         (Solid/Striped/Floral)
ValidateRequired.php     → state.fail() if required attrs missing
PersistProduct.php       → state.product, state.variant
PersistColorOption.php   → ProductOption attach + value sync
PersistSizeOption.php    → ProductOption attach + value sync
PersistDescription.php   → write description to DB
SyncImages.php           → download + attach media

State — the mutable bag

Every record gets its own State. Stages read and write through it:

$state = new State(['rawData' => $apiRecord]);

$state->get('rawData');                       // read
$state->get('attributes', []);                // read with a default
$state->set('baseData', ['sku' => 'ABC-1']);  // write
$state->has('product');                       // bool (true even when the stored value is null)
$state->fail('Missing brand', [               // throws StageFailureException carrying this context
    'sku' => $state->get('baseData')['sku'],
]);

The driver injects PipelineRun under the reserved key _run. Don't touch that from inside stages — the package owns it.
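The behaviour described above fits in a few lines: get() takes a default, has() returns true for a stored null, and fail() throws with context. The sketch below is a hypothetical stand-in for experimenting outside Laravel, not the package's source. Only the method names and the `context` property come from this README. The storage, the constructor, and the `never` return type are assumptions.

```php
<?php

declare(strict_types=1);

/** Stand-in for the package's StageFailureException: a message plus per-record diagnostics. */
final class StageFailureException extends RuntimeException
{
    public function __construct(string $message, public readonly array $context = [])
    {
        parent::__construct($message);
    }
}

/** Stand-in for the package's State: one mutable key/value bag per record. */
final class State
{
    public function __construct(private array $data = [])
    {
    }

    public function get(string $key, mixed $default = null): mixed
    {
        // array_key_exists rather than isset, so a stored null is returned as-is
        return array_key_exists($key, $this->data) ? $this->data[$key] : $default;
    }

    public function set(string $key, mixed $value): void
    {
        $this->data[$key] = $value;
    }

    public function has(string $key): bool
    {
        return array_key_exists($key, $this->data); // true even when the stored value is null
    }

    public function fail(string $message, array $context = []): never
    {
        throw new StageFailureException($message, $context);
    }
}

$state = new State(['rawData' => ['Sku' => 'ABC-1'], 'product' => null]);
var_dump($state->has('product'));              // bool(true)
var_dump($state->get('missing', 'fallback'));  // string(8) "fallback"
```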

Failing a stage

State::fail($message, $context = []) throws a StageFailureException. AbstractPipeline catches it (along with any other \Throwable), increments the stage's failed counter, calls onStageError($stage, $exception, $state), and stops the chain for that record.

class ResolveColor
{
    public static function run(State $state): void
    {
        $attrs = $state->get('attributes', []);
        $manufacturer = (string) ($attrs['manufacturer_color'] ?? '');

        // self::match() (not shown) maps a raw string to a Color enum case, or null
        $color = self::match($manufacturer);

        if ($color === null) {
            // fail() throws, so execution never reaches the lines below for this record
            $state->fail('Could not resolve color', [
                'manufacturer_color' => $manufacturer,
            ]);
        }

        $attrs['color'] = $color->label();
        $state->set('attributes', $attrs);
    }
}

Override onStageError in your concrete pipeline to wire it to your logger:

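use Illuminate\Support\Facades\Log;

class ApparelPipeline extends AbstractPipeline
{
    // stages() as generated by make:supplier-normalizer (omitted here)

    protected function onStageError(string $stage, \Throwable $e, State $state): void
    {
        // 'sync_quality' must be a channel defined in config/logging.php
        Log::channel('sync_quality')->error('acme | stage failed', [
            'stage' => $stage,
            'sku' => ($state->get('baseData') ?? [])['sku'] ?? null,
            'message' => $e->getMessage(),
            // Merge the stage's diagnostics (spreading string keys needs PHP 8.1+)
            ...($e instanceof StageFailureException ? $e->context : []),
        ]);
    }
}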

The ...$e->context spread merges per-stage diagnostics (manufacturer values, the unparseable raw string, the missing field list, etc.) into the log record.

Inconsistent data — Resolve* stages

The classic problem: two suppliers ship different strings for the same color.

Supplier A:  features.color.pl    = "Black"
Supplier B:  features.kolor       = "Czarny"
Supplier C:  manufacturer_color   = "Onyx Black"     (from spec table)
Supplier D:  product.name         = "Crew Tee Black" (only inside the name)

Solution: a ResolveColor stage that maps every variant to the canonical Color::Black enum case, with a substring fallback through a 50+ entry dictionary.

For attributes with a stable domain (size, material, pattern), use PHP enums with a label() method. For the long tail of color names, substring matching with locale-specific fallbacks works well.
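The sketch below shows one way to combine the two. It assumes a hypothetical `Color` backed enum with `label()` and a hypothetical `ColorMatcher::match()` that could back the `self::match()` call in ResolveColor. It tries an exact alias lookup first and then falls back to substring matching. The dictionary has only a handful of entries, where a real one would be much longer.

```php
<?php

declare(strict_types=1);

/** Hypothetical canonical color enum; the case value doubles as the label written to state. */
enum Color: string
{
    case Black = 'Black';
    case White = 'White';
    case Navy = 'Navy';

    public function label(): string
    {
        return $this->value;
    }
}

final class ColorMatcher
{
    /** Hypothetical alias dictionary: lowercase supplier spelling => canonical Color value. */
    private const ALIASES = [
        'black' => 'Black',
        'onyx black' => 'Black',  // manufacturer marketing name
        'czarny' => 'Black',      // Polish
        'white' => 'White',
        'biały' => 'White',       // Polish
        'navy' => 'Navy',
        'granatowy' => 'Navy',    // Polish
    ];

    public static function match(string $raw): ?Color
    {
        // strtolower folds ASCII letters only; use mb_strtolower if suppliers send non-ASCII capitals
        $needle = strtolower(trim($raw));
        if ($needle === '') {
            return null;
        }

        // 1. Exact alias hit: "BLACK", "Czarny".
        if (array_key_exists($needle, self::ALIASES)) {
            return Color::from(self::ALIASES[$needle]);
        }

        // 2. Substring fallback: "Crew Tee Black", "Czarny połysk".
        //    Longer aliases are tried first so a multi-word alias beats a single word it contains.
        $aliases = array_keys(self::ALIASES);
        usort($aliases, fn (string $a, string $b): int => strlen($b) <=> strlen($a));
        foreach ($aliases as $alias) {
            if (str_contains($needle, $alias)) {
                return Color::from(self::ALIASES[$alias]);
            }
        }

        return null;
    }
}

echo ColorMatcher::match('Crew Tee Black')?->label() ?? 'unresolved', "\n"; // Black
```

Plain substring matching can produce false positives when a short alias appears inside an unrelated word. Trying longer aliases first reduces this but does not remove it. A stricter variant would match on word boundaries.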

Validating required data

A ValidateRequired stage typically reads state.productType, checks which attributes that ProductType requires, and calls $state->fail(...) with the missing field list when anything is absent. The pipeline aborts for that record; the driver moves to the next one.
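The following is a minimal sketch of that check. It assumes `productType` holds a handle string and that a hard-coded map lists the required attribute keys per type. Both are hypothetical; a real setup would more likely derive the map from your ProductType configuration. The function only computes the missing list. The commented lines show how a ValidateRequired stage might pass it to `$state->fail()`.

```php
<?php

declare(strict_types=1);

/** Hypothetical map: product type handle => attribute keys that type requires. */
const REQUIRED_ATTRIBUTES = [
    'shirts' => ['color', 'size', 'material'],
    'jackets' => ['color', 'size', 'material'],
    'dresses' => ['color', 'size', 'material', 'pattern'],
];

/**
 * Returns the required attribute keys that are absent or empty for $productType.
 * An unknown product type is reported as a single "productType:<handle>" entry.
 *
 * @return list<string>
 */
function missingAttributes(string $productType, array $attributes): array
{
    if (!array_key_exists($productType, REQUIRED_ATTRIBUTES)) {
        return ["productType:{$productType}"];
    }

    $missing = [];
    foreach (REQUIRED_ATTRIBUTES[$productType] as $key) {
        $value = $attributes[$key] ?? null;
        if ($value === null || $value === '' || $value === []) {
            $missing[] = $key;
        }
    }

    return $missing;
}

// Inside a ValidateRequired stage this might be used as:
//   $missing = missingAttributes($state->get('productType'), $state->get('attributes', []));
//   if ($missing !== []) {
//       $state->fail('Missing required attributes', ['missing' => $missing]);
//   }

echo implode(', ', missingAttributes('shirts', ['color' => 'Black', 'size' => 'L'])), "\n"; // material
```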

Per-stage invariant

Entered = Succeeded + Failed

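Every record that enters a stage ends up in exactly one bucket, so a stage with a non-zero Failed count is where records are lost. As a rule of thumb, failures on Resolve* stages point to unparseable input, failures on Persist* stages to a database constraint violation, and failures on SyncImages to an unreachable CDN. The table after each sync shows which stage it was. The context logged by onStageError tells you why.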
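The invariant can be checked mechanically, for example inside the e2e test further down. The sketch assumes per-stage stats shaped as `['entered' => …, 'succeeded' => …, 'failed' => …]`, ordered as the stages run. The e2e test reads `entered` and `failed` from `getStages()`, but the `succeeded` key name is an assumption. It also assumes a single linear pipeline, where a stage's Entered must equal the previous stage's Succeeded. `invariantViolations()` is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Returns human-readable violations of the per-stage invariants.
 * Assumes $stages is in run order and every record goes through one linear pipeline.
 *
 * @param array<string, array{entered: int, succeeded: int, failed: int}> $stages
 * @return list<string>
 */
function invariantViolations(array $stages): array
{
    $violations = [];
    $previous = null; // [stageName, stats] of the stage before the current one

    foreach ($stages as $stage => $stats) {
        // Entered = Succeeded + Failed: every record ends in exactly one bucket.
        if ($stats['entered'] !== $stats['succeeded'] + $stats['failed']) {
            $violations[] = "{$stage}: entered {$stats['entered']} != succeeded {$stats['succeeded']} + failed {$stats['failed']}";
        }
        // In a linear pipeline, only records that succeeded upstream reach the next stage.
        if ($previous !== null && $stats['entered'] !== $previous[1]['succeeded']) {
            $violations[] = "{$stage}: entered {$stats['entered']} but {$previous[0]} succeeded {$previous[1]['succeeded']}";
        }
        $previous = [$stage, $stats];
    }

    return $violations;
}

// Numbers from the sample run shown earlier.
$run = [
    'MapBaseData' => ['entered' => 10, 'succeeded' => 10, 'failed' => 0],
    'NormalizeAttributes' => ['entered' => 10, 'succeeded' => 10, 'failed' => 0],
    'ResolveColor' => ['entered' => 10, 'succeeded' => 10, 'failed' => 0],
    'ValidateRequired' => ['entered' => 10, 'succeeded' => 7, 'failed' => 3],
    'PersistProduct' => ['entered' => 7, 'succeeded' => 7, 'failed' => 0],
    'PersistDescription' => ['entered' => 7, 'succeeded' => 7, 'failed' => 0],
    'SyncImages' => ['entered' => 7, 'succeeded' => 7, 'failed' => 0],
];

$violations = invariantViolations($run);
echo $violations === [] ? "invariants hold\n" : implode("\n", $violations) . "\n"; // invariants hold
```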

How the engine works (in one paragraph)

AbstractPipeline::run() iterates stages() in order, wraps each in a try/catch (\Throwable), and on failure calls onStageError($stage, $e, $state) and stops the chain for that record. AbstractDriver::run(?int $limit) iterates loadStates() (a generator — --limit=10 actually stops the HTTP fetch after 10 records), routes each State through the resolver, runs the chosen pipeline, and returns a PipelineRun with stats.
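The control flow in that paragraph fits in a few dozen lines. This is a simplified, framework-free sketch written for illustration, not the package's AbstractPipeline or AbstractDriver source. `MiniPipeline` and `runSync()` are hypothetical names. Stages are callables over an `ArrayObject` that stands in for State, and the resolver is a closure that returns a pipeline or null. The stats use the entered/succeeded/failed buckets from the output table.

```php
<?php

declare(strict_types=1);

/** Simplified pipeline: runs stages in order and stops the chain for a record on the first Throwable. */
class MiniPipeline
{
    /** @param array<string, callable(ArrayObject): void> $stages stage name => stage callable, in run order */
    public function __construct(private array $stages) {}

    /** @param array<string, array{entered: int, succeeded: int, failed: int}> $stats updated in place */
    public function run(ArrayObject $state, array &$stats): void
    {
        foreach ($this->stages as $name => $stage) {
            $stats[$name] ??= ['entered' => 0, 'succeeded' => 0, 'failed' => 0];
            $stats[$name]['entered']++;
            try {
                $stage($state);
                $stats[$name]['succeeded']++;
            } catch (Throwable $e) {
                $stats[$name]['failed']++;
                $this->onStageError($name, $e, $state);
                return; // skip the remaining stages for THIS record only
            }
        }
    }

    /** Mirrors the onStageError hook; a no-op in this sketch. */
    protected function onStageError(string $stage, Throwable $e, ArrayObject $state): void
    {
    }
}

/**
 * Simplified driver loop: pulls records lazily, gives each a fresh state, routes it, and tallies results.
 * $resolve returns null to leave a record unrouted. $limit, if given, should be positive.
 */
function runSync(iterable $records, callable $resolve, ?int $limit = null): array
{
    $summary = ['processed' => 0, 'unrouted' => 0, 'stages' => []];

    foreach ($records as $raw) {
        $state = new ArrayObject(['rawData' => $raw]); // one fresh bag per record
        $pipeline = $resolve($state);

        if ($pipeline === null) {
            $summary['unrouted']++;
        } else {
            $pipeline->run($state, $summary['stages']);
            $summary['processed']++;
        }

        if ($limit !== null && $summary['processed'] + $summary['unrouted'] >= $limit) {
            break; // the generator is not resumed, so no further records are fetched
        }
    }

    return $summary;
}

// Usage: three stages, one record that fails validation, one the resolver cannot route.
$pipeline = new MiniPipeline([
    'MapBaseData' => function (ArrayObject $s): void {
        $s['baseData'] = ['sku' => $s['rawData']['Sku']];
    },
    'ValidateRequired' => function (ArrayObject $s): void {
        if ($s['rawData']['Color'] === '') {
            throw new RuntimeException('Missing color');
        }
    },
    'PersistProduct' => function (ArrayObject $s): void {
        $s['product'] = $s['baseData']['sku']; // stand-in for a database write
    },
]);

$records = (function (): Generator {
    yield ['Sku' => 'A1', 'Color' => 'Black', 'Type' => 'shirt'];
    yield ['Sku' => 'A2', 'Color' => '', 'Type' => 'shirt'];
    yield ['Sku' => 'A3', 'Color' => 'Navy', 'Type' => 'socks'];
    yield ['Sku' => 'A4', 'Color' => 'White', 'Type' => 'shirt'];
})();

$summary = runSync($records, fn (ArrayObject $s): ?MiniPipeline => $s['rawData']['Type'] === 'shirt' ? $pipeline : null);
printf("Processed: %d  Unrouted: %d\n", $summary['processed'], $summary['unrouted']); // Processed: 3  Unrouted: 1
```

In this sketch, "processed" counts every routed record, including ones that failed a stage. That matches the sample output, where Processed is 10 although 3 records were dropped.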

Commands

php artisan make:supplier-normalizer {Name} [--pipeline=Main] [--force]

Generator: scaffolds three files in app/SupplierPipelines/{Name}/.

php artisan normalizer:sync {driver} [--limit=N]

Run a sync. {driver} is the key from config/lunar-supplier-normalizer.php. Output: per-stage stats table + Processed/Unrouted/Duration headline.

End-to-end test

Mock your API client or hit the live one — the package doesn't get in the way.

use App\SupplierPipelines\Acme\AcmeDriver;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Lunar\Models\Product;
use PHPUnit\Framework\Attributes\Group;
use Tests\TestCase;

#[Group('e2e')]
class AcmeSupplierSyncTest extends TestCase
{
    use RefreshDatabase;

    public function test_sync_creates_products_from_real_api(): void
    {
        $run = app(AcmeDriver::class)->run(limit: 10);

        // Strict: 0 failures means every sample record made it through
        foreach ($run->getStages() as $stage => $stats) {
            $this->assertSame(0, $stats['failed'],
                "Stage {$stage} had {$stats['failed']} failures (entered={$stats['entered']})",
            );
        }

        $this->assertGreaterThan(0, Product::count());

        // Idempotency: a second run over the same records must not create duplicates
        $first = Product::count();
        app(AcmeDriver::class)->run(limit: 10);
        $this->assertSame($first, Product::count());
    }
}

Exclude this test from CI runs where network access or credentials are unavailable: vendor/bin/phpunit --exclude-group=e2e.

Requirements

  • PHP 8.2+
  • Laravel 11/12 (only required for SyncCommand + MakeSupplierNormalizerCommand + ServiceProvider)
  • Lunar 1.3+ (only if you persist products into Lunar — the package itself is Lunar-agnostic; persistence happens inside your own stages)

The core (State, AbstractPipeline, AbstractDriver, PipelineRun, StageFailureException) is plain PHP 8.2. Laravel is needed only for the optional artisan + service-provider layer.

License

MIT