whitedigital-eu/etl-bundle

Extract/Transform/Load processing bundle for Symfony 6+

Installs: 242

Dependents: 0

Suggesters: 0

Security: 0

Stars: 1

Watchers: 1

Forks: 0

Open Issues: 2

Type:symfony-bundle

0.5.5 2023-11-09 10:55 UTC

README

Package for running ETL tasks.

  1. Define custom Extractors, Transformers and Loaders
  2. Create a pipeline to run a specific data import/export process.

Tasks can be run:

  1. from CLI (bin/console etl:run <task_name>)
  2. from services or controllers
  3. from frontend using Server Sent Events (SSE) API.

Requirements

  1. PHP 8.1+
  2. Symfony 6.2+

Install bundle

composer req "whitedigital-eu/etl-bundle"

Setup task

Example task (the HorizonDataExtractor and HorizonCustomerTransformer classes it references must be created separately):

<?php

declare(strict_types=1);

namespace App\ETL\Task;

use App\ETL\Extractor\HorizonDataExtractor;
use App\ETL\Transformer\HorizonCustomerTransformer;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Attribute\AsTask;
use WhiteDigital\EtlBundle\Exception\EtlException;
use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader;
use WhiteDigital\EtlBundle\Task\AbstractTask;

/**
 * ETL task registered under the name "horizon_customer_import".
 *
 * Wires HorizonDataExtractor -> HorizonCustomerTransformer -> DoctrineDbalLoader
 * into the pipeline held in $this->etlPipeline and runs it.
 * NOTE(review): $this->etlPipeline is presumably provided by AbstractTask - confirm.
 */
#[AsTask(name: 'horizon_customer_import')]
class HorizonCustomerImportTask extends AbstractTask
{
    /**
     * Configures the pipeline and executes it.
     *
     * @param OutputInterface $output        Output handed to the pipeline via setOutput().
     * @param array|null      $extractorArgs Options passed to the extractor; when null,
     *                                       a default 'path' option is used instead.
     *
     * @throws EtlException
     * @throws TransportExceptionInterface
     */
    public function runTask(OutputInterface $output, ?array $extractorArgs = null): void
    {
        // Explicit "?array" replaces the implicit nullable "array ... = null",
        // which is deprecated as of PHP 8.4; the accepted type is unchanged.
        $this->etlPipeline
            ->setOutput($output)
            ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? ['path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc'])
            ->addTransformer(HorizonCustomerTransformer::class)
            ->addLoader(DoctrineDbalLoader::class)
            ->run();
    }
}

Example Extractor:

<?php

declare(strict_types=1);

namespace App\ETL\Extractor;

use App\Service\HorizonRestApiService;
use Psr\Cache\InvalidArgumentException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Exception\ExtractorException;
use WhiteDigital\EtlBundle\Extractor\AbstractExtractor;
use WhiteDigital\EtlBundle\Helper\Queue;

/**
 * Extractor that fetches rows through HorizonRestApiService and returns them
 * as a Queue. Batch mode is not supported.
 */
final class HorizonDataExtractor extends AbstractExtractor
{
    public function __construct(
        private readonly HorizonRestApiService $horizonRestApiService,
    ) {
    }

    /**
     * Requests the configured 'path' option from the REST service and queues
     * every entry found under collection->row of the response.
     *
     * @param \Closure|null $batchProcessor Must be null; any closure is rejected.
     *
     * @return Queue<\stdClass> Queue with one item per returned row (empty when
     *                          the response has no collection/row data).
     *
     * @throws ClientExceptionInterface
     * @throws ExtractorException When a batch processor is supplied.
     * @throws InvalidArgumentException
     * @throws RedirectionExceptionInterface
     * @throws ServerExceptionInterface
     * @throws TransportExceptionInterface
     * @throws \JsonException
     */
    public function run(?\Closure $batchProcessor = null): Queue
    {
        if (null !== $batchProcessor) {
            throw new ExtractorException(sprintf('Batch mode not supported by %s', __CLASS__));
        }

        $path = $this->getOption('path');
        // Message (Latvian): "Data extraction started from source: [...]".
        $this->output->writeln(sprintf('Datu iegūšana uzsākta no avota: [%s]', $path));

        $rawJsonData = $this->horizonRestApiService->makeGetRequest($path);

        $data = new Queue();
        // "?? []" guards against a null response or a missing collection/row,
        // which would otherwise make foreach emit a warning on a null operand.
        foreach ($rawJsonData?->collection?->row ?? [] as $row) {
            $data->push($row);
        }
        // Message (Latvian): "Retrieved %s records.".
        $this->output->writeln(sprintf('Iegūti %s ieraksti.', count($data)));

        return $data;
    }
}

Example Transformer:


Example Loader:


Console commands

  1. Run task by its name:
bin/console etl:run horizon_customer_import

or pass an extra custom argument:

bin/console etl:run horizon_customer_import random_path.txt
  2. List available tasks:
bin/console etl:list