matchory/data-pipe

A data processing pipeline framework

0.1.0 2021-09-23 14:42 UTC

This package is auto-updated.

Last update: 2024-12-23 22:03:53 UTC


README

An opinionated framework for building data enrichment pipelines in PHP

Data Pipe is a framework to create data enrichment pipelines in PHP. Such an application works by taking a piece of information, enriching it with additional data, and enhancing that data by applying transformations on them.

As a more tangible example, take a customer pipeline: It ingests the name of a customer, retrieves their shopping history and age, then enhances the record by removing old items from the shopping history, and assigning a targeting group to the customer.

While that, of course, merely describes some arbitrary business logic, Data Pipe helps you to describe this process with a set of reusable, composable, and encapsulated steps!

Preface

Please note that this package is still under active development and NOT ready to be used in production environments yet. We're still building our own workflow on top of data-pipe, so everything is subject to change until the 1.0 release. If you're interested in shaping the future of this library, you're very welcome to jump in!

Installation

Install the library as a dependency using composer:

php composer require matchory/data-pipe

Symfony Usage

This package includes a Symfony integration. Please read the instructions to get started.
The integration will add fully automatic pipeline configuration to your app.

Laravel Usage

This package includes an incomplete Laravel integration. Please read the instructions to get started.

Note: We didn't implement Laravel support yet, because we don't currently need it. If you're interested in using data-pipe within a Laravel application, and would like to have automatic pipeline configuration as with Symfony, please open an issue.

Usage

Note: Before getting started with Data Pipe, you should familiarize yourself with its core concepts.

Data Pipe works by setting up pipelines with a pre-configured set of inter-dependent nodes. There are currently two types of nodes: Collector nodes and Transformer nodes (which are both variants of generic pipeline nodes).
Nodes take a payload object, modify and return it. Enriching nodes add new data, post-processing nodes transform existing values. This distinction might seem irrelevant, but it allows lots of runtime-optimizations.

Creating nodes

In its simplest form, an enriching node might look like this:

use Matchory\DataPipe\Nodes\AbstractCollector as Node;
use Matchory\DataPipe\PipelineContext;

class MyNode extends Node
{
    public function __construct(protected $yourInternalAgeApi) {}

    public function pipe(PipelineContext $context): PipelineContext
    {
        // Work with the data payload
        $email = $context->getPayload()->getAttribute('email');
        
        // Perform domain-specific work
        $age = $this->yourInternalAgeApi->query($email);
        
        // Update the payload
        if ($age) {
            $context->proposeChange($this, 'age', $age);
        }
        
        return $context;
    }
}

Proposing changes

Note that you cannot directly update the payload: Every node receives just a clone of the actual payload. Instead, you can propose a change to the payload. Data Pipe provides a simple algorithm for best-fit change application. This allows to keep and compare multiple values for a single attribute.

Creating pipelines

Now that we have a node, let's create a pipeline to add it to:

use Matchory\DataPipe\Payload\Payload;
use Matchory\DataPipe\Pipeline;
use Symfony\Component\EventDispatcher\EventDispatcher;

$nodes = [
    new MyNode(),
];
$eventDispatcher = new EventDispatcher();
$pipeline = new Pipeline($nodes, $eventDispatcher);

function(): Generator {
    yield new Payload([
        'email' => 'foo@bar.com'
    ]);
}

$pipeline->process(fetchNextPayload());

DI usage

This is a contrived example, of course; in reality, a dependency-injection container would handle almost everything for you:

use Matchory\DataPipe\Pipeline;

class EntryPoint {
    public function main(Pipeline $pipeline, Generator $recordFetcher): void
    {
        foreach ($recordFetcher as $record) {
            $pipeline->process($recordFetcher);
        }
    }
}

Core Concepts

Data Pipe uses a few building blocks to structure your pipelines.

Pipeline nodes

Nodes are the stages forming a pipeline. They can depend on other nodes to have been executed previously; these dependencies will be figured out before the pipeline runs, so you don't have to define an order manually. Every payload processed by the pipeline will be piped to all nodes in it, each having the option to suggest changes to the data.
There are two types of nodes currently:

Collector nodes

Nodes that enhance a record with additional information are called collector nodes. These nodes may optionally define a cost: It is used to order those nodes by cost, and determine whether executing additional nodes is even necessary.
Imagine you have two data sources -- your own, internal database, and an external system that charges per API call. The node for your database will have a lower cost than that or the external API. Now, if we're looking for a piece of information, we'll first execute the "cheaper" node (your internal database), then, only if it can't satisfy our request, we'll also execute the more expensive node.

The more nodes you have, the more apparent the advantage of granular costs will be: Information will always be acquired with the cheapest means possible.

Transformer nodes

Transformer nodes allow you to refine, modify, or compare previously gathered information. This is different from data enriching nodes, as they're typically executed after those nodes.

Best-Fit change application

The more data sources you have, the more variants of pieces of information you will collect. What's problematic is determining the best of those variants - think of an email address for example:

Depending on a few rules, you're probably able to infer which is the closest variant to what you're looking for. Now, to keep a sequence of nodes from overriding each other's results, instead of setting an attribute on the payload, they can suggest changes instead:

$context->proposeChange($this, 'attribute_name', 42);

All nodes may propose changes to existing data, along with an optional confidence score: In the email case above, for example, we'd probably have a grey-list of trashmail domains, and assign that address a low confidence score. The idea here is, take that email if nothing better can be found later on.