miquido/observable

Observable library

v1.0.0 2018-09-26 09:28 UTC

This package is not auto-updated.

Last update: 2024-04-19 21:28:37 UTC


README

Build Maintainability Test Coverage MIT Licence

Observable

Set of classes for data streams.

Installation

Use Composer to install the package:

composer require miquido/observable

Code Samples

Please also check miquido/csv-file-reader library for more real-life examples.

Create and subscribe to a data stream

You can start with simple Miquido\Observable\Stream\FromArray::create method.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Observer;

// create an observable stream from an array:
// $stream is an objects that implements Miquido\Observable\ObservableInterface
$stream = FromArray::create([1, 2, 3, 4, 5]);

// then you can subscribe to the stream using Observer (both parameters are optional):
$stream->subscribe(new Observer(
    function (int $i): void { /* this callback will be called 5 times with consecutive 1, 2, 3, 4, 5*/ }, 
    function (): void { /* this callback will be called once after every items in the stream will be emitted */ }
));

// alternatively - if you are only interested in items in the stream you can pass just a callback 
$stream->subscribe(function (int $i): void {
    // do something with numbers
});

Manipulate data in a stream with Operators

Operators can be useful when you want to process the data in the stream before notifying observers. Operators do not interfere with source stream, every operator returns new stream that can be subscribed independently.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$squareStream = $stream->pipe(new Operator\Map(function (int $i): int {
    return $i * $i;
}));

$sumStream = $stream->pipe(new Operator\Sum());
$squareSumStream = $squareStream->pipe(new Operator\Sum());
$tripleSumStream = $stream
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    ->pipe(new Operator\Filter(function (int $i): bool {
        return $i % 3 > 0;
    }));

$squareStream->subscribe(function ($i) {}); // called 5 times with consecutive: 1, 4, 9, 16, 25
$squareSumStream->subscribe(function ($i) {}); // called once with a number 55
$sumStream->subscribe(function ($i) {}); // called once with a number 15

You can also add multiple pipes:

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;


$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    // first pipe raises each number to a power 3,
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    // then BufferCount(3) receives stream of numbers: 1, 8, 25, 64, 125, 216
    // holds the stream until it receives 3 values, then releases array with three values
    ->pipe(new Operator\BufferCount(3))
    // next pipes receives two arrays of three numbers: [1, 8, 27], [64, 125, 216] and returns sum of each group
    ->pipe(new Operator\Map(function (array $numbers): int {
        return array_sum($numbers);
    }))
    ->subscribe(function (int $i): void {
        // subscribe() is called twice with numbers: 36, 405
    });

Using a Subject

Subject acts both as an observer and as an observable. See an example below:

<?php

use Miquido\Observable\Subject\Subject;
use Miquido\Observable\Operator;

// lets create a Subject
$words = new Subject();

// because it is an observable, you can pipe and subscribe to the data
$words
    ->pipe(new Operator\Map(function (string $word): string {
        return \strtoupper($word);
    }))
    ->subscribe(function (string $word): void {
        // receives upper cased words
    });

$words
    ->pipe(new Operator\Map('ucfirst'))
    ->pipe(new Operator\Reduce(
        function (string $sentenceInProgress, string $word) {
            return \sprintf('%s %s', $sentenceInProgress, $word);
        },
        ''
    ))
    ->pipe(new Operator\Map('trim'))
    ->subscribe(function (string $sentence): void {
        // receives a sentence of all words int the stream
    });

// And because a Subject is also an observer, you can push new items to the stream.
$words->next('lorem');
$words->next('ipsum');
$words->next('dolor');
$words->next('sit');
$words->next('amet');

// complete will send a "complete" notification to all observers and will remove observers from the subject
$words->complete();

/**
 * In this example:
 * - first subscriber will receive 5 items: 'LOREM', 'IPSUM', 'DOLOR', 'SIT' and 'AMET'
 * - second subscriber will recive one item: 'Lorem Ipsum Dolor Sit Amet'
 */

List of build-in operators

ArrayCount operator

Transforms array item to number with count value.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5] 
]);
$stream
    ->pipe(new Operator\ArrayCount())
    ->subscribe(function (int $count): void {
        // called twice with values: 2 and 3
    });

BufferCount operator

Groups individual items into an array of provided size.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    ->pipe(new Operator\BufferCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });

BufferUniqueCount operator

Similar to BufferCount, but removes duplications.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\BufferUniqueCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });

Count operator

Count all items emitted into the stream.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\Count())
    ->subscribe(function (int $count): void {
        // called once with value: 9
    });

Filter operator

Removes all values for which provided callback returns false.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Filter(function (int $number): bool {
        return $number % 2 === 0;
    }))
    ->subscribe(function (int $number): void {
        // called twice with values: 2, 4
    });

Flat operator

If item in a stream is an array, Flat converts this array into set of individual items.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5]
]);
$stream
    ->pipe(new Operator\Flat())
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });

Let operator

Does nothing, do the stream, just fires provided callback for every item in the stream and returns unchanged value.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Let(function (int $number): void {
        // do something with this number, no need to return anything
    }))
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });

Map operator

Transform each item in the stream into new value.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create(['lorem', 'ipsum', 'dolor', 'sit', 'amet']);
$stream
    ->pipe(new Operator\Map(function (string $word): int {
        return \strlen($word);
    }))
    ->subscribe(function (int $length): void {
        // called 5 times with values: 5, 5, 5, 3, 5
    });

Reduce operator

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Reduce(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });

Scan operator

Like Reduce, but observer receives a value after each Scan call.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Scan(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called 5 times with values: 1, 3, 6, 10, 15
    });

Sum operator

Sums all items in the stream.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Sum())
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });

Contributing

Pull requests, bug fixes and issue reports are welcome. Before proposing a change, please discuss your change by raising an issue.