georgeff/pipeline

Immutable pipeline builder

github.com/MikeGeorgeff/pipeline

pkg:composer/georgeff/pipeline

1.0.0 2026-02-06 11:12 UTC

This package is auto-updated.

Last update: 2026-03-07 20:10:22 UTC


An immutable pipeline library for PHP 8.2+.

Installation

composer require georgeff/pipeline

Basic Usage

use Georgeff\Pipeline\Stage;
use Georgeff\Pipeline\Pipeline;

$pipeline = (new Pipeline())
    ->pipe(Stage::from(fn($payload) => $payload * 2))
    ->pipe(Stage::from(fn($payload) => $payload + 1));

$result = $pipeline->process(5); // 11
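
Conceptually, process() feeds the payload through each stage in the order it was piped. The following is a dependency-free sketch of that idea using plain closures and array_reduce. It does not use the library, and the $stages array is a hypothetical stand-in for a pipeline, not the package's internals.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a pipeline: an ordered list of callables.
$stages = [
    fn (int $payload): int => $payload * 2,
    fn (int $payload): int => $payload + 1,
];

// Feed the output of each stage into the next one, left to right.
$result = array_reduce(
    $stages,
    fn (mixed $carry, callable $stage): mixed => $stage($carry),
    5
);

echo $result, PHP_EOL; // 11
```

The order matters: swapping the two closures would give (5 + 1) * 2 = 12 instead.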

Stages

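All stages must implement StageInterface, which requires an __invoke() method that receives the payload and returns the result:

use Georgeff\Pipeline\Pipeline;
use Georgeff\Pipeline\StageInterface;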

final class MultiplyStage implements StageInterface
{
    public function __construct(private readonly int $factor)
    {
    }

    public function __invoke(mixed $payload): mixed
    {
        return $payload * $this->factor;
    }
}

$pipeline = (new Pipeline())
    ->pipe(new MultiplyStage(2))
    ->pipe(new MultiplyStage(3));

$result = $pipeline->process(5); // 30

For simple transformations, use the Stage helper:

$pipeline = (new Pipeline())
    ->pipe(Stage::from(fn($payload) => $payload * 2));

Immutability

The pipeline is immutable. Each call to pipe() returns a new instance:

$base = (new Pipeline())
    ->pipe(new ValidateOrder());

$adminPipeline = $base->pipe(new AdminCheck());
$userPipeline = $base->pipe(new UserCheck());

// $base remains unchanged
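
One common way to get this behaviour is for pipe() to build a new object from the existing stage list plus the new stage. The sketch below illustrates that pattern with a hypothetical SketchPipeline class. It is an assumption about how such a design can work, not the source of Georgeff\Pipeline\Pipeline.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in, NOT the library's Pipeline class.
final class SketchPipeline
{
    /** @param list<callable> $stages */
    public function __construct(private readonly array $stages = [])
    {
    }

    // Return a new instance with the stage appended; $this is left untouched.
    public function pipe(callable $stage): self
    {
        return new self([...$this->stages, $stage]);
    }

    // Run the payload through every stage in order.
    public function process(mixed $payload): mixed
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}

$base = (new SketchPipeline())->pipe(fn (int $n): int => $n + 1);
$doubled = $base->pipe(fn (int $n): int => $n * 2);
$negated = $base->pipe(fn (int $n): int => -$n);

echo $base->process(5), PHP_EOL;    // 6
echo $doubled->process(5), PHP_EOL; // 12
echo $negated->process(5), PHP_EOL; // -6
```

Because each branch holds its own stage list, a shared base can be extended in several directions without one branch affecting another.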

Factory

Use PipelineFactory::build() to create a pipeline from an initial list of stages:

use Georgeff\Pipeline\PipelineFactory;

$pipeline = PipelineFactory::build(
    new ValidateOrder(),
    new ProcessPayment(),
    new SendConfirmation()
);

Conditional Branching

Execute different stages based on a condition:

use Georgeff\Pipeline\ConditionalStage;
use Georgeff\Pipeline\Pipeline;

$pipeline = (new Pipeline())
    ->pipe(new ConditionalStage(
        fn($order) => $order->isPriority(),
        new PriorityProcessing(),
        new StandardProcessing()
    ));

The false branch is optional. If omitted, the payload passes through unchanged:

$pipeline = (new Pipeline())
    ->pipe(new ConditionalStage(
        fn($order) => $order->needsReview(),
        new FlagForReview()
    ));
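
The branching rule described above can be sketched without the library as a small closure factory. The conditional() helper is hypothetical and only mirrors the documented behaviour: run the true branch when the condition holds, otherwise run the false branch if one was given, otherwise return the payload unchanged.

```php
<?php
declare(strict_types=1);

// Hypothetical helper, not part of georgeff/pipeline.
function conditional(callable $condition, callable $ifTrue, ?callable $ifFalse = null): callable
{
    return function (mixed $payload) use ($condition, $ifTrue, $ifFalse): mixed {
        if ($condition($payload)) {
            return $ifTrue($payload);
        }

        // No false branch: pass the payload through unchanged.
        return $ifFalse !== null ? $ifFalse($payload) : $payload;
    };
}

// Cap totals above 100; no false branch is supplied.
$capAt100 = conditional(
    fn (int $total): bool => $total > 100,
    fn (int $total): int => 100
);

echo $capAt100(250), PHP_EOL; // 100
echo $capAt100(40), PHP_EOL;  // 40
```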

Switch Branching

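Select a stage by key. The first argument resolves a key from the payload, the second maps keys to stages, and the optional third argument is the default stage:

use Georgeff\Pipeline\Pipeline;
use Georgeff\Pipeline\SwitchStage;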

$pipeline = (new Pipeline())
    ->pipe(new SwitchStage(
        fn($order) => $order->getType(),
        [
            'digital' => new ProcessDigitalOrder(),
            'physical' => new ProcessPhysicalOrder(),
            'subscription' => new ProcessSubscription(),
        ],
        new ProcessUnknownOrder() // optional default
    ));
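
The lookup can be sketched as a keyed array with a fallback. The switchOn() helper below is hypothetical and independent of the library. In particular, returning the payload unchanged when no case matches and no default is given is an assumption, since the README does not say what SwitchStage does in that situation.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of georgeff/pipeline.
 *
 * @param array<string, callable> $cases
 */
function switchOn(callable $keyResolver, array $cases, ?callable $default = null): callable
{
    return function (mixed $payload) use ($keyResolver, $cases, $default): mixed {
        // Resolve the key, then pick the matching stage or fall back to the default.
        $stage = $cases[$keyResolver($payload)] ?? $default;

        // Assumption: with no match and no default, return the payload unchanged.
        return $stage !== null ? $stage($payload) : $payload;
    };
}

$route = switchOn(
    fn (array $order): string => $order['type'],
    [
        'digital' => fn (array $order): array => $order + ['handler' => 'digital'],
        'physical' => fn (array $order): array => $order + ['handler' => 'physical'],
    ],
    fn (array $order): array => $order + ['handler' => 'unknown']
);

echo $route(['type' => 'digital'])['handler'], PHP_EOL; // digital
echo $route(['type' => 'gift'])['handler'], PHP_EOL;    // unknown
```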

Early Termination

Implement StoppableInterface on a stage to halt pipeline execution. When the stage's shouldStop() returns true, the pipeline skips the remaining stages:

use Georgeff\Pipeline\StageInterface;
use Georgeff\Pipeline\StoppableInterface;

final class ValidationStage implements StageInterface, StoppableInterface
{
    public function __invoke(mixed $payload): mixed
    {
        $payload->validate();
        return $payload;
    }

    public function shouldStop(mixed $payload): bool
    {
        return $payload->hasErrors();
    }
}
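
To make the control flow concrete, here is a self-contained sketch of a runner that honours a stop check. SketchStage, SketchStoppable, and runStages() are hypothetical stand-ins for the library's types. The sketch assumes the check runs after the stage has been invoked and receives that stage's output, which matches the example above but is not confirmed by the README.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; the real interfaces live in Georgeff\Pipeline.
interface SketchStage
{
    public function __invoke(mixed $payload): mixed;
}

interface SketchStoppable
{
    public function shouldStop(mixed $payload): bool;
}

// Passes the payload through and asks to stop when it is negative.
final class RejectNegative implements SketchStage, SketchStoppable
{
    public function __invoke(mixed $payload): mixed
    {
        return $payload;
    }

    public function shouldStop(mixed $payload): bool
    {
        return $payload < 0;
    }
}

final class DoubleIt implements SketchStage
{
    public function __invoke(mixed $payload): mixed
    {
        return $payload * 2;
    }
}

/** @param list<SketchStage> $stages */
function runStages(array $stages, mixed $payload): mixed
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);

        // Assumption: the stop check runs after the stage, on its output.
        if ($stage instanceof SketchStoppable && $stage->shouldStop($payload)) {
            break;
        }
    }

    return $payload;
}

echo runStages([new RejectNegative(), new DoubleIt()], 4), PHP_EOL;  // 8
echo runStages([new RejectNegative(), new DoubleIt()], -4), PHP_EOL; // -4
```

In the second call the runner breaks out after RejectNegative, so DoubleIt never sees the payload.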

Nested Pipelines

Pipelines implement StageInterface, so they can be nested:

$validation = (new Pipeline())
    ->pipe(new ValidateFields())
    ->pipe(new ValidateBusinessRules());

$processing = (new Pipeline())
    ->pipe(new CalculateTotals())
    ->pipe(new ApplyDiscounts());

$pipeline = (new Pipeline())
    ->pipe($validation)
    ->pipe($processing)
    ->pipe(new SaveOrder());
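
Nesting works because a pipeline has the same callable shape as a single stage. The sketch below shows the same idea with a hypothetical compose() helper over plain callables. It is an illustration of the pattern, not code from the package.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: combine several callables into one callable.
function compose(callable ...$stages): callable
{
    return function (mixed $payload) use ($stages): mixed {
        foreach ($stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    };
}

// Two small compositions...
$normalize = compose('trim', 'strtolower');
$decorate = compose(fn (string $s): string => "[$s]");

// ...used as ordinary stages inside a larger one.
$full = compose($normalize, $decorate);

echo $full('  Hello '), PHP_EOL; // [hello]
```

Since the composed value is itself a stage, groups such as validation or processing can be built and tested on their own, then combined.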

License

MIT