olexin-pro/data-processing-pipeline

A robust, strictly-typed, and extensible data processing pipeline system for Laravel applications. Process data through a chain of isolated steps with built-in conflict resolution, priority handling, and optional execution history tracking.

pkg:composer/olexin-pro/data-processing-pipeline

v0.3.0 2025-11-11 06:03 UTC

This package is auto-updated.

Last update: 2026-01-11 06:26:15 UTC


README

✨ Features

  • 🔒 Strictly Typed - Full PHP 8.1+ type safety with enums and interfaces
  • 🔄 Immutable Payload - Original data never changes during processing
  • 🎯 Conflict Resolution - Built-in strategies: MERGE, OVERWRITE, SKIP, CUSTOM
  • 📊 Priority System - Control data precedence in merge operations
  • 📝 Execution History - Optional database tracking of pipeline runs
  • 🚀 Queue-Safe - Fully serializable for Laravel queues
  • 🧩 Extensible - Easy to add custom steps and conflict resolvers
  • 🧪 Well Tested - Comprehensive test coverage

📦 Installation

composer require olexin-pro/data-processing-pipeline

Publish Migrations

php artisan vendor:publish --provider="DataProcessingPipeline\PipelineServiceProvider" --tag=pipeline-migrations
php artisan migrate

This creates two tables:

  • pipeline_runs - Stores pipeline execution records
  • pipeline_steps - Stores individual step execution details

🚀 Quick Start

1. Create a Pipeline Step

namespace App\Pipelines\Steps;

use DataProcessingPipeline\Pipelines\Contracts\PipelineStepInterface;
use DataProcessingPipeline\Pipelines\Contracts\PipelineContextInterface;
use DataProcessingPipeline\Pipelines\Results\GenericPipelineResult;
use DataProcessingPipeline\Pipelines\Enums\ConflictPolicy;

class EmailFormatterStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): GenericPipelineResult
    {
        $email = $context->getContent('user.email', '');
        
        return new GenericPipelineResult(
            key: 'email',
            data: ['value' => strtolower(trim($email))],
            policy: ConflictPolicy::MERGE,
            priority: 10,
            provenance: self::class
        );
    }
}

<?php

namespace App\Pipelines\Steps;

use DataProcessingPipeline\Pipelines\Contracts\PipelineStepInterface;
use DataProcessingPipeline\Pipelines\Contracts\PipelineContextInterface;
use DataProcessingPipeline\Pipelines\Results\GenericPipelineResult;
use DataProcessingPipeline\Pipelines\Enums\ConflictPolicy;

class EmailValidatorStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): GenericPipelineResult
    {
        $emailData = $context->getResult('email')->getData();
        $email = $emailData['value'] ?? null;

        $isValid = filter_var($email, FILTER_VALIDATE_EMAIL) !== false;

        return new GenericPipelineResult(
            key: 'email',
            data: [
                'value' => $email,
                'status' => $isValid ? 'verified' : 'invalid',
            ],
            policy: ConflictPolicy::MERGE,
            priority: 20,
            provenance: self::class
        );
    }
}

2. Run the Pipeline

use DataProcessingPipeline\Pipelines\Context\PipelineContext;
use DataProcessingPipeline\Pipelines\Contracts\PipelineRunnerInterface;
use DataProcessingPipeline\Pipelines\History\PipelineHistoryRecorder;
use App\Pipelines\Steps\EmailFormatterStep;
use App\Pipelines\Steps\EmailValidatorStep;

$context = PipelineContext::make(['user' => ['email' => 'John@Example.COM']]);
$recorder = new PipelineHistoryRecorder('user-processing');

$runner = app(PipelineRunnerInterface::class);
$runner->setRecorder($recorder)
    ->addStep(new EmailFormatterStep())
    ->addStep(new EmailValidatorStep());

$result = $runner->run($context);

// Access results
$emailData = $result->getResult('email')->getData();
// ['value' => 'john@example.com', 'status' => 'verified']

// Build data
$built = $result->build();
// ['email' => ['value' => 'john@example.com', 'status' => 'verified']]

3. Queue Execution

use App\Pipelines\Steps\EmailFormatterStep;
use App\Pipelines\Steps\EmailValidatorStep;
use DataProcessingPipeline\Pipelines\Context\PipelineContext;
use DataProcessingPipeline\Jobs\ProcessPipelineJob;
use DataProcessingPipeline\Services\Notifiers\LogNotifier;

$payload = ['user' => ['email' => 'john@example.com']];

$steps = [
    EmailFormatterStep::class,
    EmailValidatorStep::class,
];


$context = PipelineContext::make($payload);

ProcessPipelineJob::dispatch(
    contextData: $context->toArray(),
    stepClasses: $steps,
    pipelineName: 'email-processing',
    recordHistory: false,
    notifierClass: LogNotifier::class
);
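
The job above receives class names and a plain array rather than live objects, presumably so that the queued payload stays easy to serialize. The following is a minimal sketch of that idea in plain PHP, independent of the package: UppercaseStep and rebuildSteps() are hypothetical names introduced for illustration, and the package's job may rebuild its steps differently.

```php
<?php

// Hypothetical step used only for this illustration; not part of the package.
final class UppercaseStep
{
    public function handle(array $payload): array
    {
        return ['name' => strtoupper($payload['name'] ?? '')];
    }
}

/**
 * Rebuild step objects from class-strings, as a queue worker might.
 *
 * @param  list<class-string> $stepClasses
 * @return list<object>
 */
function rebuildSteps(array $stepClasses): array
{
    return array_map(static fn (string $class): object => new $class(), $stepClasses);
}

// Only scalars and arrays go into the payload, so it survives encoding.
$jobPayload = json_encode([
    'contextData' => ['name' => 'john'],
    'stepClasses' => [UppercaseStep::class],
], JSON_THROW_ON_ERROR);

// On the worker side: decode, rebuild the steps, and run the first one.
$decoded = json_decode($jobPayload, true, 512, JSON_THROW_ON_ERROR);
$steps   = rebuildSteps($decoded['stepClasses']);
$output  = $steps[0]->handle($decoded['contextData']);
```

Passing class-strings only works when each step can be constructed without arguments; steps with dependencies would need to be resolved some other way.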

📚 Core Concepts

Pipeline Context

The context is a container with a read-only payload. It holds:

  • Payload (readonly) - Original input data
  • Results - Accumulated step outputs
  • Meta - Arbitrary metadata (errors, timestamps, etc.)

use DataProcessingPipeline\Pipelines\Context\PipelineContext;
use DataProcessingPipeline\Pipelines\Contracts\ConflictResolverInterface;

$context = PipelineContext::make(
    payload: ['user' => ['id' => 1, 'email' => 'test@example.com']],
    meta: ['request_id' => 'abc-123'],
    conflictResolver: app()->make(ConflictResolverInterface::class)
);
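
Calls such as getContent('user.email') in the Quick Start suggest that payload values are addressed with dot-separated paths. Here is a rough sketch of how such a lookup can work in plain PHP. dotGet() is a hypothetical helper written for this example; the package's own lookup may behave differently, for instance with wildcards or objects.

```php
<?php

/**
 * Look up a nested value using a dot-separated path.
 * Hypothetical helper for illustration; not the package's code.
 */
function dotGet(array $data, string $path, mixed $default = null): mixed
{
    $current = $data;

    foreach (explode('.', $path) as $segment) {
        // Stop as soon as a segment is missing or we hit a non-array value.
        if (!is_array($current) || !array_key_exists($segment, $current)) {
            return $default;
        }

        $current = $current[$segment];
    }

    return $current;
}

$payload = ['user' => ['id' => 1, 'email' => 'test@example.com']];

$email   = dotGet($payload, 'user.email');        // 'test@example.com'
$missing = dotGet($payload, 'user.phone', 'n/a'); // 'n/a'
```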

Pipeline Results

Every step returns a PipelineResultInterface:

interface PipelineResultInterface
{
    public function getKey(): string;
    public function getData(): int|float|array|bool|string|null;
    public function getPolicy(): ConflictPolicy;
    public function getPriority(): int;
    public function getProvenance(): string;
    public function getStatus(): ResultStatus;
    public function getMeta(): array;
}

Conflict Policies

When multiple steps write to the same key:

MERGE (default)

Recursively merges arrays, respecting priorities:

// Step 1
['name' => 'John', 'age' => 30]

// Step 2
['age' => 31, 'city' => 'NYC']

// Result
['name' => 'John', 'age' => 31, 'city' => 'NYC']
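
For string-keyed arrays, the merge shown above matches what PHP's built-in array_replace_recursive() produces. Whether the package uses this function internally is an assumption; the sketch below only illustrates the semantics, with made-up data that adds a nested array to show the recursion.

```php
<?php

$stepOne = [
    'name'    => 'John',
    'age'     => 30,
    'address' => ['city' => 'Boston', 'zip' => '02101'],
];

$stepTwo = [
    'age'     => 31,
    'address' => ['city' => 'NYC'],
];

// Later values replace earlier ones key by key.
// Nested arrays are merged rather than replaced wholesale.
$merged = array_replace_recursive($stepOne, $stepTwo);

// $merged:
// ['name' => 'John', 'age' => 31, 'address' => ['city' => 'NYC', 'zip' => '02101']]
```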

OVERWRITE

Replaces previous value completely:

return new GenericPipelineResult(
    key: 'user',
    data: ['id' => 2, 'name' => 'Jane'],
    policy: ConflictPolicy::OVERWRITE
);

SKIP

Keeps the first result, ignores the rest:

return new GenericPipelineResult(
    key: 'config',
    data: ['version' => 2],
    policy: ConflictPolicy::SKIP
);
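
To compare the three built-in policies side by side, here is a small standalone sketch. The Policy enum and resolveWrite() are hypothetical stand-ins, not the package's ConflictPolicy or resolver, and the sketch assumes the policy is read from the incoming result.

```php
<?php

// Hypothetical stand-in for the package's ConflictPolicy enum.
enum Policy
{
    case MERGE;
    case OVERWRITE;
    case SKIP;
}

/**
 * Resolve a second write to a key that already holds $existing.
 * Assumption: the policy of the incoming write decides the outcome.
 */
function resolveWrite(array $existing, array $incoming, Policy $policy): array
{
    return match ($policy) {
        Policy::MERGE     => array_replace_recursive($existing, $incoming),
        Policy::OVERWRITE => $incoming, // previous value is discarded
        Policy::SKIP      => $existing, // first value wins
    };
}

$first  = ['id' => 1, 'name' => 'John'];
$second = ['id' => 2];

$merged      = resolveWrite($first, $second, Policy::MERGE);     // ['id' => 2, 'name' => 'John']
$overwritten = resolveWrite($first, $second, Policy::OVERWRITE); // ['id' => 2]
$skipped     = resolveWrite($first, $second, Policy::SKIP);      // ['id' => 1, 'name' => 'John']
```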

CUSTOM

Define a custom resolver:

use DataProcessingPipeline\Pipelines\Contracts\ConflictResolverInterface;
use DataProcessingPipeline\Pipelines\Contracts\PipelineResultInterface;
use DataProcessingPipeline\Pipelines\Contracts\PipelineContextInterface;

class PriorityConflictResolver implements ConflictResolverInterface
{
    public function resolve(
        PipelineResultInterface $existing,
        PipelineResultInterface $incoming,
        PipelineContextInterface $context
    ): PipelineResultInterface {
        return $incoming->getPriority() > $existing->getPriority()
            ? $incoming
            : $existing;
    }
}

return new GenericPipelineResult(
    key: 'user',
    data: ['role' => 'admin'],
    policy: ConflictPolicy::CUSTOM,
    meta: ['resolver' => PriorityConflictResolver::class]
);
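
One detail of the resolver above is easy to miss: the comparison is strict (>), so on equal priorities the existing result is kept. The standalone sketch below reproduces that rule on plain arrays; pickByPriority() and the array shape are hypothetical and exist only for this illustration.

```php
<?php

/**
 * Pick the entry with the higher priority. On a tie the existing entry
 * stays, because the comparison is strict (>).
 *
 * @param array{priority: int, data: array} $existing
 * @param array{priority: int, data: array} $incoming
 */
function pickByPriority(array $existing, array $incoming): array
{
    return $incoming['priority'] > $existing['priority'] ? $incoming : $existing;
}

$viewer = ['priority' => 10, 'data' => ['role' => 'viewer']];
$admin  = ['priority' => 20, 'data' => ['role' => 'admin']];
$editor = ['priority' => 10, 'data' => ['role' => 'editor']];

$higherWins = pickByPriority($viewer, $admin);  // the admin entry
$tieKeeps   = pickByPriority($viewer, $editor); // the viewer entry (tie)
```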

🔧 Advanced Usage

Priority System

Higher-priority results override lower-priority ones in merges:

// Low priority
new GenericPipelineResult(
    key: 'settings',
    data: ['theme' => 'light'],
    priority: 5
);

// High priority
new GenericPipelineResult(
    key: 'settings',
    data: ['theme' => 'dark'],
    priority: 20
);

// Result: theme = 'dark'
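
A priority-aware merge can be sketched as follows. This is an illustration in plain PHP under stated assumptions, not the package's merge code: mergeWithPriority() is a hypothetical name, and letting the incoming data win on equal priority is an assumed tie-break.

```php
<?php

function mergeWithPriority(
    array $existing,
    int $existingPriority,
    array $incoming,
    int $incomingPriority
): array {
    // array_replace_recursive() lets its LAST argument win, so the
    // higher-priority data is passed last. Assumption: on equal
    // priority the incoming data wins.
    return $incomingPriority >= $existingPriority
        ? array_replace_recursive($existing, $incoming)
        : array_replace_recursive($incoming, $existing);
}

$low  = ['theme' => 'light', 'font' => 'serif'];
$high = ['theme' => 'dark'];

// The outcome is the same whichever result arrives first.
$highSecond = mergeWithPriority($low, 5, $high, 20);
$highFirst  = mergeWithPriority($high, 20, $low, 5);
```

In both calls the theme ends up as 'dark' while the lower-priority 'font' value is preserved, because only conflicting keys are decided by priority.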

Execution History

Enable automatic logging:

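use DataProcessingPipeline\Pipelines\Contracts\PipelineRunnerInterface;
use DataProcessingPipeline\Pipelines\History\PipelineHistoryRecorder;
use DataProcessingPipeline\Pipelines\Runner\PipelineRunner;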

$recorder = new PipelineHistoryRecorder('user-processing');

$runner = new PipelineRunner(
    steps: [
        new EmailFormatterStep(),
        new EmailValidatorStep(),
    ],
    recorder: $recorder
);

// or
$recorder = app()->makeWith(
    PipelineHistoryRecorderInterface::class, 
    [
        'pipelineName' => $pipelineName, 
        'enabled' => true
    ]
);

$runner = app(PipelineRunnerInterface::class)
    ->setRecorder($recorder)
    ->addStep(new EmailFormatterStep())
    ->addStep(new EmailValidatorStep());

$runner->run($context);

Error Handling

The pipeline continues when a step throws. Failures are collected under the errors key of the result's meta:

class RiskyStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): PipelineResultInterface
    {
        if ($someCondition) {
            throw new \RuntimeException('Processing failed');
        }
        
        return new GenericPipelineResult(
            key: 'result',
            data: ['success' => true]
        );
    }
}

$result = $runner->run($context);

if (!empty($result->meta['errors'])) {
    foreach ($result->meta['errors'] as $error) {
        Log::error('Pipeline step failed', [
            'step' => $error['step'],
            'message' => $error['message']
        ]);
    }
}
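
The continue-on-failure behaviour can be pictured as a loop that catches each step's exception and records it. The sketch below is a standalone illustration: runAll() is a hypothetical function, steps are modelled as plain callables, and only the 'step' and 'message' keys are taken from the example above.

```php
<?php

/**
 * Run each step in order; a throwing step is recorded and the loop moves on.
 *
 * @param  array<string, callable(array): array> $steps step name => callable
 * @return array{results: array, errors: list<array{step: string, message: string}>}
 */
function runAll(array $steps, array $payload): array
{
    $results = [];
    $errors  = [];

    foreach ($steps as $name => $step) {
        try {
            $results[$name] = $step($payload);
        } catch (\Throwable $e) {
            // Same 'step' / 'message' keys as read in the logging example.
            $errors[] = ['step' => $name, 'message' => $e->getMessage()];
        }
    }

    return ['results' => $results, 'errors' => $errors];
}

$outcome = runAll([
    'risky' => static function (array $payload): array {
        throw new \RuntimeException('Processing failed');
    },
    'safe' => static fn (array $payload): array => ['success' => true],
], ['id' => 1]);
```

Here the 'safe' step still runs and produces its result even though 'risky' threw before it.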

Dynamic Steps

Add steps at runtime:

$recorder = new PipelineHistoryRecorder('user-processing');
$runner = app(PipelineRunnerInterface::class);

$runner->setRecorder($recorder)
       ->addStep(new EmailFormatterStep())
       ->addStep(new EmailValidatorStep())
       ->addStep(new EmailDomainCheckerStep());


$runner->run($context);

Conditional Steps

A step can skip itself by returning null:

class ConditionalStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): ?PipelineResultInterface
    {
        if (!$context->getContent('process_email')) {
            return null;
        }

        // normal logic...
    }
}
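
On the runner side, honouring a null return amounts to ignoring that step's output. A minimal sketch, assuming steps are modelled as plain callables; collectResults() is a hypothetical name and this is not the package's runner.

```php
<?php

/**
 * Collect step outputs, ignoring steps that return null.
 *
 * @param list<callable(array): ?array> $steps
 */
function collectResults(array $steps, array $payload): array
{
    $results = [];

    foreach ($steps as $step) {
        $result = $step($payload);

        if ($result === null) {
            continue; // the step chose to skip itself
        }

        $results[] = $result;
    }

    return $results;
}

// Skips unless the payload carries a truthy 'process_email' flag.
$emailStep = static fn (array $p): ?array =>
    empty($p['process_email']) ? null : ['email' => strtolower($p['email'])];

$skipped = collectResults([$emailStep], ['email' => 'A@B.C']);
$ran     = collectResults([$emailStep], ['email' => 'A@B.C', 'process_email' => true]);
```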

🎯 Real-World Example

<?php

// Domain: E-commerce Order Processing

use DataProcessingPipeline\Pipelines\Contracts\{
    PipelineContextInterface,
    PipelineResultInterface,
    PipelineStepInterface
};
use DataProcessingPipeline\Pipelines\Results\GenericPipelineResult;
use DataProcessingPipeline\Pipelines\Enums\ConflictPolicy;
use DataProcessingPipeline\Pipelines\Context\PipelineContext;
use DataProcessingPipeline\Pipelines\Runner\PipelineRunner;

/**
 * Validate order
 */
final class ValidateOrderStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): PipelineResultInterface
    {
        $order = $context->getContent('order', []);
        $errors = [];

        if (empty($order['items'])) {
            $errors[] = 'Order must contain at least one item.';
        }

        if (!isset($order['id'])) {
            $errors[] = 'Order ID is missing.';
        }

        return new GenericPipelineResult(
            key: 'validation',
            data: [
                'valid'  => empty($errors),
                'errors' => $errors,
            ],
            priority: 100
        );
    }
}

/**
 * Calculate per-product totals
 */
final class CalculateProductsStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): PipelineResultInterface
    {
        $items = $context->getContent('order.items', []);

        $products = collect($items)->map(fn ($p) => [
            'name'  => $p['name'],
            'price' => (float) $p['price'],
            'qty'   => (int) $p['qty'],
            'total' => (float) $p['price'] * (int) $p['qty'],
        ])->toArray();

        return new GenericPipelineResult(
            key: 'products',
            data: $products,
            policy: ConflictPolicy::MERGE
        );
    }
}

/**
 * Calculate totals (subtotal, tax, total)
 */
final class CalculateTotalsStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): PipelineResultInterface
    {
        $products = $context->getResult('products')?->getData() ?? [];

        $subtotal = array_sum(array_column($products, 'total'));
        $tax = round($subtotal * 0.1, 2);
        $total = $subtotal + $tax;

        return new GenericPipelineResult(
            key: 'totals',
            data: compact('subtotal', 'tax', 'total')
        );
    }
}

/**
 * Apply coupon discounts
 */
final class ApplyDiscountStep implements PipelineStepInterface
{
    public function handle(PipelineContextInterface $context): PipelineResultInterface
    {
        $totals = $context->getResult('totals')?->getData() ?? [];
        $coupon = $context->getContent('coupon_code');

        $discountRate = match ($coupon) {
            'SAVE10' => 0.10,
            'SAVE20' => 0.20,
            default  => 0.0,
        };

        $discount = round(($totals['total'] ?? 0) * $discountRate, 2);

        return new GenericPipelineResult(
            key: 'totals',
            data: [
                'discount' => $discount,
                'total'    => ($totals['total'] ?? 0) - $discount,
            ],
            policy: ConflictPolicy::MERGE
        );
    }
}

// ----------------------------------------------------
// ▶ Example usage
// ----------------------------------------------------

$context = new PipelineContext([
    'order' => [
        'id' => 123,
        'items' => [
            ['name' => 'Product A', 'price' => 100, 'qty' => 1],
            ['name' => 'Product B', 'price' => 50,  'qty' => 8],
        ],
    ],
    'coupon_code' => 'SAVE10',
]);

$runner = new PipelineRunner([
    new CalculateProductsStep(),
    new CalculateTotalsStep(),
    new ValidateOrderStep(),
    new ApplyDiscountStep(),
]);

$result = $runner->run($context);

// ----------------------------------------------------
// ▶ Results
// ----------------------------------------------------

$totals = $result->getResult('totals')->getData();
/*
[
    'subtotal' => 500,        // 100*1 + 50*8
    'tax' => 50,
    'discount' => 55,
    'total' => 495
]
*/

$data = $result->build();
/*
[
    'products' => [
        ['name' => 'Product A', 'price' => 100.0, 'qty' => 1, 'total' => 100.0],
        ['name' => 'Product B', 'price' => 50.0,  'qty' => 8, 'total' => 400.0],
    ],
    'totals' => [
        'subtotal' => 500,
        'tax' => 50,
        'discount' => 55,
        'total' => 495,
    ],
    'validation' => [
        'valid' => true,
        'errors' => [],
    ],
]
*/
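
The totals above can be checked by hand. The following standalone script repeats the arithmetic from CalculateTotalsStep and ApplyDiscountStep without the pipeline; the variable names are chosen for this example only.

```php
<?php

$items = [
    ['price' => 100, 'qty' => 1],
    ['price' => 50,  'qty' => 8],
];

// Subtotal: 100 * 1 + 50 * 8 = 500
$subtotal = 0.0;
foreach ($items as $item) {
    $subtotal += (float) $item['price'] * (int) $item['qty'];
}

$tax      = round($subtotal * 0.1, 2); // 10% tax: 50
$gross    = $subtotal + $tax;          // total before the coupon: 550
$discount = round($gross * 0.10, 2);   // SAVE10 is 10% of 550: 55
$total    = $gross - $discount;        // 550 - 55 = 495
```

Note that the discount is computed on the tax-inclusive total (550), not on the subtotal, which is why it comes to 55 rather than 50.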

🛠 Step Generator

Create pipeline steps with a single command. By default, classes are placed under App\Pipeline\Steps and saved to app/Pipeline/Steps.

php artisan make:pipeline-step EmailFormatterStep
# => app/Pipeline/Steps/EmailFormatterStep.php
# => namespace App\Pipeline\Steps;

Nested directories

php artisan make:pipeline-step Order/TotalCalculationStep
# => app/Pipeline/Steps/Order/TotalCalculationStep.php
# => namespace App\Pipeline\Steps\Order;
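
The mapping from a nested name to a namespace and file path can be sketched like this. resolveStepLocation() is a hypothetical helper written for this example; it uses the default base namespace and directory stated above, and the generator's real logic may differ.

```php
<?php

/**
 * Map a generator argument such as "Order/TotalCalculationStep" to a
 * namespace and a relative file path. Hypothetical illustration only.
 *
 * @return array{namespace: string, path: string}
 */
function resolveStepLocation(string $name): array
{
    // Accept both "/" and "\" as separators, then split off the class name.
    $segments = explode('/', trim(str_replace('\\', '/', $name), '/'));
    $class    = array_pop($segments);

    $namespace = implode('\\', ['App', 'Pipeline', 'Steps', ...$segments]);
    $path      = implode('/', ['app', 'Pipeline', 'Steps', ...$segments, $class . '.php']);

    return ['namespace' => $namespace, 'path' => $path];
}

$location = resolveStepLocation('Order/TotalCalculationStep');
// namespace: App\Pipeline\Steps\Order
// path:      app/Pipeline/Steps/Order/TotalCalculationStep.php
```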

Options

  • --policy= - conflict policy (MERGE|OVERWRITE|SKIP|CUSTOM). Default: MERGE.
  • --priority= - result priority (int). Default: 10.
  • --key= - result key (defaults to the snake_case of the class name without the Step suffix, e.g., EmailFormatterStep → email_formatter).
  • --force - overwrite the file if it already exists.
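
The default for --key described above can be approximated in a few lines. defaultResultKey() is a hypothetical helper for illustration; the generator's exact rules (for acronyms or digits, say) are not documented here and may differ.

```php
<?php

/**
 * Derive a default result key from a step class name:
 * drop a trailing "Step", then convert StudlyCase to snake_case.
 * Hypothetical helper; not the generator's actual code.
 */
function defaultResultKey(string $className): string
{
    $base = preg_replace('/Step$/', '', $className);

    // Insert "_" before every uppercase letter that is not at the start.
    return strtolower(preg_replace('/(?<!^)[A-Z]/', '_$0', $base));
}

$key = defaultResultKey('EmailFormatterStep'); // 'email_formatter'
```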

Examples:

php artisan make:pipeline-step EmailValidatorStep --policy=MERGE --priority=20
php artisan make:pipeline-step "Order/Discount/ApplyCouponStep" --key=coupon --policy=OVERWRITE

Customizing the stub

php artisan vendor:publish --provider="DataProcessingPipeline\PipelineServiceProvider" --tag=pipeline-stubs
# then edit stubs/pipeline.step.stub

🧪 Testing

composer run test

📖 API Reference

PipelineContext

use DataProcessingPipeline\Pipelines\Context\PipelineContext;
use DataProcessingPipeline\Pipelines\Contracts\ConflictResolverInterface;

new PipelineContext(
    array $payload,
    array $results = [],
    array $meta = [],
    ?ConflictResolverInterface $conflictResolver = null
);
// or
PipelineContext::make(
    array $payload,
    array $results = [],
    array $meta = [],
    ?ConflictResolverInterface $conflictResolver = null
);

$context->addResult(PipelineResultInterface $result): void
$context->getResult(string $key): ?PipelineResultInterface
$context->getContent(string $key, mixed $default = null): mixed
$context->hasResult(string $key): bool
$context->toArray(): array
$context->build(): array

GenericPipelineResult

use DataProcessingPipeline\Pipelines\Enums\ConflictPolicy;
use DataProcessingPipeline\Pipelines\Enums\ResultStatus;

new GenericPipelineResult(
    string $key,
    int|float|array|bool|string|null $data,
    ConflictPolicy $policy = ConflictPolicy::MERGE,
    int $priority = 10,
    string $provenance = '',
    ResultStatus $status = ResultStatus::OK,
    array $meta = []
);

GenericPipelineResult::make(
    string $key,
    int|float|array|bool|string|null $data,
    ConflictPolicy $policy = ConflictPolicy::MERGE,
    int $priority = 10,
    string $provenance = '',
    ResultStatus $status = ResultStatus::OK,
    array $meta = []
): self

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

This package is open-sourced software licensed under the MIT license.

๐Ÿ™ Credits

๐Ÿ“ง Support

For support, please open an issue on GitHub

Made with โค๏ธ for the Laravel community