georgeff / pipeline
Immutable pipeline builder
1.0.0
2026-02-06 11:12 UTC
Requires
- php: ^8.2
Requires (Dev)
- phpstan/phpstan: ^2.0
- phpunit/phpunit: ^11.0
- squizlabs/php_codesniffer: ^3.11
README
An immutable pipeline library for PHP 8.2+.
Installation
composer require georgeff/pipeline
Basic Usage
use Georgeff\Pipeline\Stage;
use Georgeff\Pipeline\Pipeline;

$pipeline = (new Pipeline())
    ->pipe(Stage::from(fn($payload) => $payload * 2))
    ->pipe(Stage::from(fn($payload) => $payload + 1));

$result = $pipeline->process(5); // 11
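The result above follows from running the stages in the order they were piped: (5 * 2) + 1 = 11. As a rough illustration of that data flow, the sketch below reproduces the same arithmetic with a plain left fold over closures. It does not use the library, and it is not meant to describe how Pipeline is implemented internally.

```php
<?php

declare(strict_types=1);

// Plain closures standing in for the two stages above (no library code involved).
$stages = [
    fn (int $payload): int => $payload * 2,
    fn (int $payload): int => $payload + 1,
];

// Left fold: each stage receives the previous stage's return value.
$result = array_reduce(
    $stages,
    fn (mixed $carry, callable $stage): mixed => $stage($carry),
    5
);

echo $result, PHP_EOL; // 11: (5 * 2) + 1
```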
Stages
All stages must implement StageInterface:
use Georgeff\Pipeline\Pipeline;
use Georgeff\Pipeline\StageInterface;

final class MultiplyStage implements StageInterface
{
    public function __construct(private readonly int $factor)
    {
    }

    public function __invoke(mixed $payload): mixed
    {
        return $payload * $this->factor;
    }
}

$pipeline = (new Pipeline())
    ->pipe(new MultiplyStage(2))
    ->pipe(new MultiplyStage(3));

$result = $pipeline->process(5); // 30
For simple transformations, use the Stage helper:
$pipeline = (new Pipeline())
    ->pipe(Stage::from(fn($payload) => $payload * 2));
Immutability
The pipeline is immutable. Each call to pipe() returns a new instance:
$base = (new Pipeline())
    ->pipe(new ValidateOrder());

$adminPipeline = $base->pipe(new AdminCheck());
$userPipeline = $base->pipe(new UserCheck());

// $base remains unchanged
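To make the copy-on-pipe behaviour concrete, here is a minimal sketch of one way such an API can be built. SketchPipeline is a hypothetical stand-in written for this example, not the library's Pipeline class, and the library's internals may differ. The point is only that pipe() returns a fresh object and leaves the original's stage list alone.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in, NOT the library's Pipeline class.
final class SketchPipeline
{
    /** @param list<callable> $stages */
    public function __construct(private readonly array $stages = [])
    {
    }

    // Return a new instance with the stage appended; $this is never modified.
    public function pipe(callable $stage): self
    {
        return new self([...$this->stages, $stage]);
    }

    public function process(mixed $payload): mixed
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}

$base = (new SketchPipeline())->pipe(fn (int $n): int => $n * 2);
$plusOne = $base->pipe(fn (int $n): int => $n + 1);
$minusOne = $base->pipe(fn (int $n): int => $n - 1);

$baseResult = $base->process(5);      // 10, unaffected by the two branches
$plusResult = $plusOne->process(5);   // 11
$minusResult = $minusOne->process(5); // 9
```

Because the base is never modified, it can safely be shared as a common prefix for several specialised pipelines.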
Factory
Build pipelines with initial stages:
use Georgeff\Pipeline\PipelineFactory;

$pipeline = PipelineFactory::build(
    new ValidateOrder(),
    new ProcessPayment(),
    new SendConfirmation()
);
Conditional Branching
Execute different stages based on a condition:
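use Georgeff\Pipeline\ConditionalStage;
use Georgeff\Pipeline\Pipeline;

$pipeline = (new Pipeline())
    ->pipe(new ConditionalStage(
        fn($order) => $order->isPriority(), // condition
        new PriorityProcessing(),           // runs when the condition is true
        new StandardProcessing()            // runs when the condition is false
    ));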
The false branch is optional. If omitted, the payload passes through unchanged:
$pipeline = (new Pipeline())
    ->pipe(new ConditionalStage(
        fn($order) => $order->needsReview(),
        new FlagForReview()
    ));
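The branching rule described above (true branch, optional false branch, pass-through otherwise) can be sketched with plain callables. The branch() helper below is hypothetical and written only for illustration. It is not part of georgeff/pipeline, and ConditionalStage may be implemented differently.

```php
<?php

declare(strict_types=1);

// Hypothetical helper, not part of georgeff/pipeline: builds a branching callable.
function branch(callable $condition, callable $ifTrue, ?callable $ifFalse = null): callable
{
    return static function (mixed $payload) use ($condition, $ifTrue, $ifFalse): mixed {
        if ($condition($payload)) {
            return $ifTrue($payload);
        }

        // No false branch supplied: hand the payload back untouched.
        return $ifFalse === null ? $payload : $ifFalse($payload);
    };
}

$isLarge = fn (int $n): bool => $n > 100;
$capAtHundred = fn (int $n): int => 100;

$capOnly = branch($isLarge, $capAtHundred);
$capOrDouble = branch($isLarge, $capAtHundred, fn (int $n): int => $n * 2);

$a = $capOnly(250);   // 100, true branch
$b = $capOnly(7);     // 7, passed through unchanged
$c = $capOrDouble(7); // 14, false branch
```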
Switch Branching
Select a stage based on a key:
use Georgeff\Pipeline\Pipeline;
use Georgeff\Pipeline\SwitchStage;

$pipeline = (new Pipeline())
    ->pipe(new SwitchStage(
        fn($order) => $order->getType(), // returns the key to look up
        [
            'digital' => new ProcessDigitalOrder(),
            'physical' => new ProcessPhysicalOrder(),
            'subscription' => new ProcessSubscription(),
        ],
        new ProcessUnknownOrder() // optional default
    ));
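Key-based dispatch of this kind can be sketched as a lookup with a fallback. The switchOn() helper below is hypothetical and is not the library's SwitchStage. In particular, this README does not say what happens when no key matches and no default is given, so the sketch assumes the payload passes through unchanged; the library may behave differently.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of georgeff/pipeline.
 *
 * @param array<string, callable> $cases
 */
function switchOn(callable $selector, array $cases, ?callable $default = null): callable
{
    return static function (mixed $payload) use ($selector, $cases, $default): mixed {
        $key = $selector($payload);

        // Use the matching case, else the default.
        $stage = $cases[$key] ?? $default;

        // Assumption: with no match and no default, pass the payload through.
        return $stage === null ? $payload : $stage($payload);
    };
}

$route = switchOn(
    fn (array $order): string => $order['type'],
    [
        'digital' => fn (array $order): array => $order + ['handler' => 'digital'],
        'physical' => fn (array $order): array => $order + ['handler' => 'physical'],
    ],
    fn (array $order): array => $order + ['handler' => 'unknown']
);

$digital = $route(['type' => 'digital']);   // handled by the 'digital' case
$other = $route(['type' => 'gift-card']);   // falls back to the default
```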
Early Termination
Implement StoppableInterface to halt pipeline execution:
use Georgeff\Pipeline\StageInterface;
use Georgeff\Pipeline\StoppableInterface;

final class ValidationStage implements StageInterface, StoppableInterface
{
    public function __invoke(mixed $payload): mixed
    {
        $payload->validate();

        return $payload;
    }

    public function shouldStop(mixed $payload): bool
    {
        return $payload->hasErrors();
    }
}
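As an illustration of how a runner might honour a stoppable stage, here is a self-contained sketch. All names in it (SketchStage, SketchStoppable, runStages(), and the two stage classes) are hypothetical stand-ins. It assumes that shouldStop() is consulted after the stage has run, with that stage's output, and that the payload at that point is returned. This README does not spell out either detail, so treat both as assumptions.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins that mirror the interface names used above.
interface SketchStage
{
    public function __invoke(mixed $payload): mixed;
}

interface SketchStoppable
{
    public function shouldStop(mixed $payload): bool;
}

final class RejectNegative implements SketchStage, SketchStoppable
{
    public function __invoke(mixed $payload): mixed
    {
        return $payload; // nothing to transform; this stage only guards
    }

    public function shouldStop(mixed $payload): bool
    {
        return $payload < 0;
    }
}

final class DoubleValue implements SketchStage
{
    public function __invoke(mixed $payload): mixed
    {
        return $payload * 2;
    }
}

/** @param list<SketchStage> $stages */
function runStages(array $stages, mixed $payload): mixed
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);

        // Assumption: the stop check sees the stage's own output.
        if ($stage instanceof SketchStoppable && $stage->shouldStop($payload)) {
            break; // remaining stages are skipped
        }
    }

    return $payload;
}

$stopped = runStages([new RejectNegative(), new DoubleValue()], -3);  // -3, DoubleValue never runs
$completed = runStages([new RejectNegative(), new DoubleValue()], 4); // 8
```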
Nested Pipelines
Pipelines implement StageInterface, so they can be nested:
$validation = (new Pipeline())
    ->pipe(new ValidateFields())
    ->pipe(new ValidateBusinessRules());

$processing = (new Pipeline())
    ->pipe(new CalculateTotals())
    ->pipe(new ApplyDiscounts());

$pipeline = (new Pipeline())
    ->pipe($validation)
    ->pipe($processing)
    ->pipe(new SaveOrder());
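Nesting works whenever a pipeline can itself be called like a stage. The sketch below shows the idea with a hypothetical NestablePipeline class that is invokable. It is a stand-in written for this example, not the library's Pipeline, and it uses plain callables where the library uses StageInterface.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in, NOT the library's Pipeline class.
final class NestablePipeline
{
    /** @param list<callable> $stages */
    public function __construct(private readonly array $stages = [])
    {
    }

    public function pipe(callable $stage): self
    {
        return new self([...$this->stages, $stage]);
    }

    public function process(mixed $payload): mixed
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }

    // Being invokable is what lets one pipeline act as a stage of another.
    public function __invoke(mixed $payload): mixed
    {
        return $this->process($payload);
    }
}

$normalize = (new NestablePipeline())
    ->pipe(fn (string $s): string => trim($s))
    ->pipe(fn (string $s): string => strtolower($s));

$slugify = (new NestablePipeline())
    ->pipe($normalize) // a whole pipeline used as a single stage
    ->pipe(fn (string $s): string => str_replace(' ', '-', $s));

$slug = $slugify->process('  Hello Pipeline World '); // "hello-pipeline-world"
```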
License
MIT