boutdecode/etl-core-bundle

Symfony Bundle providing a configurable ETL (Extract/Transform/Load) pipeline engine with CQS, scheduling and workflow support.

github.com/boutdecode/etl-core

Type: symfony-bundle

pkg:composer/boutdecode/etl-core-bundle

Last update: 2026-03-17 20:32:37 UTC


A Symfony Bundle providing a configurable ETL (Extract / Transform / Load) pipeline engine built on top of Domain-Driven Design, CQS, Symfony Messenger, Symfony Workflow and Flow-PHP.

Requirements

| Dependency | Version |
| --- | --- |
| PHP | >= 8.2 |
| Symfony | ^6.4 \|\| ^7.0 |
| Doctrine ORM | ^3.6 |
| Flow-PHP ETL | ~0.25 |

Concepts

ETL — Extract, Transform, Load

ETL is a data processing pattern split into three sequential stages:

| Stage | Role |
| --- | --- |
| Extract | Read raw data from a source (CSV file, API, database, …) |
| Transform | Filter, map, enrich or validate the extracted data |
| Load | Write the processed data to a destination (database, JSON file, …) |

Each stage is implemented as a Step — a single, focused unit of work. Steps are chained together so the output of one becomes the input of the next, flowing through a shared Context object.
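
The chaining described above can be pictured with a minimal sketch. It is an illustration only: SketchContext and runChain() are hypothetical stand-ins written for this example and are not the bundle's Context class or execution engine. Each callable receives the previous result, and the context keeps every result keyed by step name.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bundle's Context: it only remembers results.
final class SketchContext
{
    /** @var array<string, mixed> */
    private array $results = [];

    private mixed $last = null;

    public function record(string $stepName, mixed $result): void
    {
        $this->results[$stepName] = $result;
        $this->last = $result;
    }

    public function getResult(): mixed
    {
        return $this->last;
    }

    public function getResultByKey(string $stepName): mixed
    {
        return $this->results[$stepName] ?? null;
    }
}

/**
 * Runs the callables in array order; each one receives the previous result.
 *
 * @param array<string, callable(mixed, SketchContext): mixed> $steps
 */
function runChain(array $steps, SketchContext $context): SketchContext
{
    foreach ($steps as $name => $step) {
        $context->record($name, $step($context->getResult(), $context));
    }

    return $context;
}

$context = runChain([
    // Extract: produce raw rows (hard-coded here instead of reading a file).
    'extract' => static fn (mixed $in, SketchContext $c): array => [['name' => 'ada'], ['name' => 'linus']],
    // Transform: uppercase the "name" field of every row.
    'transform' => static fn (mixed $in, SketchContext $c): array => array_map(
        static fn (array $row): array => ['name' => strtoupper($row['name'])],
        $in,
    ),
    // Load: pretend to write the rows and report how many were handled.
    'load' => static fn (mixed $in, SketchContext $c): int => count($in),
], new SketchContext());

echo json_encode($context->getResultByKey('transform')), PHP_EOL; // [{"name":"ADA"},{"name":"LINUS"}]
echo $context->getResult(), PHP_EOL;                              // 2
```

Keeping per-step results in the context is what lets a later step, or the caller, look back at an earlier stage without re-running it.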

Workflow vs Pipeline

These two terms look similar but represent fundamentally different things in this bundle:

Workflow — the reusable template

A Workflow is a named, static definition that describes what should happen:

  • the ordered list of steps to execute (stepConfiguration), each identified by a code that maps to a registered ExecutableStep service
  • the default configuration for each step
  • global options (timeout, retry policy, …) via configuration

A Workflow has no notion of time, data, or execution state. It never runs by itself. Think of it as a class or a recipe.

Pipeline — the execution instance

A Pipeline is a concrete, time-bound instance created from a Workflow. It represents one specific run:

  • it holds the actual input data for that run (e.g. path to the file to import)
  • it may override the step configuration for that run specifically
  • it carries a status (pending → in_progress → completed / failed) managed by a Symfony Workflow state machine
  • it records timestamps (scheduledAt, startedAt, finishedAt)

Think of it as an object instantiated from a class — or a ticket raised against a recipe.

Workflow  ──createFromWorkflowId()──►  Pipeline  ──dispatch()──►  execution
(template, reusable)                   (instance, stateful)        (runtime)

Step — configuration vs execution

The same word "step" covers two distinct things:

| Concept | Where | Role |
| --- | --- | --- |
| Step (config) | Stored with the Pipeline | Carries code, order, and per-step configuration. A value object with no logic. |
| ExecutableStep (service) | Symfony DI container | Implements the actual ETL logic in process(Context). Tagged boutdecode_etl_core.executable_step. |

At runtime the StepResolver bridges the two: it looks up the ExecutableStep service whose tag matches Step::getCode(), clones it, applies the step configuration, and hands it to the execution chain.
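
As a rough illustration of the lookup, clone and configure sequence, here is a minimal sketch. SketchStepResolver and SketchUppercaseStep are hypothetical classes written for this example; the bundle's real StepResolver, its method names and the way it stores configuration may differ.

```php
<?php

declare(strict_types=1);

// Hypothetical executable step: holds configuration and does some work.
final class SketchUppercaseStep
{
    /** @var array<string, mixed> */
    public array $configuration = ['field' => 'name'];

    /**
     * @param array<string, mixed> $row
     * @return array<string, mixed>
     */
    public function process(array $row): array
    {
        $field = $this->configuration['field'];
        $row[$field] = strtoupper((string) ($row[$field] ?? ''));

        return $row;
    }
}

// Hypothetical resolver: maps a step code to a prototype service.
final class SketchStepResolver
{
    /** @param array<string, object> $prototypes keyed by step code */
    public function __construct(private array $prototypes)
    {
    }

    /** @param array<string, mixed> $configuration */
    public function resolve(string $code, array $configuration): object
    {
        if (!isset($this->prototypes[$code])) {
            throw new InvalidArgumentException(sprintf('No executable step registered for code "%s".', $code));
        }

        // Clone so that per-run configuration never leaks into the shared service.
        $step = clone $this->prototypes[$code];
        $step->configuration = array_merge($step->configuration, $configuration);

        return $step;
    }
}

$prototype = new SketchUppercaseStep();
$resolver = new SketchStepResolver(['app.transformer.uppercase' => $prototype]);

$step = $resolver->resolve('app.transformer.uppercase', ['field' => 'city']);
$out = $step->process(['name' => 'ada', 'city' => 'paris']);

echo json_encode($out), PHP_EOL;                      // {"name":"ada","city":"PARIS"}
echo json_encode($prototype->configuration), PHP_EOL; // {"field":"name"}
```

The second line of output shows why cloning matters: the shared prototype keeps its default configuration, so two pipelines using the same step code cannot affect each other.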

Installation

composer require boutdecode/etl-core-bundle

If you are not using Symfony Flex, register the bundle manually:

// config/bundles.php
return [
    // ...
    BoutDeCode\ETLCoreBundle\BoutDeCodeETLCoreBundle::class => ['all' => true],
];

# config/packages/boutdecode_etl_core.yaml
imports:
    - { resource: "@BoutDeCodeETLCoreBundle/Resources/config/config.yaml" }

Configuration

No configuration is required. The bundle works out of the box with sensible defaults.

The bundle exposes no configurable keys under boutdecode_etl_core: — all service IDs, tags, and bus names are fixed constants defined by the bundle itself:

| Constant | Value |
| --- | --- |
| Command bus | boutdecode_etl_core.command.bus |
| Query bus | boutdecode_etl_core.query.bus |
| Executable step tag | boutdecode_etl_core.executable_step |
| Step middleware tag | boutdecode_etl_core.step_middleware |
| Pipeline middleware tag | boutdecode_etl_core.pipeline_middleware |

Data

Entities

The bundle does not ship Doctrine entities. You must create them in your application and then generate the migrations.

The bundle provides abstract base classes to extend and interfaces to implement:

| What to create | Extends | Implements |
| --- | --- | --- |
| Workflow entity | AbstractWorkflow | |
| Step entity | AbstractStep | |
| Pipeline entity | AbstractPipeline | |
| StepHistory entity | AbstractStepHistory | |
| PipelineHistory entity | AbstractPipelineHistory | |

Each abstract class holds all the typed properties and method implementations. The only thing left to add in the concrete entity is:

  • A Doctrine #[ORM\Entity] / #[ORM\Table] mapping.
  • An $id property with its getter (getId(): string), except for Step and history entities where you may choose any PK strategy.
  • The ORM column/relation mappings on the inherited properties (use #[ORM\Column] etc. directly in the child class).

Example — minimal entity set

// src/Entity/Workflow.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractWorkflow;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'workflow')]
class Workflow extends AbstractWorkflow
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\Column]
    protected string $name;

    #[ORM\Column(nullable: true)]
    protected ?string $description = null;

    #[ORM\Column(type: 'json')]
    protected array $stepConfiguration = [];

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $updatedAt = null;

    public function __construct(string $name)
    {
        $this->id = (string) Uuid::v7();
        $this->name = $name;
        $this->createdAt = new \DateTimeImmutable();
        $this->stepConfiguration = [];
        $this->configuration = [];
    }

    public function getId(): string
    {
        return $this->id;
    }
}

// src/Entity/Step.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractStep;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step')]
class Step extends AbstractStep
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Workflow::class)]
    #[ORM\JoinColumn(nullable: false)]
    private Workflow $workflow;

    #[ORM\Column(nullable: true)]
    protected ?string $name = null;

    #[ORM\Column]
    protected string $code;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column]
    protected int $order = 0;

    public function __construct(string $code, Workflow $workflow)
    {
        $this->id = (string) Uuid::v7();
        $this->code = $code;
        $this->workflow = $workflow;
    }

    public function getId(): string
    {
        return $this->id;
    }
}

// src/Entity/Pipeline.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractPipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Enum\PipelineStatus;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline')]
class Pipeline extends AbstractPipeline
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Workflow::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected Workflow $workflow;

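    // Note: mappedBy: 'pipeline' expects a matching #[ORM\ManyToOne] property named $pipeline on Step;
    // the Step example above only maps $workflow, so add that property before generating migrations.
    #[ORM\OneToMany(targetEntity: Step::class, mappedBy: 'pipeline', cascade: ['persist'])]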
    #[ORM\OrderBy(['order' => 'ASC'])]
    protected iterable $steps;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column(type: 'json')]
    protected array $input = [];

    #[ORM\Column(enumType: PipelineStatus::class)]
    protected PipelineStatus $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $scheduledAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $startedAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $finishedAt = null;

    public function __construct(Workflow $workflow)
    {
        $this->id = (string) Uuid::v7();
        $this->workflow = $workflow;
        $this->status = PipelineStatus::PENDING;
        $this->createdAt = new \DateTimeImmutable();
        $this->steps = new ArrayCollection();
        $this->runnableSteps = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}

// src/Entity/StepHistory.php
use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractStepHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\StepHistoryStatusEnum;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step_history')]
class StepHistory extends AbstractStepHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\Column(enumType: StepHistoryStatusEnum::class)]
    protected StepHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

    public function __construct(StepHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
    }

    public function getId(): string
    {
        return $this->id;
    }
}

// src/Entity/PipelineHistory.php
use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractPipelineHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\PipelineHistoryStatusEnum;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline as PipelineInterface;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline_history')]
class PipelineHistory extends AbstractPipelineHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Pipeline::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected PipelineInterface $pipeline;

    #[ORM\Column(enumType: PipelineHistoryStatusEnum::class)]
    protected PipelineHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

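    // Note: mappedBy: 'pipelineHistory' expects a matching #[ORM\ManyToOne] property named
    // $pipelineHistory on StepHistory, which the StepHistory example above does not declare.
    #[ORM\OneToMany(targetEntity: StepHistory::class, mappedBy: 'pipelineHistory', cascade: ['persist'])]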
    protected iterable $stepHistories;

    public function __construct(PipelineInterface $pipeline, PipelineHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->pipeline = $pipeline;
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
        $this->stepHistories = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}

Migrations

Once all entities are created, generate and run the Doctrine migrations:

php bin/console doctrine:migrations:diff
php bin/console doctrine:migrations:migrate

Architecture

src/
├── ETLCoreBundle.php               # Bundle entry point
├── DependencyInjection/
│   ├── ETLCoreExtension.php        # Loads services, exposes config parameters
│   └── Configuration.php           # Config tree (boutdecode_etl_core:)
├── Resources/config/
│   ├── services.yaml               # Service definitions & tagged iterators
│   ├── config.yaml                 # Root import (messenger + workflow)
│   └── packages/
│       ├── messenger.yaml          # Buses & routing
│       └── workflow.yaml           # pipeline_lifecycle state machine
├── Core/                           # Central domain (Pipeline, Step, Context)
├── ETL/                            # ETL logic (Extract, Transform, Load)
├── Run/                            # Execution engine & middleware
└── CQS/                            # Command / Query Separation

Key patterns

| Pattern | Where |
| --- | --- |
| Domain-Driven Design | */Domain/ layers |
| CQS (Command / Query Separation) | src/CQS/ |
| Middleware chain | Run/Domain/Middleware/ |
| Strategy (pluggable steps) | ETL/Domain/Model/ + ExecutableStep tag |
| State machine | pipeline_lifecycle Symfony Workflow |

Implementing a Custom Step

1. Declare the step with #[AsExecutableStep]

Every step class must carry the #[AsExecutableStep] attribute. It serves two purposes:

  • code: the unique machine identifier used to resolve the step at runtime (e.g. when a Workflow references a step by its code). It must be unique across the whole application.
  • configurationDescription (optional): a map of configuration key → human-readable description, returned by getConfigurationDescription(). Useful for documentation and introspection.

use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source'    => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    // …
}

2. Implement the stage method

Extend one of the three abstract base classes and implement the corresponding method. The Context $context parameter is always injected by the framework — you do not need to call process() yourself.

| Base class | Method to implement | Signature |
| --- | --- | --- |
| AbstractExtractorStep | extract() | extract(mixed $source, Context $context, array $configuration = []): mixed |
| AbstractTransformerStep | transform() | transform(mixed $data, Context $context, array $configuration = []): mixed |
| AbstractLoaderStep | load() | load(mixed $data, mixed $destination, Context $context, array $configuration = []): mixed |

The $configuration array is automatically populated from the step's entry in the pipeline configuration. Default values can also be injected via the constructor and stored in $this->configuration.

Extractor example

use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source'    => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    public function __construct(
        private readonly string $delimiter = ',',
    ) {}

    public function extract(mixed $source, Context $context, array $configuration = []): array
    {
        $filePath  = is_string($source) ? $source : ($configuration['source'] ?? '');
        $delimiter = $configuration['delimiter'] ?? $this->delimiter;

        // … read and return rows …
        return [];
    }
}

Transformer example

use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractTransformerStep;

#[AsExecutableStep(
    code: 'app.transformer.uppercase_name',
    configurationDescription: [
        'field' => 'Name of the field to uppercase (default: "name")',
    ],
)]
final class UppercaseNameTransformStep extends AbstractTransformerStep
{
    public function transform(mixed $data, Context $context, array $configuration = []): mixed
    {
        if (! is_array($data)) {
            return $data;
        }

        $field = $configuration['field'] ?? 'name';

        return array_map(static function (array $row) use ($field): array {
            if (isset($row[$field]) && is_string($row[$field])) {
                $row[$field] = strtoupper($row[$field]);
            }

            return $row;
        }, $data);
    }
}

Loader example

use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractLoaderStep;

#[AsExecutableStep(
    code: 'app.loader.csv_file',
    configurationDescription: [
        'destination' => 'Absolute path to the output CSV file',
    ],
)]
final class CsvFileLoadStep extends AbstractLoaderStep
{
    public function load(mixed $data, mixed $destination, Context $context, array $configuration = []): bool
    {
        if (! is_string($destination)) {
            throw new \InvalidArgumentException('Destination must be a file path string.');
        }

        // … write $data to $destination …
        return true;
    }
}

3. Register the step

All classes that implement ExecutableStep (which all three abstract base classes do) are automatically tagged boutdecode_etl_core.executable_step via _instanceof. No manual service configuration is required as long as your class is registered as a service with autoconfiguration enabled, which is what standard Symfony service autodiscovery does.

# config/services.yaml  (standard Symfony service autodiscovery — nothing extra needed)
App\ETL\Step\:
    resource: '../src/ETL/Step/'
    autowire: true
    autoconfigure: true

If for any reason you need to register a step explicitly:

# config/services.yaml
App\ETL\Step\MyCsvExtractorStep:
    tags:
        - { name: boutdecode_etl_core.executable_step }

4. Reference the step in a Workflow

Use the code declared in #[AsExecutableStep] as the step identifier in your Workflow's stepConfiguration:

$workflow->setStepConfiguration([
    [
        'code'          => 'app.extractor.my_csv',
        'name'          => 'extract_customers',
        'order'         => 1,
        'configuration' => [
            'source'    => '/data/customers.csv',
            'delimiter' => ';',
        ],
    ],
    [
        'code'          => 'app.transformer.uppercase_name',
        'name'          => 'normalize_names',
        'order'         => 2,
        'configuration' => [
            'field' => 'name',
        ],
    ],
    [
        'code'          => 'app.loader.csv_file',
        'name'          => 'save_result',
        'order'         => 3,
        'configuration' => [
            'destination' => '/output/result.csv',
        ],
    ],
]);

At runtime, the StepResolver reads each step's code, finds the matching tagged service in the container, and injects the per-step configuration before execution.
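
To make the role of the code and order keys concrete, the sketch below sorts a stepConfiguration array by order and checks every code against a list of known step codes. It is only an illustration: orderedStepCodes() is a hypothetical helper, and whether the bundle itself sorts by order or validates codes at this point is an assumption, not something stated above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the step codes in execution order.
 *
 * @param list<array{code: string, order: int}> $stepConfiguration
 * @param list<string>                          $registeredCodes codes of the available steps
 *
 * @return list<string>
 */
function orderedStepCodes(array $stepConfiguration, array $registeredCodes): array
{
    // Sort a local copy by the "order" key, ascending.
    usort($stepConfiguration, static fn (array $a, array $b): int => $a['order'] <=> $b['order']);

    $codes = [];
    foreach ($stepConfiguration as $entry) {
        if (!in_array($entry['code'], $registeredCodes, true)) {
            throw new InvalidArgumentException(sprintf('Unknown step code "%s".', $entry['code']));
        }

        $codes[] = $entry['code'];
    }

    return $codes;
}

$codes = orderedStepCodes(
    [
        ['code' => 'app.loader.csv_file', 'order' => 3],
        ['code' => 'app.extractor.my_csv', 'order' => 1],
        ['code' => 'app.transformer.uppercase_name', 'order' => 2],
    ],
    ['app.extractor.my_csv', 'app.transformer.uppercase_name', 'app.loader.csv_file'],
);

echo implode(' -> ', $codes), PHP_EOL;
// app.extractor.my_csv -> app.transformer.uppercase_name -> app.loader.csv_file
```

A check like this can be useful in a test or a console command to catch a typo in a step code before a pipeline is ever dispatched.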

CQS — Commands & Queries

Dispatching a command

Inject CommandBus and call dispatch() with any object implementing Command:

use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;

class MyService
{
    public function __construct(private readonly CommandBus $commandBus) {}

    public function doSomething(): void
    {
        $this->commandBus->dispatch(new MyCommand(/* ... */));
    }
}

Running a pipeline from a Workflow

The bundle ships one built-in command: ExecuteWorkflowCommand. It takes a persisted pipeline ID and triggers the full middleware chain, asynchronously when an async Messenger transport is configured and synchronously otherwise.

Step 1 — Implement PipelineFactory

The bundle provides the PipelineFactory interface but no concrete implementation — you must provide one (typically a Doctrine-backed service):

// src/Factory/PipelineFactory.php
use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory as PipelineFactoryInterface;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Step;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Provider\WorkflowProvider;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Persister\PipelinePersister;

final class PipelineFactory implements PipelineFactoryInterface
{
    public function __construct(
        private readonly WorkflowProvider $workflowProvider,
        private readonly PipelinePersister $pipelinePersister,
    ) {}

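    public function create(array $steps = [], array $configuration = []): Pipeline
    {
        // Build a Pipeline from a list of Step objects.
        // Left unimplemented in this example; throwing keeps the declared return type valid.
        throw new \LogicException('PipelineFactory::create() is not implemented in this example.');
    }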

    /**
     * @param array<string, mixed> $overrideConfiguration
     * @param array<string, mixed> $input
     */
    public function createFromWorkflowId(
        string $workflowId,
        array $overrideConfiguration = [],
        array $input = [],
    ): Pipeline {
        $workflow = $this->workflowProvider->findWorkflowByIdentifier($workflowId);

        // build Pipeline from Workflow steps & config, then persist it
        $pipeline = new \App\Entity\Pipeline($workflow);
        // ... populate steps, configuration, input ...

        return $this->pipelinePersister->create($pipeline);
    }
}

The bundle's DataInterfaceAliasPass compiler pass automatically creates the DI alias as soon as your class is registered as a service — no manual wiring needed.

Step 2 — Create and persist the Pipeline

use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
    ) {}

    public function handle(string $workflowId): string
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            overrideConfiguration: [
                'extract_step' => ['file' => '/data/import.csv'],
            ],
            input: ['source' => 'manual'],
        );

        // Pipeline is now persisted with PipelineStatus::PENDING
        return $pipeline->getId();
    }
}

Step 3 — Dispatch the execution command

use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory;
use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;
use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\ExecuteWorkflowCommand;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
        private readonly CommandBus $commandBus,
    ) {}

    public function handle(string $workflowId): void
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            input: ['source' => 'manual'],
        );

        // ExecuteWorkflowCommand implements AsyncCommand:
        // routed to an async Messenger transport if one is configured,
        // otherwise handled synchronously.
        $this->commandBus->dispatch(
            new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
        );
    }
}

Note: ExecuteWorkflowCommand implements AsyncCommand. If you configure a Symfony Messenger transport for the async routing key the execution will be deferred to a worker. The pipeline must be in PipelineStatus::PENDING — if it is already IN_PROGRESS, COMPLETED, or FAILED the handler returns silently without re-running it.
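
The status guard described in the note can be pictured with the minimal sketch below. SketchStatus, SketchPipeline and executeOnce() are hypothetical stand-ins written for this example; they are not the bundle's PipelineStatus enum or its command handler, and the real transitions are managed by the Symfony Workflow state machine.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for PipelineStatus (case names and values are assumptions).
enum SketchStatus: string
{
    case Pending = 'pending';
    case InProgress = 'in_progress';
    case Completed = 'completed';
    case Failed = 'failed';
}

// Hypothetical stand-in for a persisted pipeline.
final class SketchPipeline
{
    public SketchStatus $status = SketchStatus::Pending;
}

/**
 * Runs $work only when the pipeline is still pending.
 * Returns false when the call was skipped.
 */
function executeOnce(SketchPipeline $pipeline, callable $work): bool
{
    if ($pipeline->status !== SketchStatus::Pending) {
        // Already started or finished: return without re-running.
        return false;
    }

    $pipeline->status = SketchStatus::InProgress;

    try {
        $work();
        $pipeline->status = SketchStatus::Completed;
    } catch (Throwable) {
        $pipeline->status = SketchStatus::Failed;
    }

    return true;
}

$pipeline = new SketchPipeline();
$runs = 0;
$work = static function () use (&$runs): void {
    $runs++;
};

$first = executeOnce($pipeline, $work);  // runs the work
$second = executeOnce($pipeline, $work); // skipped: status is no longer pending

var_dump($first, $second, $runs, $pipeline->status->value);
// bool(true) bool(false) int(1) string(9) "completed"
```

A guard of this kind makes a redelivered message harmless: a second dispatch for the same pipeline ID does nothing instead of importing the data twice.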

Reading the results

CommandBus::dispatch() returns the value produced by the handler (Context). You can inspect the results directly when running synchronously:

use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;

/** @var Context $context */
$context = $this->commandBus->dispatch(
    new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
);

// Last result produced by the pipeline
$result = $context->getResult();

// Result keyed by step name
$extracted = $context->getResultByKey('extract_step');

// Check for step failures
$errors = $context->getErrors(); // array<string, mixed>

Adding Custom Middleware

Pipeline middleware

use BoutDeCode\ETLCoreBundle\Run\Domain\Middleware\Middleware;
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;

final class AuditPipelineMiddleware implements Middleware
{
    public function process(Context $context, callable $next): Context
    {
        // before
        $result = $next($context);
        // after
        return $result;
    }
}

# config/services.yaml
App\Middleware\AuditPipelineMiddleware:
    tags:
        - { name: boutdecode_etl_core.pipeline_middleware, priority: 50 }

Step middleware

Step middleware follows the same pattern; tag the service with boutdecode_etl_core.step_middleware instead.

Built-in middleware priority reference:

| Middleware | Tag | Priority |
| --- | --- | --- |
| PipelineStartMiddleware | pipeline | 100 |
| PipelineFailureMiddleware | pipeline | 1 |
| PipelineProcessMiddleware | pipeline | 0 |
| PipelineHistoryMiddleware | pipeline | -50 |
| PipelineSuccessMiddleware | pipeline | -100 |
| StepStartMiddleware | step | 100 |
| StepFailureMiddleware | step | 1 |
| StepProcessMiddleware | step | 0 |
| StepHistoryMiddleware | step | -50 |
| StepSuccessMiddleware | step | -100 |
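
To see where a custom priority such as 50 would land, the sketch below builds a chain from prioritised handlers and records the order in which they run. It assumes that a higher priority means "closer to the outside of the chain", which matches how Symfony orders tagged services (highest priority first); how the bundle actually composes its middleware is not shown here. buildChain() and named() are hypothetical helpers.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: composes handlers so that the highest priority is outermost.
 *
 * @param list<array{priority: int, handler: callable(array, callable): array}> $middlewares
 */
function buildChain(array $middlewares): callable
{
    // Highest priority first.
    usort($middlewares, static fn (array $a, array $b): int => $b['priority'] <=> $a['priority']);

    // End of the chain: return the trace untouched.
    $next = static fn (array $trace): array => $trace;

    // Wrap from the innermost (lowest priority) to the outermost (highest priority).
    foreach (array_reverse($middlewares) as $middleware) {
        $handler = $middleware['handler'];
        $next = static fn (array $trace): array => $handler($trace, $next);
    }

    return $next;
}

/** Hypothetical helper: a middleware that records when it runs. */
function named(string $name, int $priority): array
{
    return [
        'priority' => $priority,
        'handler' => static function (array $trace, callable $next) use ($name): array {
            $trace[] = $name . ':before';
            $trace = $next($trace);
            $trace[] = $name . ':after';

            return $trace;
        },
    ];
}

$chain = buildChain([named('audit', 50), named('process', 0), named('start', 100)]);
$trace = $chain([]);

echo implode(' > ', $trace), PHP_EOL;
// start:before > audit:before > process:before > process:after > audit:after > start:after
```

Under this assumption, a middleware tagged with priority 50 runs its "before" part after the start middleware and ahead of the process middleware, and its "after" part in the reverse order.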

Testing

# All tests
composer test

# Unit tests only
composer test:unit

# Integration tests only
composer test:integration

Current status: 394 unit tests, 3 integration tests — all passing.

License

MIT — see LICENSE.

Built with ❤️ by Boutdecode