dynamik-dev/zenpipe-php

A simple, fluent pipeline implementation for PHP.

Installs: 4

Dependents: 0

Suggesters: 0

Security: 0

Stars: 7

Watchers: 0

Forks: 1

Open Issues: 0

pkg:composer/dynamik-dev/zenpipe-php

v0.2.0 2025-12-02 17:54 UTC

README

GitHub Actions BuyMeACoffee

ZenPipe Logo

ZenPipe is a simple and flexible PHP pipeline library that allows you to chain operations together to process, transform, or act on input.

$calculator = zenpipe()
   ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount
   ->pipe(fn($price, $next) => $next($price * 1.1)); // add 10% tax

$calculator(100); // $88 (100 -> 80 -> 88)

You can also run the pipeline on demand:

zenpipe(100)
   ->pipe(fn($price, $next) => $next($price * 0.8)) // 20% discount
   ->pipe(fn($price, $next) => $next($price * 1.1)) // add 10% tax
   ->process(); // 88

Sections

  1. ZenPipe
  2. Requirements
  3. Installation
  4. Usage
  5. API Reference
  6. Contributing
  7. License

Requirements

  • PHP 8.2 or higher

Installation

composer require dynamik-dev/zenpipe-php

Usage

Pipeline Operations

Pipeline operations are functions that take an input and return a processed value. Each operation can receive up to four parameters:

  • $input: The value being processed
  • $next: A callback to pass the value to the next operation
  • $return: (Optional) A callback to exit the pipeline early with a value
  • $context: (Optional) A shared context object passed to all operations

Basic Operation Example

Let's build an input sanitization pipeline:

// String sanitization pipeline
$sanitizer = zenpipe()
    ->pipe(fn($input, $next) => $next(trim($input)))
    ->pipe(fn($input, $next) => $next(preg_replace('/\s+/', ' ', $input)))
    ->pipe(fn($input, $next) => $next(strip_tags($input)))
    ->pipe(fn($input, $next) => $next(htmlspecialchars($input)))
    ->pipe(fn($input, $next) => $next(mb_convert_encoding(
        $input, 'UTF-8', mb_detect_encoding($input)
    )));

// Usage examples:
$dirtyInput = "  <script>alert('xss')</script>  Hello   World! ¥€$ ";
$cleanInput = $sanitizer($dirtyInput);
// Output: "Hello World! ¥€$"

// Can also be run on demand:
$result = zenpipe($dirtyInput)
    ->pipe(fn($input, $next) => $next(trim($input)))
    ->pipe(fn($input, $next) => $next(strip_tags($input)))
    ->process();

Operation with Early Return

Below is a practical example of a content moderation pipeline with early returns:

// Content moderation pipeline with early returns
$moderationPipeline = zenpipe()
    ->pipe(function($content, $next, $return) {
        // Skip moderation for trusted authors
        if (Auth::user()->isTrusted()) {
            return $return([
                'status' => 'approved',
                'content' => $content,
                'skipped' => true
            ]);
        }
        return $next($content);
    })
    ->pipe(function($content, $next, $return) {
        // Quick check for banned words
        if (containsBannedWords($content)) {
            return $return([
                'status' => 'rejected',
                'reason' => 'prohibited_content'
            ]);
        }
        return $next($content);
    })
    ->pipe(function($content, $next) {
        // Send to AI moderation for nuanced analysis
        return $next(
            AI::moderate($content)
        );
    });

// Usage:
$result = $moderationPipeline("Hello, world!");
// Trusted user: Immediately returns approved
// Regular user: Goes through full moderation

Class Methods as Operations

You can also use class methods as operations, with the same parameter options:

class MyClass
{
    public function validate($input, $next, $return)
    {
        if (empty($input)) {
            return $return('Input cannot be empty');
        }
        return $next(strtoupper($input));
    }
}

$pipeline = zenpipe()
   ->pipe([MyClass::class, 'validate']);

You can also pass an array of operations:

$pipeline = zenpipe()
   ->pipe([
        fn($input, $next) => $next(strtoupper($input)),
        [MyClass::class, 'validate']
    ]);

Context Passing

You can pass a shared context object to all operations using withContext(). This is useful for sharing state, configuration, or dependencies across the pipeline without threading them through the value.

// Use any object as context - your own DTO, stdClass, or array
class RequestContext
{
    public function __construct(
        public string $userId,
        public array $permissions,
        public array $logs = []
    ) {}
}

$context = new RequestContext(
    userId: 'user-123',
    permissions: ['read', 'write']
);

$result = zenpipe(['action' => 'update', 'data' => [...]])
    ->withContext($context)
    ->pipe(function ($request, $next, $return, RequestContext $ctx) {
        if (!in_array('write', $ctx->permissions)) {
            return $return(['error' => 'Unauthorized']);
        }
        $ctx->logs[] = "Permission check passed for {$ctx->userId}";
        return $next($request);
    })
    ->pipe(function ($request, $next, $return, RequestContext $ctx) {
        $ctx->logs[] = "Processing {$request['action']}";
        return $next([...$request, 'processed_by' => $ctx->userId]);
    })
    ->process();

// Context is mutable - logs are accumulated across operations
// $context->logs = ['Permission check passed for user-123', 'Processing update']

Type hint your context parameter in the operation signature for IDE support:

/** @var ZenPipe<array, RequestContext> */
$pipeline = zenpipe()
    ->withContext(new RequestContext(...))
    ->pipe(fn($value, $next, $return, RequestContext $ctx) => ...);

Exception Handling

Use catch() to handle exceptions gracefully without breaking the pipeline:

$result = zenpipe($userData)
    ->pipe(fn($data, $next) => $next(validateInput($data)))
    ->pipe(fn($data, $next) => $next(processPayment($data))) // might throw
    ->pipe(fn($data, $next) => $next(sendConfirmation($data)))
    ->catch(fn(Throwable $e, $originalValue) => [
        'error' => $e->getMessage(),
        'input' => $originalValue,
    ])
    ->process();

The catch handler receives:

  • $e: The thrown exception (Throwable)
  • $value: The original input value passed to process()
  • $context: The context set via withContext() (null if not set)

If no catch handler is set, exceptions propagate normally.

PSR-15 Middleware

ZenPipe provides bidirectional PSR-15 middleware support. Requires psr/http-server-middleware.

Using PSR-15 Middleware in a Pipeline

Pass any MiddlewareInterface directly to pipe():

$response = zenpipe($request)
    ->pipe(new CorsMiddleware())
    ->pipe(new AuthMiddleware())
    ->pipe(fn($req, $next, $return) => $return(new Response(200)))
    ->process();

When using PSR-15 middleware, the pipeline must return a ResponseInterface.

Using ZenPipe as PSR-15 Middleware

Wrap a pipeline with asMiddleware() for use in PSR-15 frameworks:

$pipeline = zenpipe()
    ->pipe(fn($req, $next) => $next($req->withAttribute('processed', true)));

$app->middleware($pipeline->asMiddleware());

Behavior:

  • If the pipeline returns a ResponseInterface, it's returned directly
  • If the pipeline returns a ServerRequestInterface, it's passed to the next handler
  • The PSR-15 handler is available via $context->handler for explicit delegation
$authPipeline = zenpipe()
    ->pipe(function ($req, $next, $return, $ctx) {
        if (!$req->hasHeader('Authorization')) {
            return $return(new Response(401));
        }
        return $ctx->handler->handle($req);
    });

$app->middleware($authPipeline->asMiddleware());

More Examples

RAG Processes

This pipeline can be used for RAG processes, where the output of one model is used as input for another.

$ragPipeline = zenpipe()
    ->pipe(fn($query, $next) => $next([
        'query' => $query,
        'embeddings' => OpenAI::embeddings()->create([
            'model' => 'text-embedding-3-small',
            'input' => $query
        ])->embeddings[0]->embedding
    ]))
    ->pipe(fn($data, $next) => $next([
        ...$data,
        'context' => Qdrant::collection('knowledge-base')
            ->search($data['embeddings'], limit: 3)
            ->map(fn($doc) => $doc->content)
            ->join("\n")
    ]))
    ->pipe(fn($data, $next) => $next(
        OpenAI::chat()->create([
            'model' => 'gpt-4-turbo-preview',
            'messages' => [
                [
                    'role' => 'system',
                    'content' => 'Answer using the provided context only.'
                ],
                [
                    'role' => 'user',
                    'content' => "Context: {$data['context']}\n\nQuery: {$data['query']}"
                ]
            ]
        ])->choices[0]->message->content
    ));

$answer = $ragPipeline("What's our refund policy?");

Email Validation with Early Return

This pipeline demonstrates early returns for email validation:

$emailValidationPipeline = zenpipe()
    ->pipe(function($input, $next, $return) {
        if (!is_string($input)) {
            return $return('Input must be a string');
        }
        return $next(filter_var($input, FILTER_VALIDATE_EMAIL));
    })
    ->pipe(function($email, $next, $return) {
        if (!$email) {
            return $return('Invalid email format');
        }

        $domain = substr(strrchr($email, "@"), 1);
        $mxhosts = [];

        if (!getmxrr($domain, $mxhosts)) {
            return $return('Domain has no valid mail servers');
        }

        return $next(true);
    });

$result = $emailValidationPipeline('example@example.com');
// Returns: 'Domain has no valid mail servers'

$result = $emailValidationPipeline('invalid-email');
// Returns: 'Invalid email format'

API Reference

See API Reference for details.

Contributing

See CONTRIBUTING.md for details.

License

The MIT License (MIT). See LICENSE for details.