keboola/db-import-export

Package allows to import files to Snowflake from multiple cloud storages


README

Supported operations

  • Load/Import csv from ABS to Snowflake or Synapse
  • Unload/Export table from Snowflake or Synapse to ABS

Features

Import

  • Full load - destination table is truncated before load
  • Incremental load - data are merged
  • Primary key dedup for all engines
  • Convert empty values to NULL (using convertEmptyValuesToNull option)

Export

  • Full unload - destination csv is always rewriten

Development

Preparation

Azure

  • Create storage account template can be found in provisioning ABS create template
  • Create container in storage account Blob service -> Containers note: for tests this step can be skiped container is created with loadAbs cmd
  • Fill env variables in .env file
ABS_ACCOUNT_NAME=storageAccount
ABS_ACCOUNT_KEY=accountKey
ABS_CONTAINER_NAME=containerName
  • Upload test fixtures to ABS docker-compose run --rm dev composer loadAbs

SNOWFLAKE

Role, user, database and warehouse are required for tests. You can create them:

CREATE ROLE "KEBOOLA_DB_IMPORT_EXPORT";
CREATE DATABASE "KEBOOLA_DB_IMPORT_EXPORT";

GRANT ALL PRIVILEGES ON DATABASE "KEBOOLA_DB_IMPORT_EXPORT" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";
GRANT USAGE ON WAREHOUSE "DEV" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";

CREATE USER "KEBOOLA_DB_IMPORT_EXPORT"
PASSWORD = 'Password'
DEFAULT_ROLE = "KEBOOLA_DB_IMPORT_EXPORT";

GRANT ROLE "KEBOOLA_DB_IMPORT_EXPORT" TO USER "KEBOOLA_DB_IMPORT_EXPORT";

SYNAPSE

Create synapse server on Azure portal or using CLI.

set up env variables: SYNAPSE_UID SYNAPSE_PWD SYNAPSE_DATABASE SYNAPSE_SERVER

Run query:

CREATE MASTER KEY;

this will create master key for polybase.

Managed Identity

Managed Identity is required when using ABS in vnet. docs How to setup and use Managed Identity is described in docs

TLDR; In IAM of ABS add role assignment "Blob Storage Data {Reader or Contributor}" to your Synapse server principal

Tests

Run tests with following command.

note: azure credentials must be provided and fixtures uploaded

docker-compose run --rm dev composer tests

Unit and functional test can be run sepparetly

#unit test
docker-compose run --rm dev composer tests-unit

#functional test
docker-compose run --rm dev composer tests-functional

Code quality check

#phplint
docker-compose run --rm dev composer phplint

#phpcs
docker-compose run --rm dev composer phpcs

#phpstan
docker-compose run --rm dev composer phpstan

Full CI workflow

This command will run all checks load fixtures and run tests

docker-compose run --rm dev composer ci

Usage

ABS -> Snowflake import/load

use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$absSourceFile = new Storage\ABS\SourceFile(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $absSourceFile,
    $snowflakeDestinationTable,
    $importOptions
);

Snowflake -> Snowflake copy

use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $snowflakeSourceTable,
    $snowflakeDestinationTable,
    $importOptions
);

Snowflake -> ABS export/unload

use Keboola\Db\ImportExport\Backend\Snowflake\Exporter;
use Keboola\Db\ImportExport\ExportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$absDestinationFile = new Storage\ABS\DestinationFile(...);
$exportOptions = new ExportOptions(...);

(new Exporter($snowflakeConnection))->exportTable(
    $snowflakeSourceTable,
    $absDestinationFile,
    $exportOptions
);

Internals/Extending

Library consists of few simple interfaces.

Create new backend

Importer, Exporter Interface must be implemented in new Backed

Keboola\Db\ImportExport\Backend\ImporterInterface
Keboola\Db\ImportExport\Backend\ExporterInterface

For each backend there is corresponding adapter which supports own combination of SourceInterface and DestinationInterface. Custom adapters can be set with setAdapters method.

Create new storage

Storage is now file storage ABS|S3 (in future) or table storage Snowflake|Synapse. Storage can have Source and Destination which must implement SourceInterface or DestinationInterface. These interfaces are empty and it's up to adapter to support own combination. In general there is one Import/Export adapter per FileStorage <=> TableStorage combination.

Adapter must implement:

  • Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface for import
  • Keboola\Db\ImportExport\Backend\BackendExportAdapterInterface for export

Backend can require own extended AdapterInterface (Synapse and Snowflake do now).