dynamik-dev / zenpipe-php
A simple, fluent pipeline implementation for PHP.
Installs: 4
Dependents: 0
Suggesters: 0
Security: 0
Stars: 7
Watchers: 0
Forks: 1
Open Issues: 0
pkg:composer/dynamik-dev/zenpipe-php
Requires (Dev)
- laravel/pint: ^1.2
- nyholm/psr7: ^1.8
- pestphp/pest: ^3.7
- phpstan/phpstan: ^2.1
- psr/http-message: ^1.0 || ^2.0
- psr/http-server-middleware: ^1.0
Suggests
- psr/http-message: Required for PSR-15 middleware support (^1.0 || ^2.0)
- psr/http-server-middleware: Required for PSR-15 middleware support (^1.0)
This package is auto-updated.
Last update: 2025-12-02 17:57:00 UTC
README
ZenPipe is a simple and flexible PHP pipeline library that allows you to chain operations together to process, transform, or act on input.
$calculator = zenpipe() ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount ->pipe(fn($price, $next) => $next($price * 1.1)); // add 10% tax $calculator(100); // $88 (100 -> 80 -> 88)
You can also run the pipeline on demand:
zenpipe(100) ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount ->pipe(fn($price, $next) => $next($price * 1.1)) // add 10% tax ->process(); // 88
Sections
Requirements
- PHP 8.2 or higher
Installation
composer require dynamik-dev/zenpipe-php
Usage
Pipeline Operations
Pipeline operations are functions that take an input and return a processed value. Each operation can receive up to four parameters:
$input: The value being processed$next: A callback to pass the value to the next operation$return: (Optional) A callback to exit the pipeline early with a value$context: (Optional) A shared context object passed to all operations
Basic Operation Example
Let's build an input sanitization pipeline:
// String sanitization pipeline $sanitizer = zenpipe() ->pipe(fn($input, $next) => $next(trim($input))) ->pipe(fn($input, $next) => $next(preg_replace('/\s+/', ' ', $input))) ->pipe(fn($input, $next) => $next(strip_tags($input))) ->pipe(fn($input, $next) => $next(htmlspecialchars($input))) ->pipe(fn($input, $next) => $next(mb_convert_encoding( $input, 'UTF-8', mb_detect_encoding($input) ))); // Usage examples: $dirtyInput = " <script>alert('xss')</script> Hello World! ¥€$ "; $cleanInput = $sanitizer($dirtyInput); // Output: "Hello World! ¥€$" // Can also be run on demand: $result = zenpipe($dirtyInput) ->pipe(fn($input, $next) => $next(trim($input))) ->pipe(fn($input, $next) => $next(strip_tags($input))) ->process();
Operation with Early Return
Below is a practical example of a content moderation pipeline with early returns:
// Content moderation pipeline with early returns $moderationPipeline = zenpipe() ->pipe(function($content, $next, $return) { // Skip moderation for trusted authors if (Auth::user()->isTrusted()) { return $return([ 'status' => 'approved', 'content' => $content, 'skipped' => true ]); } return $next($content); }) ->pipe(function($content, $next, $return) { // Quick check for banned words if (containsBannedWords($content)) { return $return([ 'status' => 'rejected', 'reason' => 'prohibited_content' ]); } return $next($content); }) ->pipe(function($content, $next) { // Send to AI moderation for nuanced analysis return $next( AI::moderate($content) ); }); // Usage: $result = $moderationPipeline("Hello, world!"); // Trusted user: Immediately returns approved // Regular user: Goes through full moderation
Class Methods as Operations
You can also use class methods as operations, with the same parameter options:
class MyClass { public function validate($input, $next, $return) { if (empty($input)) { return $return('Input cannot be empty'); } return $next(strtoupper($input)); } } $pipeline = zenpipe() ->pipe([MyClass::class, 'validate']);
You can also pass an array of operations:
$pipeline = zenpipe() ->pipe([ fn($input, $next) => $next(strtoupper($input)), [MyClass::class, 'validate'] ]);
Context Passing
You can pass a shared context object to all operations using withContext(). This is useful for sharing state, configuration, or dependencies across the pipeline without threading them through the value.
// Use any object as context - your own DTO, stdClass, or array class RequestContext { public function __construct( public string $userId, public array $permissions, public array $logs = [] ) {} } $context = new RequestContext( userId: 'user-123', permissions: ['read', 'write'] ); $result = zenpipe(['action' => 'update', 'data' => [...]]) ->withContext($context) ->pipe(function ($request, $next, $return, RequestContext $ctx) { if (!in_array('write', $ctx->permissions)) { return $return(['error' => 'Unauthorized']); } $ctx->logs[] = "Permission check passed for {$ctx->userId}"; return $next($request); }) ->pipe(function ($request, $next, $return, RequestContext $ctx) { $ctx->logs[] = "Processing {$request['action']}"; return $next([...$request, 'processed_by' => $ctx->userId]); }) ->process(); // Context is mutable - logs are accumulated across operations // $context->logs = ['Permission check passed for user-123', 'Processing update']
Type hint your context parameter in the operation signature for IDE support:
/** @var ZenPipe<array, RequestContext> */ $pipeline = zenpipe() ->withContext(new RequestContext(...)) ->pipe(fn($value, $next, $return, RequestContext $ctx) => ...);
Exception Handling
Use catch() to handle exceptions gracefully without breaking the pipeline:
$result = zenpipe($userData) ->pipe(fn($data, $next) => $next(validateInput($data))) ->pipe(fn($data, $next) => $next(processPayment($data))) // might throw ->pipe(fn($data, $next) => $next(sendConfirmation($data))) ->catch(fn(Throwable $e, $originalValue) => [ 'error' => $e->getMessage(), 'input' => $originalValue, ]) ->process();
The catch handler receives:
$e: The thrown exception (Throwable)$value: The original input value passed toprocess()$context: The context set viawithContext()(null if not set)
If no catch handler is set, exceptions propagate normally.
PSR-15 Middleware
ZenPipe provides bidirectional PSR-15 middleware support. Requires psr/http-server-middleware.
Using PSR-15 Middleware in a Pipeline
Pass any MiddlewareInterface directly to pipe():
$response = zenpipe($request) ->pipe(new CorsMiddleware()) ->pipe(new AuthMiddleware()) ->pipe(fn($req, $next, $return) => $return(new Response(200))) ->process();
When using PSR-15 middleware, the pipeline must return a ResponseInterface.
Using ZenPipe as PSR-15 Middleware
Wrap a pipeline with asMiddleware() for use in PSR-15 frameworks:
$pipeline = zenpipe() ->pipe(fn($req, $next) => $next($req->withAttribute('processed', true))); $app->middleware($pipeline->asMiddleware());
Behavior:
- If the pipeline returns a
ResponseInterface, it's returned directly - If the pipeline returns a
ServerRequestInterface, it's passed to the next handler - The PSR-15 handler is available via
$context->handlerfor explicit delegation
$authPipeline = zenpipe() ->pipe(function ($req, $next, $return, $ctx) { if (!$req->hasHeader('Authorization')) { return $return(new Response(401)); } return $ctx->handler->handle($req); }); $app->middleware($authPipeline->asMiddleware());
More Examples
RAG Processes
This pipeline can be used for RAG processes, where the output of one model is used as input for another.
$ragPipeline = zenpipe() ->pipe(fn($query, $next) => $next([ 'query' => $query, 'embeddings' => OpenAI::embeddings()->create([ 'model' => 'text-embedding-3-small', 'input' => $query ])->embeddings[0]->embedding ])) ->pipe(fn($data, $next) => $next([ ...$data, 'context' => Qdrant::collection('knowledge-base') ->search($data['embeddings'], limit: 3) ->map(fn($doc) => $doc->content) ->join("\n") ])) ->pipe(fn($data, $next) => $next( OpenAI::chat()->create([ 'model' => 'gpt-4-turbo-preview', 'messages' => [ [ 'role' => 'system', 'content' => 'Answer using the provided context only.' ], [ 'role' => 'user', 'content' => "Context: {$data['context']}\n\nQuery: {$data['query']}" ] ] ])->choices[0]->message->content )); $answer = $ragPipeline("What's our refund policy?");
Email Validation with Early Return
This pipeline demonstrates early returns for email validation:
$emailValidationPipeline = zenpipe() ->pipe(function($input, $next, $return) { if (!is_string($input)) { return $return('Input must be a string'); } return $next(filter_var($input, FILTER_VALIDATE_EMAIL)); }) ->pipe(function($email, $next, $return) { if (!$email) { return $return('Invalid email format'); } $domain = substr(strrchr($email, "@"), 1); $mxhosts = []; if (!getmxrr($domain, $mxhosts)) { return $return('Domain has no valid mail servers'); } return $next(true); }); $result = $emailValidationPipeline('example@example.com'); // Returns: 'Domain has no valid mail servers' $result = $emailValidationPipeline('invalid-email'); // Returns: 'Invalid email format'
API Reference
See API Reference for details.
Contributing
See CONTRIBUTING.md for details.
License
The MIT License (MIT). See LICENSE for details.
