whitedigital-eu / etl-bundle
Extract/Transform/Load processing bundle for Symfony 6+
Installs: 358
Dependents: 0
Suggesters: 0
Security: 0
Stars: 1
Watchers: 1
Forks: 0
Open Issues: 2
Type:symfony-bundle
Requires
- php: >=8.1
- doctrine/collections: *
- doctrine/dbal: *
- doctrine/orm: *
- doctrine/persistence: *
- psr/container: *
- symfony/config: ^6.2
- symfony/console: ^6.2
- symfony/dependency-injection: ^6.2
- symfony/http-kernel: ^6.2
- symfony/mailer: ^6.2
- symfony/property-access: ^6.2
- symfony/security-bundle: ^6.2
- symfony/service-contracts: *
- symfony/twig-bridge: ^6.2
- whitedigital-eu/audit-service: ^0.5|^0.6|^0.7
- whitedigital-eu/entity-resource-mapper-bundle: *
Requires (Dev)
- roave/security-advisories: dev-latest
- whitedigital-eu/config-pack: *
This package is auto-updated.
Last update: 2024-10-22 13:54:20 UTC
README
Package for running ETL tasks.
- Define custom Extractors, Transformers and Loaders
- Create pipeline to run specific data import/export process.
Tasks can be run:
- from CLI (bin/console etl:run <task_name>)
- from services or controllers
- from frontend using Server Sent Events (SSE) API.
Requirements
- PHP 8.1+
- Symfony 6.2+
Install bundle
composer req "whitedigital-eu/etl-bundle"
Setup task
Example task (HorizonDataExtractor, HorizonCustomerTransformer - should be created separately)
<?php declare(strict_types=1); namespace App\ETL\Task; use App\ETL\Extractor\HorizonDataExtractor; use App\ETL\Transformer\HorizonCustomerTransformer; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Mailer\Exception\TransportExceptionInterface; use WhiteDigital\EtlBundle\Attribute\AsTask; use WhiteDigital\EtlBundle\Exception\EtlException; use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader; use WhiteDigital\EtlBundle\Task\AbstractTask; #[AsTask(name: 'horizon_customer_import')] class HorizonCustomerImportTask extends AbstractTask { /** * @throws EtlException * @throws TransportExceptionInterface */ public function runTask(OutputInterface $output, array $extractorArgs = null): void { $this->etlPipeline ->setOutput($output) ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? ['path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc']) ->addTransformer(HorizonCustomerTransformer::class) ->addLoader(DoctrineDbalLoader::class) ->run(); } }
Example Extractor:
<?php declare(strict_types=1); namespace App\ETL\Extractor; use App\Service\HorizonRestApiService; use Psr\Cache\InvalidArgumentException; use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface; use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface; use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use WhiteDigital\EtlBundle\Exception\ExtractorException; use WhiteDigital\EtlBundle\Extractor\AbstractExtractor; use WhiteDigital\EtlBundle\Helper\Queue; final class HorizonDataExtractor extends AbstractExtractor { public function __construct( private readonly HorizonRestApiService $horizonRestApiService, ) { } /** * @param \Closure|null $batchProcessor * @return Queue<\stdClass> * @throws ClientExceptionInterface * @throws ExtractorException * @throws InvalidArgumentException * @throws RedirectionExceptionInterface * @throws ServerExceptionInterface * @throws TransportExceptionInterface * @throws \JsonException */ public function run(\Closure $batchProcessor = null): Queue { if (null !== $batchProcessor) { throw new ExtractorException(sprintf('Batch mode not supported by %s', __CLASS__)); } $data = new Queue(); $this->output->writeln(sprintf('Datu iegūšana uzsākta no avota: [%s]', $path = $this->getOption('path'))); $rawJsonData = $this->horizonRestApiService->makeGetRequest($path); foreach ($rawJsonData?->collection->row as $row) { $data->push($row); } $this->output->writeln(sprintf('Iegūti %s ieraksti.', count($data))); return $data; } }
Example Transformer:
Example Loader:
Console commands
- Run task by its name:
bin/console etl:run horizon_customer_import
or pass extra custom argument:
bin/console etl:run horizon_customer_import random_path.txt
- List available tasks:
bin/console etl:list