dbrans/stream-pipeline

A stream-based pipeline pattern implementation

github.com/danibranas/stream-pipeline-php

pkg:composer/dbrans/stream-pipeline

2.5.1 2026-02-20 00:55 UTC

Last update: 2026-03-20 01:21:20 UTC


Stream Pipeline

What is it?

A stream-based pipeline pattern implementation.

The Pipeline pattern uses ordered stages to process a sequence of input values. Each implemented task is represented by a stage of the pipeline. You can think of pipelines as similar to assembly lines in a factory, where each item in the assembly line is constructed in stages.

Features

  • Fluent API / Chaining
    Chain methods like map, filter, reduce in a readable, functional style.

  • Lazy evaluation
    The pipeline does not execute any operations until a terminal method like collect() or forEach() is called.
    Each element is processed one at a time through the entire chain, minimizing memory usage and allowing infinite or very large data streams.

  • Lightweight and simple
    No dependencies and easy to integrate into any project.

  • Common functional operations included
    map, filter, reduce, flatMap, distinct, limit, skip...

  • Generics support via docblocks
    Compatible with Psalm/PHPStan for static type checking.

  • Immutable style
    Each operation returns a new pipeline instance, avoiding side effects.

  • Works with any iterable
    Supports arrays, generators, or any PHP iterable.

  • Explicit terminal operations
    Methods like collect() and forEach() allow controlled consumption of data.
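The lazy, one-element-at-a-time behaviour described above can be illustrated with plain PHP generators. This is only a sketch of the idea, not the library's internals; `lazyMap` and `lazyFilter` are hypothetical helper names introduced for the example.

```php
<?php
// Plain-PHP sketch of lazy, element-at-a-time processing (not the library's code).

/** Lazily applies $fn to each element. */
function lazyMap(iterable $source, callable $fn): Generator
{
    foreach ($source as $item) {
        yield $fn($item);
    }
}

/** Lazily keeps the elements for which $predicate returns true. */
function lazyFilter(iterable $source, callable $predicate): Generator
{
    foreach ($source as $item) {
        if ($predicate($item)) {
            yield $item;
        }
    }
}

$log = [];

$pipeline = lazyFilter(
    lazyMap([1, 2, 3], function ($n) use (&$log) {
        $log[] = "map $n";
        return $n * 10;
    }),
    function ($n) use (&$log) {
        $log[] = "filter $n";
        return $n > 10;
    }
);

// Nothing has run yet: a generator body only executes when it is iterated.
$logBefore = $log;

// Consuming the generator plays the role of a terminal operation.
$result = iterator_to_array($pipeline, false);
```

After the last line, `$log` interleaves the two stages ("map 1", "filter 10", "map 2", ...), which shows each element travelling through the whole chain before the next one is read.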

How it works

Stream pipeline allows you to go from writing expressions like:

// Old way
$input = [' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3'];
$elements = [];

foreach ($input as $e) {
    $elem = strtoupper(trim($e));

    if (substr($elem, 0, 1) === 'B' && !in_array($elem, $elements, true)) {
        $elements[] = $elem;
    }
}

return implode(',', $elements);

to writing functional expressions like:

return Stream::of(' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3')
    ->map(Strings::trim())
    ->map(Strings::toUpper())
    ->filter(Strings::startsWith('B'))
    ->distinct()
    ->collect(Collectors::join(','));
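For comparison, the same steps can be written with PHP's built-in array functions. This sketch uses no library code; unlike the Stream version, every step builds a full intermediate array before the next one starts. It assumes the `Strings` helpers and `Collectors::join` behave as their names suggest.

```php
<?php
$input = [' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3'];

// Each call below eagerly produces a complete intermediate array.
$trimmed = array_map('trim', $input);
$upper = array_map('strtoupper', $trimmed);
$startsWithB = array_filter($upper, function ($e) {
    return strncmp($e, 'B', 1) === 0;
});
$unique = array_unique($startsWithB);

$joined = implode(',', $unique); // "B1,B2,B3"
```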

Getting started

The library is available as a Composer package on Packagist.org.

To install in your project, just run:

composer require dbrans/stream-pipeline

After this runs successfully, you can include the main class in your application logic:

use StreamPipeline\Stream;

Usage

You can create a Stream from any iterable or from a list of values:

use StreamPipeline\Stream;

$arrStream = Stream::fromIterable($myArray);
// or
$arrStream = Stream::of(' B1 ', ' B2', 'a1 ', ' a2 ', 'a3', ' b1', ' b2', 'b3');

A Stream object exposes several methods to operate with its elements:

use StreamPipeline\Collectors;
use StreamPipeline\Iterators\NumberGenerator;
use StreamPipeline\Operations\Strings;

// ...

$arrStream
    ->map(Strings::trim())
    ->map(Strings::toUpper())
    ->filter(Strings::startsWith('B'))
    ->distinct()
    ->forEach(function ($e) {
        echo $e;
    });

The Stream class is immutable, so each chaining method returns a new Stream.

Execution is lazy: the elements are iterated only once, and only when a terminal operation (forEach, reduce, toArray, collect...) is called.
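One way to combine immutability with lazy execution is to store the pending stages in the object and return a copy with one more stage on every call. The `MiniStream` class below is a hypothetical illustration of that design, not the library's Stream class.

```php
<?php
// Hypothetical MiniStream: an illustration only, not the library's Stream class.
final class MiniStream
{
    /** @var iterable */
    private $source;

    /** @var callable[] stages, each turning one iterable into another */
    private $stages;

    private function __construct(iterable $source, array $stages = [])
    {
        $this->source = $source;
        $this->stages = $stages;
    }

    public static function fromIterable(iterable $source): self
    {
        return new self($source);
    }

    public function map(callable $fn): self
    {
        return $this->withStage(function (iterable $in) use ($fn) {
            foreach ($in as $item) {
                yield $fn($item);
            }
        });
    }

    public function filter(callable $predicate): self
    {
        return $this->withStage(function (iterable $in) use ($predicate) {
            foreach ($in as $item) {
                if ($predicate($item)) {
                    yield $item;
                }
            }
        });
    }

    /** Terminal operation: only here is the source actually iterated. */
    public function toArray(): array
    {
        $current = $this->source;
        foreach ($this->stages as $stage) {
            $current = $stage($current);
        }
        return is_array($current)
            ? array_values($current)
            : iterator_to_array($current, false);
    }

    /** Returns a copy with one more stage; $this is left untouched. */
    private function withStage(callable $stage): self
    {
        return new self($this->source, array_merge($this->stages, [$stage]));
    }
}

$base = MiniStream::fromIterable([1, 2, 3, 4]);
$evens = $base->filter(function ($n) { return $n % 2 === 0; });
$plusTen = $evens->map(function ($n) { return $n + 10; });
```

Because `$base` and `$evens` are never modified, each of the three objects can still be consumed independently with `toArray()`.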

Pipe operations

Most pipe operations accept a callable argument:

$arrStream
    ->filter(function ($e) {
        return $e % 2 === 0;
    })
    ->map(function ($e) {
        return $e + 10;
    })
    ->toArray();

The library also provides predefined callables for common operations, which improve readability:

$arrStream
    ->filter(Numbers::isEven())
    ->map(Numbers::plus(10))
    ->collect(Collectors::sum());

Please see the PHPDoc comments in the source code for more information.

Stream Methods

Initialization static operations:

  • of(...$elements): StreamInterface
  • fromIterable(iterable $collection): StreamInterface
  • iterate($initialValue, callable $stepOperation): StreamInterface

Pipe operations:

  • map(callable $operation): StreamInterface
  • filter(callable $operation): StreamInterface
  • peek(callable $operation): StreamInterface
  • tap(callable $operation): StreamInterface (alias of peek).
  • limit(int $limit): StreamInterface
  • skip(int $number): StreamInterface
  • distinct(?callable $distinctBy = null): StreamInterface
  • flatMap(?callable $operation = null): StreamInterface
  • concat(iterable $elements): StreamInterface
  • takeWhile(callable $operation): StreamInterface
  • dropWhile(callable $operation): StreamInterface
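The difference between takeWhile and dropWhile is easy to confuse with filter. The generator sketch below assumes the conventional semantics (as in Java streams): takeWhile stops at the first element that fails the condition, and dropWhile skips elements only until the first failure. The function names are hypothetical and the library's behaviour may differ in details.

```php
<?php
/** Yields elements until the predicate first returns false, then stops. */
function takeWhileSketch(iterable $source, callable $predicate): Generator
{
    foreach ($source as $item) {
        if (!$predicate($item)) {
            return;
        }
        yield $item;
    }
}

/** Skips the leading elements that match, then yields everything else. */
function dropWhileSketch(iterable $source, callable $predicate): Generator
{
    $dropping = true;
    foreach ($source as $item) {
        if ($dropping && $predicate($item)) {
            continue;
        }
        $dropping = false;
        yield $item;
    }
}

$isSmall = function ($n) {
    return $n < 3;
};

$taken = iterator_to_array(takeWhileSketch([1, 2, 3, 1, 2], $isSmall), false);
$dropped = iterator_to_array(dropWhileSketch([1, 2, 3, 1, 2], $isSmall), false);
```

Here `$taken` is `[1, 2]` and `$dropped` is `[3, 1, 2]`; a filter with the same condition would instead keep all four small values.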

Terminal operations:

  • findFirst()
  • count(): int
  • forEach(callable $callback): void
  • anyMatch(callable $condition): bool
  • allMatch(callable $condition): bool
  • noneMatch(callable $condition): bool
  • reduce(callable $operation, $initialValue)
  • toArray(bool $preserveKeys = false): array
  • collect(?callable $collector)

Every callable receives three arguments: ($currentElement, $index, $originalIndex). Example:

$arrStream
    ->map(function ($elem, $i, $index) {
        return ...
    })
    ->toArray();

Pre-defined Collectors

There are pre-defined collector functions with some common operations. You can use them with the terminal operator collect():

  • Collectors::join(string $delimiter = '')
  • Collectors::sum(?callable $mapper = null)
  • Collectors::groupBy(?callable $classifier = null, ?callable $mapper = null)
  • Collectors::groupAndReduceBy(?callable $keysMapper = null, ?callable $valuesMapper = null, ?callable $reducer = null)

For example:

Stream::of('a', 'b', 'c', 'd', 'e', 'f')
    ->limit(5)
    ->collect(Collectors::join(','));
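Conceptually, a collector reduces the whole stream to a single value. The sketch below assumes, purely for illustration, that a collector is a callable receiving the remaining elements as an iterable; the real signature expected by collect() is not documented here and may differ. `joinCollector` and `sumCollector` are hypothetical names.

```php
<?php
// Assumed shape: a collector is a callable that consumes an iterable.

function joinCollector(string $delimiter = ''): callable
{
    return function (iterable $elements) use ($delimiter): string {
        $parts = [];
        foreach ($elements as $element) {
            $parts[] = (string) $element;
        }
        return implode($delimiter, $parts);
    };
}

function sumCollector(?callable $mapper = null): callable
{
    return function (iterable $elements) use ($mapper) {
        $total = 0;
        foreach ($elements as $element) {
            // Apply the optional mapper before adding the value.
            $total += $mapper === null ? $element : $mapper($element);
        }
        return $total;
    };
}

// Stand-in for ->limit(5): keep the first five elements.
$firstFive = array_slice(['a', 'b', 'c', 'd', 'e', 'f'], 0, 5);

$joined = (joinCollector(','))($firstFive);
$lengths = (sumCollector('strlen'))(['ab', 'cde']);
```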

Iterator classes

  • NumberGenerator: a number generator with an optional step.

Example:

Stream::iterate(1, new NumberGenerator(1))
    ->filter(Numbers::isEven())
    ->skip(5)
    ->limit(11);
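Since the source is infinite, the example above only terminates because limit stops pulling elements. A plain-generator sketch of the same chain follows. It assumes that iterate starts at the initial value and that NumberGenerator(1) adds 1 at each step; all function names are hypothetical.

```php
<?php
// Plain-generator sketch; the helper names below are hypothetical.

/** Infinite sequence: $start, $start + $step, $start + 2 * $step, ... */
function numbersFrom(int $start, int $step = 1): Generator
{
    for ($n = $start; ; $n += $step) {
        yield $n;
    }
}

function onlyEven(iterable $source): Generator
{
    foreach ($source as $n) {
        if ($n % 2 === 0) {
            yield $n;
        }
    }
}

function skipFirst(iterable $source, int $count): Generator
{
    foreach ($source as $item) {
        if ($count > 0) {
            $count--;
            continue;
        }
        yield $item;
    }
}

function takeFirst(iterable $source, int $count): Generator
{
    if ($count <= 0) {
        return;
    }
    foreach ($source as $item) {
        yield $item;
        if (--$count === 0) {
            return; // stop pulling, so the infinite source is never exhausted
        }
    }
}

$result = iterator_to_array(
    takeFirst(skipFirst(onlyEven(numbersFrom(1)), 5), 11),
    false
);
```

Under those assumptions `$result` holds the eleven even numbers from 12 to 32.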

Operation classes

  • Logical: logical operations (such as identity, true, false...).
  • Numbers: numeric operations.
  • Objects: generic object operations.
  • Strings: string utilities and functions.
  • Values: polymorphic value operations.