flow-php / etl-adapter-reactphp
PHP ETL - Asynchronous Adapter on ReactPHP
1.x-dev
2022-06-09 23:37 UTC
Requires
- php: ~8.1
- flow-php/etl: ^1.0@dev
- monolog/monolog: ^2.0 || ^3.0
- react/child-process: ^0.6.4
- react/socket: ^1.11
README
This adapter provides asynchronous local pipeline server and worker elements implemented on ReactPHP.
The following communication protocols are supported:
- TCP/IP (local only) - 127.0.0.1:6651
- Unix Domain Socket - unix:///var/run/etl.sock
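To illustrate how the two address forms differ, here is a minimal sketch that uses only PHP's built-in stream functions, not this adapter or ReactPHP. The helper name `roundTrip` is hypothetical, and the TCP example binds to port 0 (an OS-assigned port) instead of 6651 to avoid clashing with a running server.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: listens on $address, connects to it from the same
 * process, sends $payload, and returns what the server side received.
 * Accepts "tcp://host:port" or "unix:///path/to/file.sock".
 */
function roundTrip(string $address, string $payload): string
{
    $isUnix = str_starts_with($address, 'unix://');

    $server = stream_socket_server($address, $errno, $errstr);
    if ($server === false) {
        throw new RuntimeException("Cannot listen on {$address}: {$errstr}");
    }

    // For TCP, ask the OS which port was actually bound (matters for port 0).
    $target = $isUnix ? $address : 'tcp://' . stream_socket_get_name($server, false);

    $client = stream_socket_client($target, $errno, $errstr, 1.0);
    if ($client === false) {
        fclose($server);
        throw new RuntimeException("Cannot connect to {$target}: {$errstr}");
    }

    $connection = stream_socket_accept($server, 1.0);
    if ($connection === false) {
        fclose($client);
        fclose($server);
        throw new RuntimeException("No connection accepted on {$target}");
    }

    // One newline-terminated message from client to server.
    fwrite($client, $payload . "\n");
    $received = (string) fgets($connection);

    fclose($client);
    fclose($connection);
    fclose($server);

    // A Unix domain socket leaves a file behind; remove it explicitly.
    if ($isUnix) {
        @unlink(substr($address, strlen('unix://')));
    }

    return rtrim($received, "\n");
}

// Usage: the same exchange over both transports.
echo roundTrip('tcp://127.0.0.1:0', 'ping'), PHP_EOL;
echo roundTrip('unix://' . sys_get_temp_dir() . '/etl-example.sock', 'ping'), PHP_EOL;
```

The Unix domain socket variant avoids the TCP stack entirely and is addressed by a filesystem path, so the process needs write access to the directory that holds the socket file.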
Installation
composer require flow-php/etl-adapter-reactphp
Working example:
<?php

use Aeon\Calendar\Stopwatch;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Flow\ETL\Adapter\CSV\League\CSVExtractor;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\Cache\InMemoryCache;
use Flow\ETL\Config;
use Flow\ETL\Monitoring\Memory\Consumption;
use Flow\ETL\Pipeline\LocalSocketPipeline;
use Flow\ETL\Async\ReactPHP\Worker\ChildProcessLauncher;
use Flow\ETL\Async\ReactPHP\Server\SocketServer;
use Flow\ETL\DSL\Transform;
use Flow\ETL\Flow;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LogLevel;

require __DIR__ . '/vendor/autoload.php';

// Server-side logger: debug output to stdout, errors to stderr.
$logger = new Logger('server');
$logger->pushHandler(new StreamHandler("php://stdout", LogLevel::DEBUG, false));
$logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));

$stopwatch = new Stopwatch();
$stopwatch->start();
$memory = new Consumption();

// $tableName and $dbConnectionParams are not defined in this snippet;
// set them for your database before running.
(new Flow)
    ->read(new CSVExtractor(
        $path = __DIR__ . '/data/dataset.csv',
        10_000,
        0
    ))
    // Distribute the transformations below across 8 worker processes
    // that talk to the server over a Unix domain socket.
    ->pipeline(
        new LocalSocketPipeline(
            SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
            new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-reactphp", $logger),
            $workers = 8
        )
    )
    ->rows(Transform::array_unpack('row'))
    ->drop('row')
    ->rows(Transform::to_integer("id"))
    ->rows(Transform::string_concat(['name', 'last_name'], ' ', 'name'))
    ->drop('last_name')
    ->load(new DbalLoader($tableName, $chunkSize = 1000, $dbConnectionParams))
    ->run();
This adapter comes with a built-in worker CLI application, but feel free to create a custom one. Customizing the worker lets you adjust the logger or the autoloader.
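As one illustration of the autoloader part, a custom worker script usually has to locate Composer's autoloader before doing anything else. The sketch below shows only that step in plain PHP. The function name `findAutoloader` and the candidate paths are assumptions about a typical project layout, not part of this adapter, and the worker bootstrap itself is left out.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the first path in $candidates that exists
 * as a file, or null when none of them does.
 *
 * @param list<string> $candidates
 */
function findAutoloader(array $candidates): ?string
{
    foreach ($candidates as $candidate) {
        if (is_file($candidate)) {
            return $candidate;
        }
    }

    return null;
}

// Assumed layout: the script lives either in the project's bin/ directory
// or inside vendor/<vendor>/<package>/bin/.
$autoloader = findAutoloader([
    __DIR__ . '/../vendor/autoload.php',
    __DIR__ . '/../../../autoload.php',
]);

if ($autoloader !== null) {
    require $autoloader;
}
```

A real worker script would normally stop with an error message when no autoloader is found, then configure its own logger before starting the worker.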