tobento / service-read-write
Flexible data readers, writers, modifiers and processors for PHP applications.
pkg:composer/tobento/service-read-write
Requires
- php: >=8.4
- psr/http-message: ^2.0
- tobento/service-collection: ^2.0
- tobento/service-iterable: ^2.0
- tobento/service-support: ^2.0
Requires (Dev)
- guzzlehttp/guzzle: ^7.10
- halaxa/json-machine: ^1.2
- nyholm/psr7: ^1.8
- phpunit/phpunit: ^12.3
- tobento/service-container: ^2.0
- tobento/service-encryption: ^2.0
- tobento/service-event: ^2.0
- tobento/service-file-storage: ^2.0
- tobento/service-pdf: ^2.0
- tobento/service-repository: ^2.0
- tobento/service-sanitizer: ^2.0
- tobento/service-storage: ^2.0
- tobento/service-validation: ^2.0
- tobento/service-view: ^2.0
- vimeo/psalm: ^6.13
Suggests
- halaxa/json-machine: Required for the JSON Stream Reader
- tobento/service-encryption: Required for the Encrypt modifier
- tobento/service-event: May be used to support events
- tobento/service-file-storage: To support file storage writer resource
- tobento/service-pdf: Required for pdf resource writer
- tobento/service-repository: To support repository writer
- tobento/service-sanitizer: Required for the Sanitize modifier
- tobento/service-storage: To support storage writer
- tobento/service-validation: Required for the Validation modifier
README
The Read and Write Service provides a unified way to consume and produce structured data across different formats and sources.
It abstracts common operations such as reading rows from streams or iterables, inspecting column metadata, and writing data back to files or streams in formats like CSV, JSON, or NDJSON.
Key Capabilities
- Readers: Stream data from CSV, JSON arrays, NDJSON, or in-memory iterables.
- Writers: Export rows into CSV, JSON, NDJSON, or other supported formats.
- Consistency: All readers and writers implement common interfaces for predictable usage.
- Flexibility: Works with any PSR-7 `StreamInterface` implementation (Nyholm, Guzzle, Laminas, Slim, etc.).
- Extensibility: Supports modifiers to transform, filter, or enrich row attributes.
Why Use It?
- Simplifies handling of heterogeneous data sources.
- Enables efficient streaming for large datasets without loading everything into memory.
- Provides a consistent developer experience across different formats.
Table of Contents
- Getting started
- Documentation
- Workflow
- Readers Comparison
- Readers
- Writers Comparison
- Writers
- Writer Resources
- Modifiers
- Apply Modifiers If Modifier
- Callable Modifier
- Column Map Modifier
- Combine Fields Modifier
- Compute Modifier
- Default Value Modifier
- Encrypt Modifier
- Filter Fields Modifier
- Format Modifier
- Hash Modifier
- Lookup Modifier
- Mask Modifier
- Redact Modifier
- Remove Fields Modifier
- Replace Modifier
- Sanitize Modifier
- Skip If Modifier
- Split Modifier
- Trim Modifier
- Unique Modifier
- Validation Modifier
- Processors
- Results
- Events
- Learn More
- Credits
Getting started
Add the latest version of the read/write project by running this command:
```
composer require tobento/service-read-write
```
Requirements
- PHP 8.4 or greater
Documentation
Workflow
```php
use Tobento\Service\ReadWrite\Modifier;
use Tobento\Service\ReadWrite\Processor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

$reader = new Reader\CsvStream(
    // PSR-7 StreamInterface
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);

$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

$modifiers = new Modifier\Modifiers(
    new Modifier\ColumnMap(['title' => 'headline']),
);

$processor = new Processor\TimeBudgetProcessor(
    timeBudget: 10,
    modifiers: $modifiers
);

$result = $processor->process(reader: $reader, writer: $writer);

print_r($result->timeline());
```
Readers Comparison
| Reader | Streaming | Supports Preview | Detects Columns | Nested Structures | Skips Invalid Rows | Typical Use Case |
|---|---|---|---|---|---|---|
| CSV Stream Reader | Yes | Yes | Yes | No | Yes | Importing tabular CSV files of any size |
| Iterable Reader | Yes | Yes | Yes (from first row) | Yes (if iterable contains arrays) | Yes | Reading from arrays, generators, API responses |
| JSON Stream Reader | Yes | Yes | Yes | Yes | Yes (SkipRow) | Large JSON arrays, API exports, structured data |
| NDJSON Stream Reader | Yes | Yes | Yes | Yes (per line) | Yes (SkipRow) | Log streams, event streams, line-based JSON |
Notes
- Streaming means the reader does not load the entire file into memory.
- Preview refers to `columnsPreview()` support.
- Detects Columns means the reader can infer column names from the first row.
- Nested Structures apply to JSON and NDJSON, not CSV.
- Skips Invalid Rows means the reader returns a `SkipRow` instead of throwing.
Readers
CSV Stream Reader
The CsvStream reader allows you to consume CSV data from any source that implements the PSR-7 StreamInterface. It supports reading rows sequentially, applying offsets and limits.
Features
- Reads CSV data from PSR-7 streams (files, HTTP bodies, memory streams, etc.).
- Supports custom delimiters, enclosures, and escape characters.
- Automatically detects and uses the first row as column headers.
- Provides row objects implementing `RowInterface` for consistent access.
- Provides `isFinished()` to check if the end of the stream has been reached.
- Provides `currentOffset()` to get the byte offset for resuming later.
Example
```php
use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader;

// Create a PSR-7 stream from a local file
$stream = new Psr17Factory()->createStreamFromFile('/data/input.csv');

// Initialize the CSV reader
$reader = new Reader\CsvStream(
    stream: $stream,
    delimiter: ',', // optional, defaults to ','
    enclosure: '"', // optional, defaults to '"'
    escape: '\\',   // optional, defaults to '\'
);

// Read the first 5 rows starting at offset 0
foreach ($reader->read(offset: 0, limit: 5) as $row) {
    echo $row->key() . ': ' . json_encode($row->all()) . PHP_EOL;
}

if ($reader->isFinished()) {
    echo 'Reached end of CSV at offset: ' . $reader->currentOffset();
}
```
Notes
- You can use any PSR-7 implementation (Nyholm, Laminas, Guzzle, Slim, etc.) to create the stream.
- If the CSV contains a UTF-8 BOM, it will be stripped automatically.
- Invalid rows (wrong number of columns) are returned as `Tobento\Service\ReadWrite\Row\SkipRow` objects with a reason.
- Combine with modifiers to transform attributes if needed.
Iterable Reader
The IterableReader wraps any PHP iterable (array, generator, Traversable) into a ReaderInterface.
It is useful for testing, working with in-memory datasets, or adapting existing collections to the reader API.
Features
- Accepts arrays, generators, and custom iterables.
- No stream or PSR-7 dependency required.
- Produces raw attributes exactly as provided by the iterable rows.
- Provides `isFinished()` to check if all items have been consumed.
- Provides `currentOffset()` to track how many items have been read.
Example
```php
use Tobento\Service\ReadWrite\Reader\IterableReader;

$data = [
    ['title' => 'Hello'],
    ['title' => 'World'],
];

$reader = new IterableReader($data);

foreach ($reader->read() as $row) {
    echo $row->key() . ': ' . $row->get('title') . PHP_EOL;
}

echo 'Finished? ' . ($reader->isFinished() ? 'yes' : 'no') . PHP_EOL;
echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;
```
Notes
- Ideal for unit tests or scenarios where data is already available in memory.
- Rows must implement RowInterface (e.g. Row objects).
- Combine with modifiers to transform attributes if needed.
JSON Stream Reader
The JsonStream reader consumes JSON arrays from any source that implements the PSR-7 StreamInterface.
Each element in the array is parsed into a RowInterface, allowing sequential access with offset and limit support.
Requirements
To use the JsonStream reader you must install the JsonMachine library, which provides efficient streaming of large JSON arrays:
```
composer require halaxa/json-machine
```
Features
- Parses top-level JSON arrays into rows.
- Efficient for large JSON arrays thanks to JsonMachine's incremental parsing.
- Produces raw attributes exactly as they appear in the source.
- Skips any non-array values, returning them as `SkipRow` objects with a reason.
- Provides `isFinished()` to detect end of stream.
- Provides `currentOffset()` for resuming from a specific byte position.
Example
```php
use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader\JsonStream;

$stream = new Psr17Factory()->createStreamFromFile('/data/input.json');

$reader = new JsonStream($stream);

// Read the first 3 rows
foreach ($reader->read(offset: 0, limit: 3) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo "Skipped row: " . $row->reason() . PHP_EOL;
    } else {
        print_r($row->all());
    }
}

echo 'Offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of JSON stream';
}
```
Notes
- The JSON must be a valid array at the top level (e.g. `[ {...}, {...} ]`).
- Any non-array values encountered will be skipped and returned as SkipRow objects.
- For very large or continuous datasets, consider using the NDJSON Stream Reader instead, as it processes line-by-line and is more memory-efficient.
- Any PSR‑7 implementation (Nyholm, Laminas, Guzzle, Slim, etc.) can be used to create the stream.
- Combine with modifiers to transform attributes if needed.
NDJSON Stream Reader
The NdJsonStream reader consumes NDJSON (newline-delimited JSON) from any source that implements the PSR-7 StreamInterface.
Each line is parsed into a RowInterface, allowing sequential access with offset and limit support.
Features
- Processes entries line by line, memory-efficient for large datasets.
- Produces raw attributes from each JSON object.
- Skips invalid JSON lines, returning them as `SkipRow` objects with a reason.
- Provides `isFinished()` to check if all lines have been consumed.
- Provides `currentOffset()` to track the byte position for resuming later.
- `totalRows()` always returns `null` since the total count cannot be determined without scanning the entire stream.
Example
```php
use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Reader\NdJsonStream;

$stream = new Psr17Factory()->createStreamFromFile('/data/input.ndjson');

$reader = new NdJsonStream(stream: $stream);

// Read the first 2 rows starting at offset 0
foreach ($reader->read(offset: 0, limit: 2) as $row) {
    if ($row instanceof \Tobento\Service\ReadWrite\Row\SkipRow) {
        echo 'Skipped row: ' . $row->reason() . PHP_EOL;
    } else {
        echo json_encode($row->all()) . PHP_EOL;
    }
}

echo 'Current offset: ' . $reader->currentOffset() . PHP_EOL;

if ($reader->isFinished()) {
    echo 'Reached end of NDJSON stream';
}
```
Notes
- NDJSON does not require a top-level array; it streams one JSON object per line.
- Invalid or malformed JSON lines are skipped and returned as SkipRow objects.
- `JsonStream` uses JsonMachine to stream a top-level JSON array without loading the full file, while `NdJsonStream` reads line-by-line NDJSON. Both are streaming-friendly; choose based on source format (array vs. line-delimited) and whether the data is continuous.
- Prefer NDJSON for log-style or continuously appended data; prefer `JsonStream` for structured arrays.
- Any PSR-7 implementation (Nyholm, Laminas, Guzzle, Slim, etc.) can be used to create the stream.
Writers Comparison
| Writer | Streaming | Supports Headers | Supports Attributes | Nested Structures | Append Mode | Typical Use Case |
|---|---|---|---|---|---|---|
| CSV Resource Writer | Yes | Yes | No | No | Yes | Exporting tabular data, spreadsheets, reports |
| JSON Resource Writer | Yes | N/A | N/A | Yes | Yes | APIs, structured exports, debugging |
| NDJSON Resource Writer | Yes | N/A | N/A | Yes (per line) | Yes | Log streams, large datasets, incremental processing |
| XML Resource Writer | Yes | N/A | Yes (@attr) | Yes | No (Finalize only) | Feeds (RSS, Atom), Google Shopping, catalogs, sitemaps |
| PDF Resource Writer | No | Template-based | Yes (via template) | Yes | No (Finalize only) | PDF reports, tables, invoices, formatted exports |
| Null Writer | Yes | No | No | No | N/A | Discarding output, testing pipelines |
| Repository Writer | No | No | No | No | N/A | Writing rows into repositories or collections |
| Storage Writer | No | No | No | No | N/A | Writing rows into storage services (e.g., key-value stores) |
Notes
- Streaming means the writer does not load the entire dataset into memory.
- Attributes apply only to XML (`@id`, `@foo`, etc.).
- Nested structures apply to JSON, NDJSON, and XML.
- Append mode is supported only where the underlying format allows it (CSV, JSON, NDJSON).
- XML uses `Mode::Finalize` instead of append, because XML cannot be safely appended without rewriting closing tags.
Writers
CSV Resource Writer
The CsvResource writer exports rows into a CSV file or stream using a ResourceInterface.
It supports writing headers, handling BOM, and controlling write modes (overwrite or append).
Features
- Implements `WriterInterface` and `ModeAwareInterface`.
- Supports configurable delimiter, enclosure, and escape characters.
- Optionally writes a UTF-8 BOM when starting fresh.
- Automatically writes column headers on the first row.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
- See Writer Resources for details on available resource implementations.
Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\CsvResource;
use Tobento\Service\ReadWrite\Writer\Mode;

// Create a file resource
$resource = new LocalFile('/data/output.csv');

// Initialize the CSV writer
$writer = new CsvResource(
    resource: $resource,
    delimiter: ',', // optional
    enclosure: '"', // optional
    escape: '\\',   // optional
    writeBom: true  // optional
);

// Set mode (overwrite or append)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();
```
Notes
- The header row is written automatically based on the first row's attributes.
- If the resource is already open, start() will throw a WriterException.
- Use `Mode::Overwrite` to start fresh (writes BOM) or `Mode::Append` to add to an existing file.
JSON Resource Writer
The JsonResource writer exports rows into a JSON file or stream using a ResourceInterface.
It writes rows as objects inside a top-level JSON array, supporting overwrite, append, and finalize modes.
Features
- Implements `WriterInterface` and `ModeAwareInterface`.
- Writes rows into a JSON array (`[ {...}, {...} ]`).
- Supports `Mode::Overwrite`, `Mode::Append`, and `Mode::Finalize`.
- Automatically handles commas between rows.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
- See Writer Resources for details on available resource implementations.
Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\JsonResource;
use Tobento\Service\ReadWrite\Writer\Mode;

// Create a file resource
$resource = new LocalFile('/data/output.json');

// Initialize the JSON writer
$writer = new JsonResource($resource);

// Set mode (overwrite or append)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();
```
Notes
- Rows are written as JSON objects inside a top-level array.
- If the resource is already open, start() will throw a WriterException.
- Use `Mode::Overwrite` to start fresh (opens a new JSON array).
- Use `Mode::Append` to continue writing into an existing JSON array.
- Use `Mode::Finalize` to close the JSON array when finished.
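Taken together, the modes allow an export to be built up across multiple runs. The following is a minimal sketch using only the classes shown above; the exact interplay of `Mode::Append` and `Mode::Finalize` (here modeled on the XML writer examples) is an assumption, so consult the package source before relying on it:

```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\JsonResource;
use Tobento\Service\ReadWrite\Writer\Mode;

// First run: start a fresh JSON array.
$writer = new JsonResource(new LocalFile('/data/output.json'));
$writer->mode(Mode::Overwrite);
$writer->start();
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello']));
$writer->finish();

// Later run: append into the existing array, then close it.
$writer = new JsonResource(new LocalFile('/data/output.json'));
$writer->mode(Mode::Append);
$writer->start();
$writer->write(new Row(key: 2, attributes: ['title' => 'World']));
$writer->mode(Mode::Finalize);
$writer->finish();
```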
NDJSON Resource Writer
The NdJsonResource writer exports rows into an NDJSON file or stream using a ResourceInterface.
It writes each row as a standalone JSON object on its own line, making it ideal for log‑style or streaming data.
Features
- Implements `WriterInterface`.
- Writes rows as individual JSON objects separated by newlines.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
- See Writer Resources for details on available resource implementations.
Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\NdJsonResource;

// Create a file resource
$resource = new LocalFile('/data/output.ndjson');

// Initialize the NDJSON writer
$writer = new NdJsonResource($resource);

// Start writing
$writer->start();

// Write rows (each will be a separate line)
$writer->write(new Row(key: 1, attributes: ['title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();
```
Notes
- Each row is written as a JSON object on its own line (NDJSON format).
- If the resource is already open, start() will throw a WriterException.
- NDJSON is well-suited for large or continuously appended datasets (e.g. logs).
Null Writer
The NullWriter is a writer implementation that accepts rows but does nothing with them.
It is useful for:
- dry-run or simulation modes
- validating readers and modifiers without producing output
- debugging pipelines
- performance testing without I/O overhead
- scenarios where writing is optional or intentionally suppressed
The writer fully participates in the processing lifecycle (start, write, finish) but performs no side effects.
Example Use Cases
- Testing an import job without writing any data
- Running a processor to validate rows only
- Using modifiers for transformation or sanitization without persisting results
- Benchmarking reader performance
Example
```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\NullWriter;

// Initialize the NullWriter
$writer = new NullWriter();

// (Optional) Set mode if your application uses writer modes
$writer->mode(Mode::Overwrite);

// Start writing (no-op)
$writer->start();

// Write rows (no-op)
$writer->write(new Row(
    key: 1,
    attributes: ['title' => 'Hello', 'status' => 'Draft']
));

$writer->write(new Row(
    key: 2,
    attributes: ['title' => 'World', 'status' => 'Published']
));

// Finish writing (no-op)
$writer->finish();
```
PDF Resource Writer
The PdfResource writer exports rows into a PDF document using a template-based rendering system.
It collects all written rows, merges them with optional template data, and generates a PDF using a PdfGeneratorInterface.
Requirements
Install the tobento/service-pdf package, which provides the PDF generation interfaces and utilities used by this writer:
```
composer require tobento/service-pdf
```
Features
- Implements `WriterInterface`.
- Collects rows and passes them to a PDF template as `$rows`.
- Supports additional template data via the `$templateData` array (e.g. `title`, `description`, metadata).
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
- Uses any `ResourceInterface` (local file, stream, memory, etc.).
- Works with any `PdfInterface` implementation.
- See Writer Resources for details on available resource implementations.
Example
```php
use Tobento\Service\Pdf\Enums\Orientation;
use Tobento\Service\Pdf\Pdf;
use Tobento\Service\Pdf\PdfGenerator;
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\PdfResource;

// Create a file resource
$resource = new LocalFile('/data/report.pdf');

// Create a PDF generator: see pdf service.
$pdfGenerator = new PdfGenerator();

// Initialize the PDF writer with a template and template data
$writer = new PdfResource(
    resource: $resource,
    pdfGenerator: $pdfGenerator,
    templateName: 'pdf/export-table',
    templateData: [
        'title' => 'Product Report',
        'description' => 'Generated on ' . date('Y-m-d'),
    ],
    pdf: new Pdf()->orientation(Orientation::LANDSCAPE),
);

// Start writing
$writer->start();

// Write rows (these will be available as $rows in the template)
$writer->write(new Row(key: 1, attributes: ['name' => 'Apple', 'price' => 2.50]));
$writer->write(new Row(key: 2, attributes: ['name' => 'Banana', 'price' => 1.20]));

// Finish writing and generate the PDF
$writer->finish();
```
Template Example
A PDF template is a regular view file that receives the merged $templateData and the collected $rows.
You may include CSS assets, partials, and any layout structure you need.
```php
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title><?= $view->esc($title ?? 'Export') ?></title>
    <?= $view->assets()->render() ?>
    <?php
    // Assets can be included in every subview too.
    $view->asset('assets/css/basis.css');
    $view->asset('assets/css/app.css');
    ?>
</head>
<body class="content">
    <?= $view->render('inc/header') ?>

    <?php if (!empty($rows)) { ?>
        <table>
            <thead>
                <tr>
                    <?php foreach (array_keys($rows[0]->all()) as $col) { ?>
                        <th><?= $view->esc($col) ?></th>
                    <?php } ?>
                </tr>
            </thead>
            <tbody>
                <?php foreach ($rows as $row) { ?>
                    <tr>
                        <?php foreach ($row->all() as $value) { ?>
                            <td><?= $view->esc((string)$value) ?></td>
                        <?php } ?>
                    </tr>
                <?php } ?>
            </tbody>
        </table>
    <?php } else { ?>
        <p>No data available.</p>
    <?php } ?>

    <?= $view->render('inc/footer') ?>
</body>
</html>
```
Repository Writer
The RepositoryWriter writes rows into a WriteRepositoryInterface.
It supports automatic create/update operations or custom writer callbacks for flexible handling.
Requirements
Install the tobento/service-repository package, which provides the WriteRepositoryInterface used by this writer:
```
composer require tobento/service-repository
```
Features
- Implements `WriterInterface`.
- Uses a `WriteRepositoryInterface` for persistence.
- Supports a custom writer callback `(RowInterface, WriteRepositoryInterface): void`.
- Automatically updates rows if an identifier (`id` by default) is present.
- Creates new rows when no identifier is found.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
Example
```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\RepositoryWriter;
use Tobento\Service\Repository\WriteRepositoryInterface;

// Example repository implementing WriteRepositoryInterface
$repository = new MyRepository();

// Initialize the repository writer
$writer = new RepositoryWriter(
    repository: $repository,
    // Optional: specify the identifier column (defaults to 'id')
    idName: 'id',
    // Optional: define the columns supported by the repository
    columns: ['title', 'status', 'created_at'],
    // Optional: provide representative preview values for each column
    columnsPreview: [
        'title' => 'Lorem',
        'status' => 'Draft | Pending',
    ],
);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['id' => 1, 'title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();
```
Custom Writer Example
```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Writer\RepositoryWriter;
use Tobento\Service\Repository\WriteRepositoryInterface;

// Custom writer callback
$customWriter = function(RowInterface $row, WriteRepositoryInterface $repo): void {
    $attributes = $row->all();
    // Always create new entries, ignore id
    $repo->create($attributes);
};

$writer = new RepositoryWriter(
    repository: $repository,
    writer: $customWriter
);

$writer->start();
$writer->write(new Row(key: 1, attributes: ['id' => 99, 'title' => 'Force Create']));
$writer->finish();
```
Notes
- If a row contains the identifier (id by default), updateById is called.
- If no identifier is present, create is called.
- A custom writer callback overrides the default behavior.
- start() and finish() are no‑ops by default (repositories usually don't need explicit start/finish).
- Errors from the repository are wrapped in WriteException or WriterException for consistent handling.
Storage Writer
The StorageWriter writes rows into a StorageInterface.
It supports automatic create/update operations or custom writer callbacks for flexible handling.
Requirements
Install the tobento/service-storage package, which provides the StorageInterface used by this writer:
```
composer require tobento/service-storage
```
Features
- Implements `WriterInterface`.
- Uses a `StorageInterface` for persistence.
- Supports a custom writer callback `(RowInterface, StorageInterface): void`.
- Automatically updates rows if an identifier (`id` by default) is present.
- Creates new rows when no identifier is found.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
Example
```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\StorageWriter;
use Tobento\Service\Storage\StorageInterface;

// Example storage implementing StorageInterface
$storage = new MyStorage();

// Initialize the storage writer
$writer = new StorageWriter(
    storage: $storage,
    // Optional: specify the identifier column (defaults to 'id')
    idName: 'id',
    // Optional: define the columns supported by the storage
    columns: ['title', 'status', 'created_at'],
    // Optional: provide representative preview values for each column
    columnsPreview: [
        'title' => 'Lorem',
        'status' => 'Draft | Pending',
    ],
);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: ['id' => 1, 'title' => 'Hello', 'status' => 'Draft']));
$writer->write(new Row(key: 2, attributes: ['title' => 'World', 'status' => 'Published']));

// Finish writing
$writer->finish();
```
Custom Writer Example
```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Writer\StorageWriter;
use Tobento\Service\Storage\StorageInterface;

// Custom writer callback
$customWriter = function(RowInterface $row, StorageInterface $storage): void {
    $attributes = $row->all();
    // Always create new entries, ignore id
    $storage->insert($attributes);
};

$writer = new StorageWriter(
    storage: $storage,
    writer: $customWriter
);

$writer->start();
$writer->write(new Row(key: 1, attributes: ['id' => 99, 'title' => 'Force Create']));
$writer->finish();
```
XML Resource Writer
The XmlResource writer streams rows into an XML document using a ResourceInterface.
It supports configurable root elements, row elements, optional wrapper elements, attributes, nested structures, and safe XML escaping.
Features
- Implements `WriterInterface` and `ModeAwareInterface`.
- Streams XML without loading the full document into memory.
- Writes an XML declaration (`<?xml version="1.0" encoding="UTF-8"?>`).
- Supports configurable:
  - root element
  - row element
  - optional wrapper element (e.g., `<channel>` in RSS)
  - root attributes
- Supports attributes via the `@attribute` key convention.
- Supports nested arrays to generate nested XML structures.
- Numeric array keys produce repeated XML elements.
- Automatically escapes values using `ENT_XML1`.
- Validates XML version, encoding, and element names.
- Provides `start()`, `write()`, and `finish()` lifecycle methods.
- Throws `WriterException` or `WriteException` on errors.
- See Writer Resources for details on available resource implementations.
Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;

// Create a file resource
$resource = new LocalFile('/data/feed.xml');

// Initialize the XML writer
$writer = new XmlResource(
    resource: $resource,
    rootElement: 'products',
    rowElement: 'product',
    rowWrapper: null, // optional
    rootAttributes: ['xmlns:g' => 'http://base.google.com/ns/1.0'], // optional
    xmlVersion: '1.0', // optional
    encoding: 'UTF-8'  // optional
);

// Set mode (overwrite or finalize)
$writer->mode(Mode::Overwrite);

// Start writing
$writer->start();

// Write rows
$writer->write(new Row(key: 1, attributes: [
    '@id' => '123',
    'title' => 'Red Shoes',
    'price' => '49.99',
    'tags' => [
        'tag' => ['fashion', 'shoes'], // repeated elements
    ],
]));

$writer->write(new Row(key: 2, attributes: [
    '@id' => '124',
    'title' => 'Blue Shirt',
    'price' => '29.99',
]));

// Finish writing
$writer->mode(Mode::Finalize);
$writer->finish();
```
Atom Feed Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;

$resource = new LocalFile('/data/atom.xml');

$writer = new XmlResource(
    resource: $resource,
    rootElement: 'feed',
    rowElement: 'entry',
    rowWrapper: null,
    rootAttributes: [
        'xmlns' => 'http://www.w3.org/2005/Atom',
    ]
);

$writer->mode(Mode::Overwrite);
$writer->start();

$writer->write(new Row(1, [
    'title' => 'Hello World',
    'id' => 'urn:uuid:123',
    'updated' => '2025-01-01T12:00:00Z',
    'link' => [
        '@href' => 'https://example.com/hello',
    ],
    'content' => 'This is an Atom entry.',
]));

$writer->mode(Mode::Finalize);
$writer->finish();
```
Google Shopping Product Feed Example
```php
use Tobento\Service\ReadWrite\Resource\LocalFile;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Mode;

$resource = new LocalFile('/data/google-shopping.xml');

$writer = new XmlResource(
    resource: $resource,
    rootElement: 'rss',
    rowElement: 'item',
    rowWrapper: 'channel',
    rootAttributes: [
        'version' => '2.0',
        'xmlns:g' => 'http://base.google.com/ns/1.0',
    ]
);

$writer->mode(Mode::Overwrite);
$writer->start();

$writer->write(new Row(1, [
    'g:id' => 'SKU123',
    'g:title' => 'Red Shoes',
    'g:description' => 'Comfortable running shoes.',
    'g:link' => 'https://example.com/red-shoes',
    'g:image_link' => 'https://example.com/red-shoes.jpg',
    'g:price' => '49.99 USD',
    'g:availability' => 'in stock',
]));

$writer->mode(Mode::Finalize);
$writer->finish();
```
Notes
- Attributes are written using the `@attribute` key convention: `['@id' => '123']` becomes `<product id="123">`.
- Nested arrays generate nested XML structures.
- Numeric keys inside arrays produce repeated elements: `['tag' => ['a', 'b']]` becomes `<tag>a</tag><tag>b</tag>`.
- If the resource is already open, `start()` will throw a `WriterException`.
- Use `Mode::Overwrite` to create a fresh XML document.
- Use `Mode::Finalize` to close wrapper and root elements when finishing.
Writer Resources
Writer Resources define where and how data is physically written when using a writer (CSV, JSON, NDJSON, etc.).
They act as the destination layer, abstracting away the details of file handling, memory buffering, or storage backends.
Why They Are Used
- Separation of concerns: Writers focus on formatting rows (CSV, JSON, NDJSON), while resources handle persistence (local file, memory, storage system).
- Flexibility: You can swap out the resource depending on your needs without changing the writer logic.
  - Write directly to a local file (`LocalFile`).
  - Buffer in memory for testing or debugging (`InMemory`).
  - Commit to a storage backend like filesystem or cloud (`FileStorage`).
- Consistency: All resources implement `ResourceInterface`, so writers interact with them in a uniform way (`open()`, `write()`, `rewind()`, `close()`).
- Error handling: Resources encapsulate low-level errors (file not found, write failure, storage commit issues) and translate them into `WriterException`s.
- Reusability: The same writer can be reused with different resources, making the system modular and adaptable.
Examples of Resources
- `LocalFile`: Writes directly to a file on the local filesystem.
- `FileStorage`: Buffers data and commits it to a `StorageInterface` backend (e.g. filesystem, cloud).
- `InMemory`: Stores all written data in memory, useful for testing or capturing output without persistence.
By combining a writer (format) with a resource (destination), you can easily control both how data is structured and where it is stored.
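As a sketch of this separation, the XML writer from the earlier example can be pointed at an in-memory buffer instead of a local file without touching any formatting logic. This is a hedged example: constructor options are abbreviated, and the exact options the XML writer requires may differ.

```php
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\Writer\Mode;
use Tobento\Service\ReadWrite\Writer\XmlResource;
use Tobento\Service\ReadWrite\Writer\Resource\InMemory;

// Same writer (format), different resource (destination):
$resource = new InMemory();

$writer = new XmlResource(
    resource: $resource,
    rootElement: 'products',
    rowElement: 'product',
);

$writer->mode(Mode::Overwrite);
$writer->start();
$writer->write(new Row(1, ['name' => 'Red Shoes']));
$writer->mode(Mode::Finalize);
$writer->finish();

// Inspect the captured output instead of reading a file:
echo $resource->getContent();
```

Swapping `InMemory` for `LocalFile` or `FileStorage` changes only where the bytes end up, not how they are formatted.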
File Storage Resource
The FileStorage resource provides a bridge between writers and a storage backend implementing StorageInterface.
It buffers written data into a temporary stream and commits the contents to the configured storage when closed.
Requirements
Install the tobento/service-file-storage package, which provides the StorageInterface used by the FileStorage resource:
composer require tobento/service-file-storage
Features
- Implements `ResourceInterface`.
- Buffers data in a temporary `php://temp` stream.
- Commits buffered data to the configured `StorageInterface` on `close()`.
- Provides lifecycle methods: `open()`, `write()`, `rewind()`, and `close()`.
- Throws `WriterException` on errors (open, write, rewind, commit).
Example
```php
use Tobento\Service\FileStorage\NullStorage;
use Tobento\Service\FileStorage\StorageInterface;
use Tobento\Service\ReadWrite\Exception\WriterException;
use Tobento\Service\ReadWrite\Writer\Resource\FileStorage;

// Create a storage (NullStorage for demo)
$storage = new NullStorage(name: 'null');

// Initialize the FileStorage resource
$resource = new FileStorage(storage: $storage, filename: 'output.csv');

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close and commit to storage
$resource->close();
```
Notes
- Data is first written to a temporary stream (php://temp) before being committed to the storage backend.
- If the resource is already open, open() will throw a WriterException.
- close() commits the buffered contents to the configured StorageInterface and releases the handle.
- Use this resource with writers like CSV, JSON, or NDJSON to persist output into storage backends.
In Memory Resource
The InMemory resource provides a simple, non-persistent buffer for writers.
It stores all written data in memory, making it useful for testing, debugging, or scenarios where you don't want to write to disk or external storage.
Features
- Implements `ResourceInterface`.
- Buffers all written data in a string property.
- Provides lifecycle methods: `open()`, `write()`, `rewind()`, and `close()`.
- Exposes `getContent()` to retrieve the full written content.
- Throws `WriterException` on errors (open, write).
Example
```php
use Tobento\Service\ReadWrite\Writer\Resource\InMemory;
use Tobento\Service\ReadWrite\Exception\WriterException;

// Initialize the in-memory resource
$resource = new InMemory();

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close resource
$resource->close();

// Retrieve buffered content
echo $resource->getContent();
```
Notes
- Data is stored entirely in memory and is lost once the resource is closed or discarded.
- If the resource is not open, write() will throw a WriterException.
- rewind() is effectively a no-op, since memory buffering does not require pointer management.
- Use this resource with writers like CSV, JSON, or NDJSON when you want to capture output without persisting it to disk or external storage.
Local File Resource
The LocalFile resource provides direct file access for writers.
It opens a file on the local filesystem, writes data to it, and manages the file handle lifecycle.
Features
- Implements `ResourceInterface`.
- Opens a file using `fopen()` with a configurable mode (default: `'w'`).
- Provides lifecycle methods: `open()`, `write()`, `rewind()`, and `close()`.
- Throws `WriterException` on errors (open, write, rewind, close).
- Exposes the underlying file handle via `getHandle()`.
Example
```php
use Tobento\Service\ReadWrite\Writer\Resource\LocalFile;
use Tobento\Service\ReadWrite\Exception\WriterException;

// Initialize the LocalFile resource
$resource = new LocalFile(filename: '/data/output.csv', mode: 'w');

// Open resource
$resource->open();

// Write data
$resource->write("id,title,status\n");
$resource->write("1,Hello,Draft\n");
$resource->write("2,World,Published\n");

// Rewind if needed
$resource->rewind();

// Close resource
$resource->close();
```
Notes
- The file is opened with the specified mode ('w', 'a', etc.), allowing overwrite or append behavior.
- If the file cannot be opened, open() will throw a WriterException.
- If the resource is not open, write() will throw a WriterException.
- close() ensures the file handle is released properly.
- Use this resource with writers like CSV, JSON, or NDJSON to persist output directly to the local filesystem.
Modifiers
Modifiers allow you to transform, filter, or reshape rows as they move through the processing pipeline.
They operate between the reader and writer, giving you full control over how data is normalized, renamed, enriched, or reduced before it is written.
Modifiers are applied by processors, not by writers directly.
This keeps the system flexible and composable: you can chain multiple modifiers, reorder them, or reuse them across different read/write workflows.
Why Modifiers Are Useful
- Schema alignment: Rename or remap columns so input and output formats match.
- Data transformation: Convert values, normalize formats, or enrich rows.
- Filtering: Remove unwanted columns or skip rows entirely.
- Consistency: All modifiers implement `ModifierInterface`, making them easy to plug into any pipeline.
- Reusability: Modifiers can be combined to build powerful, declarative data-processing flows.
Use modifiers whenever you need to adapt data between reading and writing without changing your reader or writer implementations.
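For instance, a pipeline might trim input, fill in defaults, and drop internal fields. The sketch below applies such a chain manually for illustration; in practice a processor drives this loop, and `$reader`/`$writer` stand in for the pipeline's reader and writer instances.

```php
use Tobento\Service\ReadWrite\Modifier\DefaultValue;
use Tobento\Service\ReadWrite\Modifier\RemoveFields;
use Tobento\Service\ReadWrite\Modifier\Trim;
use Tobento\Service\ReadWrite\Row\Row;

$modifiers = [
    new Trim(['title']),                     // normalize whitespace
    new DefaultValue(['status' => 'Draft']), // fill missing values
    new RemoveFields(['internal_note']),     // strip unwanted fields
];

$row = new Row(1, [
    'title' => '  Hello  ',
    'internal_note' => 'x',
]);

// Processors apply modifiers in order; sketched here manually:
foreach ($modifiers as $modifier) {
    $row = $modifier->modify($row, $reader, $writer);
}

// Expected attributes after the chain:
// ['title' => 'Hello', 'status' => 'Draft']
```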
Apply Modifiers If Modifier
The ApplyModifiersIf modifier applies one or more modifiers only when a condition is met.
If the condition does not match, the wrapped modifiers are skipped and the row
continues unchanged.
Features
- Wraps a single modifier or a chain of modifiers.
- Supports `callable`, `array`, `string`, or `bool` conditions.
- Does nothing when the condition is false.
- Dot-aware field paths supported.
Supported Conditions
1. Boolean Condition
```php
use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

// Always apply:
new ApplyModifiersIf(true, $modifier);

// Never apply:
new ApplyModifiersIf(false, $modifier);
```
2. String Condition
Apply only if the field exists and is not empty:
```php
use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

new ApplyModifiersIf('country', $modifier);
```
3. Array Condition
```php
use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;

// Apply if field is missing:
new ApplyModifiersIf(
    ['field' => 'postal_code', 'missing' => true],
    $modifier
);

// Apply if field equals a specific value:
new ApplyModifiersIf(
    ['field' => 'country', 'equals' => 'CH'],
    $modifier
);
```
4. Callable Condition
Apply only for Swiss addresses:
```php
use Tobento\Service\ReadWrite\Modifier\ApplyModifiersIf;
use Tobento\Service\ReadWrite\RowInterface;

new ApplyModifiersIf(
    fn(RowInterface $row): bool => $row->get('country') === 'CH',
    $modifier
);
```
Callable Modifier
The CallableModifier allows you to apply custom transformation logic to a row using any user-defined callable.
It provides maximum flexibility when built-in modifiers are not sufficient or when you want to encapsulate small, one-off transformations.
Features
- Implements `ModifierInterface`.
- Accepts any valid PHP callable.
- The callable receives the current `RowInterface`, `ReaderInterface`, and `WriterInterface`.
- Returns a modified `RowInterface`.
- Throws `ModifyException` if the callable fails.
- Applied by processors within the read/write pipeline.
- See Modifiers for more transformation utilities.
Example
```php
use Tobento\Service\ReadWrite\Modifier\CallableModifier;
use Tobento\Service\ReadWrite\ReaderInterface;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\WriterInterface;

// Define a custom modifier
$modifier = new CallableModifier(function(RowInterface $row, ReaderInterface $reader, WriterInterface $writer): RowInterface {
    $attributes = $row->all();
    $attributes['title'] = strtoupper($attributes['title'] ?? '');

    return new Row(
        key: $row->key(),
        attributes: $attributes
    );
});

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);
```
Column Map Modifier
The ColumnMap modifier transforms a row's attributes by remapping column names according to a user-defined mapping.
It is useful when the input and output schemas differ, or when you need to rename columns during read/write operations.
Features
- Implements `ModifierInterface`.
- Remaps column names using a simple `['from' => 'to']` array.
- Only mapped columns that exist in the row are included in the output.
- Throws `ModifyException` if the mapping is empty.
- Works with any reader and writer combination.
- See Modifiers for more transformation utilities.
Example
```php
use Tobento\Service\ReadWrite\Modifier\ColumnMap;
use Tobento\Service\ReadWrite\Row\Row;

// Define a column mapping
$modifier = new ColumnMap([
    'title' => 'name',
    'status' => 'state',
]);

// Example row
$row = new Row(
    key: 1,
    attributes: [
        'title' => 'Hello',
        'status' => 'Draft',
        'ignored' => 'Not mapped',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// ['name' => 'Hello', 'state' => 'Draft']
```
Notes
- Only columns defined in the mapping are included in the resulting row.
- If the mapping array is empty, a ModifyException is thrown.
- Useful for renaming columns when exporting or normalizing inconsistent input data.
- Combine with other modifiers (e.g., filters or transformers) for more complex pipelines.
Combine Fields Modifier
The CombineFields modifier merges multiple attributes into a single target attribute.
It is useful for creating derived fields such as full_name from first_name and last_name.
Features
- Implements `ModifierInterface`.
- Accepts an array of source fields (dot-aware paths).
- Combines string and numeric values into a target field using a configurable separator.
- Skips non-stringable values (booleans, arrays, objects).
- Optionally removes source fields after combining.
- Throws `ModifyException` if no fields or target are defined.
Example
```php
use Tobento\Service\ReadWrite\Modifier\CombineFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new CombineFields(
    fields: ['first_name', 'last_name', 'age'],
    into: 'summary',
    separator: ' ',
    removeSourceFields: true
);

$row = new Row(1, [
    'first_name' => 'John',
    'last_name' => 'Doe',
    'age' => 42,
]);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['summary' => 'John Doe 42']
```
Compute Modifier
The Compute modifier computes a field value using a user-defined callback.
It is useful for creating derived fields, performing custom calculations, or
generating values based on the entire row.
Features
- Implements `ModifierInterface`.
- Writes the computed value to a single target field.
- Callback receives the full `RowInterface` instance.
- Dot-notation supported for the output field.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Compute;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;

$modifier = new Compute(
    field: 'full_name',
    computeFn: function (RowInterface $row) {
        return trim(
            $row->get('first_name', '') . ' ' . $row->get('last_name', '')
        );
    }
);

$row = new Row(
    key: 1,
    attributes: [
        'first_name' => 'John',
        'last_name' => 'Doe',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'first_name' => 'John',
//     'last_name' => 'Doe',
//     'full_name' => 'John Doe',
// ]
```
Default Value Modifier
The DefaultValue modifier ensures that missing or empty attributes are populated with predefined default values.
It is useful for filling in required fields during imports or normalizing incomplete input data.
Features
- Implements `ModifierInterface`.
- Accepts a `['attribute' => defaultValue]` array.
- Dot-aware paths supported (e.g. `user.name`, `tags.0`).
- Applies defaults when attributes are:
  - missing
  - `null`
  - empty string (`''`)
- Throws `ModifyException` if no defaults are defined.
- Works with any reader and writer combination.
- See Modifiers for more cleaning and transformation utilities.
Example
```php
use Tobento\Service\ReadWrite\Modifier\DefaultValue;
use Tobento\Service\ReadWrite\Row\Row;

// Define default values
$modifier = new DefaultValue([
    'status' => 'Draft',
    'created_at' => date('Y-m-d'),
]);

// Example row with missing and empty attributes
$row = new Row(
    key: 1,
    attributes: [
        'title' => 'Hello',
        'status' => '',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//     'title' => 'Hello',
//     'status' => 'Draft', // replaced empty string
//     'created_at' => '2025-12-24',
// ]
```
Encrypt Modifier
The Encrypt modifier encrypts one or more fields using an application-defined
encryption service based on
tobento/service-encryption.
It is useful for protecting sensitive data such as API keys, tokens, personal
information, or any value that must be stored securely but still be decryptable later.
Requirements
To use the Encrypt modifier, you must install the
tobento/service-encryption package:
composer require tobento/service-encryption
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.ssn`, `credentials.token`).
- Uses an `EncrypterInterface` from `service-encryption`.
- Skips `null` values automatically.
- Wraps encryption errors in a `ModifyException`.
- Works with any reader and writer combination.
Example
```php
use Tobento\Service\Encryption\EncrypterInterface;
use Tobento\Service\ReadWrite\Modifier\Encrypt;
use Tobento\Service\ReadWrite\Row\Row;

// $encrypter is an instance of EncrypterInterface
$modifier = new Encrypt(
    fields: ['api.key', 'api.secret'],
    encrypter: $encrypter,
);

$row = new Row(
    key: 1,
    attributes: [
        'api' => [
            'key' => 'my-api-key',
            'secret' => 'super-secret',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'api' => [
//         'key' => 'ENCRYPTED',
//         'secret' => 'ENCRYPTED',
//     ],
// ]
```
Notes
- Only encryption is performed - decryption must be handled elsewhere using the same `EncrypterInterface`.
- If encryption fails, a `ModifyException` is thrown with the original row attached.
- Dot-notation allows encrypting nested values inside arrays or objects.
- The modifier does not enforce any specific encryption algorithm - it relies entirely on the configured `EncrypterInterface`.
Filter Fields Modifier
The FilterFields modifier keeps only the specified fields and removes all others.
It is useful for privacy filtering, export shaping, or limiting output to a
defined subset of fields.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-notation supported (e.g. `user.email`, `meta.tags.0`).
- Removes all fields not listed.
Example
```php
use Tobento\Service\ReadWrite\Modifier\FilterFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new FilterFields(['id', 'email']);

$row = new Row(
    key: 1,
    attributes: [
        'id' => 1,
        'email' => 'a@b.com',
        'password' => 'secret',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'id' => 1,
//     'email' => 'a@b.com',
// ]
```
Format Modifier
The Format modifier applies a user-defined formatting callback to a specific
field. It is useful for normalizing values, converting types, trimming,
cleaning, or applying any custom transformation to a single attribute.
Features
- Implements `ModifierInterface`.
- Formats a single field using a callback.
- Callback receives the current field value and the full `RowInterface`.
- Dot-notation supported for the target field.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Format;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\ReadWrite\RowInterface;

$modifier = new Format(
    field: 'email',
    formatter: function ($value, RowInterface $row) {
        return strtolower(trim((string)$value));
    }
);

$row = new Row(
    key: 1,
    attributes: [
        'email' => ' JOHN.DOE@EXAMPLE.COM ',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'email' => 'john.doe@example.com',
// ]
```
Hash Modifier
The Hash modifier hashes one or more fields using a user-defined hashing callable.
It is useful for securely transforming sensitive values such as passwords, tokens,
API keys, or other secrets before they are stored or processed further.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.password`, `credentials.token`).
- Uses a user-provided hashing callable: `fn(mixed $value): string`
- Skips `null` values automatically.
- Wraps hashing errors in a `ModifyException`.
- Works with any reader and writer combination.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Hash;
use Tobento\Service\ReadWrite\Row\Row;

// Hash a password using PHP's password_hash
$modifier = new Hash(
    fields: 'password',
    hasher: fn($value) => password_hash($value, PASSWORD_DEFAULT),
);

$row = new Row(
    key: 1,
    attributes: ['password' => 'secret123']
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'password' => '$2y$10$....' // hashed value
// ]
```
Notes
- Empty strings are treated as values and will be hashed unless handled externally.
- If hashing fails, a `ModifyException` is thrown with the original row attached.
- The modifier does not enforce any specific hashing algorithm - you may use `password_hash()`, `hash()`, a framework hasher, or any custom callable.
- Dot-notation allows hashing nested values inside arrays or objects.
Lookup Modifier
The Lookup modifier maps a source field's value to another value using either a lookup array or a callable resolver.
It is useful for normalizing human-readable labels into IDs or codes (e.g. category name to category ID).
Features
- Implements `ModifierInterface`.
- Maps a source field to a target field using:
  - A static lookup array (string, int, or float keys supported).
  - A callable resolver (e.g. repository, service).
- Dot-aware paths supported.
- Optionally removes the source field after mapping.
- Throws `ModifyException` if source/target fields are missing.
Example (array lookup)
```php
use Tobento\Service\ReadWrite\Modifier\Lookup;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Lookup(
    field: 'category',
    into: 'category_id',
    lookup: [
        'Books' => 1,
        42 => 'Answer',
        3.14 => 'Pi',
    ],
    removeSourceField: true
);

$row = new Row(1, ['category' => 42]);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['category_id' => 'Answer']
```
Example (callable lookup)
```php
$modifier = new Lookup(
    field: 'category',
    into: 'category_id',
    lookup: fn($value) => CategoryRepository::findIdByName($value),
    removeSourceField: true
);

$row = new Row(1, ['category' => 'Books']);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['category_id' => 1]
```
Mask Modifier
The Mask modifier masks one or more fields to hide sensitive information while
preserving enough structure for display or logging.
It is useful for partially hiding emails, phone numbers, tokens, names, or any
other sensitive value that should not appear in plain text.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.email`, `customer.phone`).
- Supports a custom masking callable: `fn(mixed $value): string`
- Provides sensible default masking rules:
  - Emails: `john@example.com` becomes `j***@example.com`
  - Strings: `Jonathan` becomes `J*****n`
  - Numbers: `123456789` becomes `1*******9`
- Skips `null` values automatically.
- Wraps masking errors in a `ModifyException`.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Mask;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Mask(
    fields: ['user.email', 'user.phone']
);

$row = new Row(
    key: 1,
    attributes: [
        'user' => [
            'email' => 'john@example.com',
            'phone' => '1234567890',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'user' => [
//         'email' => 'j***@example.com',
//         'phone' => '1********0',
//     ],
// ]
```
Example: Custom masking
```php
use Tobento\Service\ReadWrite\Modifier\Mask;

// Custom masking: always replace the value with six asterisks
$modifier = new Mask(
    fields: 'phone',
    masker: fn(mixed $value): string => '******',
);
```
Notes
- Default masking rules apply only when no custom callable is provided.
- Masking is intended for display/logging - not for security or encryption.
- Dot-notation allows masking nested values inside arrays or objects.
- If masking fails, a `ModifyException` is thrown with the original row attached.
Redact Modifier
The Redact modifier removes sensitive information by replacing one or more
fields with a fixed value.
It is useful for eliminating confidential data such as passwords, tokens,
personal identifiers, or any value that must not appear in logs, exports, or
downstream systems.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.password`, `customer.card.number`).
- Replaces values with a configurable replacement (default: `null`).
- Skips fields that do not exist.
- Wraps errors in a `ModifyException`.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Redact;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Redact(
    fields: ['user.password', 'user.token']
);

$row = new Row(
    key: 1,
    attributes: [
        'user' => [
            'password' => 'secret123',
            'token' => 'abc123xyz',
        ],
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'user' => [
//         'password' => null,
//         'token' => null,
//     ],
// ]
```
Example: Custom replacement
```php
use Tobento\Service\ReadWrite\Modifier\Redact;

// Replace values with a fixed string instead of null
$modifier = new Redact(
    fields: 'api.key',
    replacement: 'REDACTED',
);
```
Notes
- Redaction is intended for privacy and security - it removes data entirely.
- If you need partial hiding instead of full removal, use the Mask modifier.
- Dot-notation allows redacting nested values inside arrays or objects.
- If redaction fails, a `ModifyException` is thrown with the original row attached.
Remove Fields Modifier
The RemoveFields modifier removes the specified fields from a row.
It is useful for stripping sensitive data, removing debug or internal fields,
or cleaning up unwanted input before further processing.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-notation supported (e.g. `user.password`, `meta.debug.flag`).
- Removes only the listed fields.
Example
```php
use Tobento\Service\ReadWrite\Modifier\RemoveFields;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new RemoveFields(['password', 'debug']);

$row = new Row(
    key: 1,
    attributes: [
        'id' => 1,
        'email' => 'a@b.com',
        'password' => 'secret',
        'debug' => 'x',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'id' => 1,
//     'email' => 'a@b.com',
// ]
```
Replace Modifier
The Replace modifier replaces values or substrings in one or more fields.
It is useful for cleaning up imported data, normalizing inconsistent values,
or converting placeholder values such as "N/A" or "none" into meaningful
representations.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.status`, `meta.note`).
- Supports exact match replacement (`strict = true`).
- Supports substring replacement (`strict = false`).
- Supports replacing `null` values when `forceNullReplacement` is enabled.
- Wraps errors in a `ModifyException`.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Replace;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Replace(
    fields: ['status'],
    replacements: [
        'N/A' => null,
        'yes' => true,
        'no' => false,
    ],
    strict: true,
);

$row = new Row(
    key: 1,
    attributes: [
        'status' => 'yes',
    ]
);

$modified = $modifier->modify($row, $reader, $writer);

// Resulting attributes:
// [
//     'status' => true,
// ]
```
Example: Substring replacement
```php
use Tobento\Service\ReadWrite\Modifier\Replace;

$modifier = new Replace(
    fields: 'comment',
    replacements: [
        'foo' => 'bar',
    ],
    strict: false,
);
```
Example: Replace null values
```php
use Tobento\Service\ReadWrite\Modifier\Replace;

$modifier = new Replace(
    fields: 'email',
    replacements: [
        null => 'unknown',
    ],
    strict: true,
    forceNullReplacement: true,
);
```
Notes
- In strict mode, only exact matches are replaced.
- In substring mode, all occurrences of the search string are replaced.
- Non-string values are left unchanged unless replacing null.
- Dot-notation allows replacing values inside nested arrays or objects.
Sanitize Modifier
The Sanitize modifier cleans row attributes using the tobento/service-sanitizer package.
It is useful for removing unwanted HTML, normalizing input, and ensuring safe, consistent values before validation or storage.
Requirements
To use the Sanitize modifier, you must install the tobento/service-sanitizer package:
composer require tobento/service-sanitizer
Features
- Implements `ModifierInterface`.
- Accepts a `['attribute' => rules]` array of sanitation rules.
- Dot-aware paths supported (e.g. `user.name`, `tags.0`).
- Uses `SanitizerInterface` to apply rules such as `strip_tags`, `trim`, `email`, etc.
- `strictSanitation`: if true, sanitizes missing data too.
- `returnSanitizedOnly`: if true, returns only sanitized attributes, otherwise all.
- Throws `ModifyException` if no rules are defined.
- Works with any reader and writer combination.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Sanitize;
use Tobento\Service\Sanitizer\Sanitizer;
use Tobento\Service\ReadWrite\Row\Row;

$sanitizer = new Sanitizer();

// Define sanitation rules
$modifier = new Sanitize([
    'title' => 'strip_tags|trim',
    'published_at' => 'date:Y-m-d:d.m.Y',
], $sanitizer);

$row = new Row(
    key: 1,
    attributes: [
        'title' => '<h1>Hello</h1> ',
        'published_at' => '24.12.2025',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//     'title' => 'Hello',
//     'published_at' => '2025-12-24', // normalized
// ]
```
Skip If Modifier
The SkipIf modifier skips a row when a condition is met and returns a SkipRow
implementing SkippableInterface.
It is useful for filtering out invalid, incomplete, or unwanted rows before they
reach later modifiers or writers.
Features
- Implements `ModifierInterface`.
- Supports `callable`, `array`, `string`, or `bool` conditions.
- Returns a `SkipRow` with a human-readable skip reason.
- Stops further modifiers from executing.
- Dot-aware field paths supported.
Supported Conditions
1. Boolean Condition
```php
use Tobento\Service\ReadWrite\Modifier\SkipIf;

// Skip all rows:
new SkipIf(true, 'Skipping all rows');

// Skip no rows:
new SkipIf(false);
```
2. String Condition
Skip if the field is missing or empty:
```php
use Tobento\Service\ReadWrite\Modifier\SkipIf;

new SkipIf('email', 'Email is required');
```
3. Array Condition
```php
use Tobento\Service\ReadWrite\Modifier\SkipIf;

// Skip if field is missing or empty:
new SkipIf(
    ['field' => 'username', 'missing' => true],
    'Username is missing'
);

// Skip if field equals a specific value:
new SkipIf(
    ['field' => 'status', 'equals' => 'N/A'],
    'Status is N/A'
);

// Skip if field equals zero:
new SkipIf(
    ['field' => 'age', 'equals' => 0],
    'Age cannot be zero'
);
```
4. Callable Condition
Skip if age is under 18:
```php
use Tobento\Service\ReadWrite\Modifier\SkipIf;
use Tobento\Service\ReadWrite\RowInterface;

new SkipIf(
    fn(RowInterface $row): bool => $row->get('age') < 18,
    'User is under 18'
);
```
Split Modifier
The Split modifier splits a single string field into multiple fields using a separator.
It is useful for breaking composite values into structured parts
(e.g. "John Doe" into first_name, last_name).
Features
- Implements `ModifierInterface`.
- Splits one source field into multiple target fields.
- Uses a configurable separator (default: space).
- Dot-aware paths supported.
- Optionally removes the source field after splitting.
- Throws `ModifyException` if source or target fields are missing.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Split;
use Tobento\Service\ReadWrite\Row\Row;

$modifier = new Split(
    field: 'full_name',
    into: ['first_name', 'last_name'],
    separator: ' ',
    removeSourceField: true
);

$row = new Row(1, ['full_name' => 'John Doe']);

$modified = $modifier->modify($row, $reader, $writer);

// Result:
// ['first_name' => 'John', 'last_name' => 'Doe']
```
Notes
- Extra parts beyond the number of target fields are ignored.
- If there are fewer parts than target fields, missing parts are skipped.
Trim Modifier
The Trim modifier cleans a row's attributes by removing leading and trailing whitespace (or other specified characters) from defined fields.
It is useful for normalizing text input, ensuring consistent values before validation, transformation, or storage.
Features
- Implements `ModifierInterface`.
- Trims only the attributes explicitly defined in the constructor.
- Supports trimming custom characters via the optional `$chars` parameter.
- Supports nested and indexed paths (e.g. `user.name`, `tags.0`).
- Skips attributes that do not exist in the row.
- Throws `ModifyException` if no attributes are defined.
- Works with any reader and writer combination.
- See Modifiers for more cleaning utilities.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Trim;
use Tobento\Service\ReadWrite\Row\Row;

// Define which attributes to trim
$modifier = new Trim(['title', 'status']);

// Example row
$row = new Row(
    key: 1,
    attributes: [
        'title' => ' Hello World ',
        'status' => ' Draft ',
        'description' => ' untouched ',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Resulting row attributes:
// [
//     'title' => 'Hello World',
//     'status' => 'Draft',
//     'description' => ' untouched ', // unchanged
// ]
```
Unique Modifier
The Unique modifier ensures that one or more fields contain unique values across the import.
It is useful for preventing duplicate emails, SKUs, usernames, or other
identifiers that must not repeat.
Features
- Implements `ModifierInterface`.
- Accepts a single field or an array of fields.
- Dot-aware paths supported (e.g. `user.email`, `items.0.sku`).
- Supports an optional lookup callable for external uniqueness checks (e.g. database, repository, API).
- Falls back to in-memory uniqueness when no lookup is provided.
- On duplicate:
  - `'fail'` throws `ModifyException`
  - `'skip'` returns `SkipRow`
- Uses a safe stringify helper for readable error messages.
- Works with any reader and writer combination.
Example
```php
use Tobento\Service\ReadWrite\Modifier\Unique;
use Tobento\Service\ReadWrite\Row\Row;

// Ensure email is unique across the import
$modifier = new Unique(
    fields: 'email',
    lookup: null, // use in-memory uniqueness
    onFail: 'skip', // or 'fail'
);

$row1 = new Row(
    key: 1,
    attributes: ['email' => 'john@example.com']
);

$row2 = new Row(
    key: 2,
    attributes: ['email' => 'john@example.com']
);

// First row passes
$modifier->modify($row1, $reader, $writer);

// Second row is skipped:
$modified = $modifier->modify($row2, $reader, $writer);

// Result:
// SkipRow {
//     key: 2,
//     attributes: ['email' => 'john@example.com'],
//     reason: 'Duplicate value for unique field "email": john@example.com'
// }
```
Example: Database lookup
```php
use Tobento\Service\ReadWrite\Modifier\Unique;
use Tobento\Service\ReadWrite\RowInterface;

$modifier = new Unique(
    fields: 'sku',
    lookup: fn(string $field, mixed $value, RowInterface $row): bool
        => $productRepository->skuExists($value),
    onFail: 'fail'
);
```
Notes
- null values are ignored and do not count as duplicates.
- In-memory uniqueness is per import process.
- For persistent uniqueness, use a lookup callable.
- Composite uniqueness can be implemented via the lookup callable.
Validation Modifier
The Validation modifier validates row attributes using the
tobento/service-validation package.
It is useful for enforcing required fields, checking formats, and ensuring data integrity before further processing or storage.
Requirements
To use the Validation modifier, you must install the
tobento/service-validation package:
composer require tobento/service-validation
Features
- Implements ModifierInterface.
- Accepts a ['attribute' => rules] array of validation rules.
- Dot-aware paths supported (e.g. user.email, items.0.price).
- Uses ValidatorInterface to apply rules such as required, email, int, min, etc.
- On validation failure: 'fail' throws ModifyException; 'skip' returns SkipRow.
- Groups error messages by field for readable output.
- Works with any reader and writer combination.
Example
use Tobento\Service\ReadWrite\Modifier\Validation;
use Tobento\Service\ReadWrite\Row\Row;
use Tobento\Service\Validation\Validator;

$validator = new Validator();

// Define validation rules
$modifier = new Validation(
    rules: [
        'email' => 'required|email',
        'age' => 'required|int|minNum:18',
    ],
    validator: $validator,
    onFail: 'skip', // or 'fail'
);

$row = new Row(
    key: 1,
    attributes: [
        'email' => 'not-an-email',
        'age' => '17',
        'name' => 'John Doe',
    ]
);

// Normally applied by processors, but can be invoked manually:
$modified = $modifier->modify($row, $reader, $writer);

// Result when validation fails and onFail = 'skip':
// SkipRow {
//     key: 1,
//     attributes: [
//         'email' => 'not-an-email',
//         'age' => '17',
//         'name' => 'John Doe',
//     ],
//     reason: 'Validation failed: email: The email must be a valid email address; age: Must be at least 18'
// }

// If onFail = 'fail', a ModifyException is thrown instead.
Processors
Processors form the transformation layer between reading and writing.
They take each row produced by a reader and pass it through a configurable pipeline of modifiers, filters, and other processing steps before the row reaches the writer.
A processor does not change how data is read or written - instead, it controls how data flows and is transformed between the two. This makes processors the central place for applying business logic, normalization, validation, or schema adjustments.
Why Processors Are Useful
- Pipeline orchestration: They execute modifiers in sequence, ensuring each row is transformed consistently.
- Separation of concerns: Readers read, writers write - processors handle everything in between.
- Flexibility: You can add, remove, or reorder processing steps without touching readers or writers.
- Reusability: The same processor configuration can be reused across different import/export workflows.
- Consistency: All processors follow the same contract, making them easy to integrate and extend.
Use processors whenever you need to apply transformations, filtering, or mapping logic to rows before they are written.
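The pipeline idea can be sketched in simplified form; plain closures and arrays stand in for the package's modifier and row objects here:

```php
// Each "modifier" receives row attributes and returns (possibly changed) attributes.
$modifiers = [
    // Trim all string values.
    fn(array $row): array => array_map(
        fn($value) => is_string($value) ? trim($value) : $value,
        $row
    ),
    // Add a default attribute when missing.
    fn(array $row): array => $row + ['status' => 'draft'],
];

$row = ['title' => '  Hello World  '];

// A processor applies the modifiers in sequence before the row reaches the writer.
foreach ($modifiers as $modifier) {
    $row = $modifier($row);
}

// $row is now ['title' => 'Hello World', 'status' => 'draft']
```

Because the steps live in a list, they can be reordered or swapped out without touching the reader or writer.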
Default Processor
The default Processor coordinates the full read → modify → write workflow using a reader, writer, and a set of modifiers.
It handles row iteration, modifier execution, writer mode selection, error tracking, and optional result handling.
Key Responsibilities
- Iterates through rows from the reader using read(offset, limit).
- Applies all modifiers in sequence via ModifiersInterface.
- Handles skippable rows before and after modification.
- Writes processed rows to the writer.
- Selects writer mode (Overwrite, Append, Finalize) when supported.
- Tracks successful, failed, and skipped rows.
- Wraps unexpected errors in a ProcessException.
- Produces a Result object summarizing the operation.
- Delegates row-level and result-level events to an optional ResultHandlerInterface.
Example
use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Modifier\Modifiers;
use Tobento\Service\ReadWrite\Processor\Processor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

// Reader and writer
$reader = new Reader\CsvStream(
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);

$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

// Create modifiers (empty for this example)
$modifiers = new Modifiers();

// Create processor
$processor = new Processor(
    modifiers: $modifiers,
    resultHandler: null,
);

// Process rows starting from the reader's current offset
$result = $processor->process(
    reader: $reader,
    writer: $writer,
    offset: $reader->currentOffset(),
    limit: null,
);

// Inspect result
echo $result->successfulRows(); // e.g. 42
Processing Flow
- Determine writer mode
  If the writer implements ModeAwareInterface, the processor sets the mode based on the current offset and reader state:
  - Overwrite when starting at offset 0
  - Append when continuing and the reader is not finished
  - Finalize when processing the last chunk
- Start the writer
  The processor calls writer->start() before any rows are processed.
- Iterate through rows
  For each row returned by reader->read(offset, limit):
  - Skip immediately if the row implements SkippableInterface
  - Apply all modifiers via ModifiersInterface
  - Skip again if modifiers mark the row as skippable
  - Write the row using writer->write()
- Error handling
  - ModifyException and WriteException increment the failed row count
  - Any other exception is wrapped in a ProcessException
- Finish the writer
  After all rows are processed, the processor calls writer->finish().
- Build the result
  A Result object is created containing:
  - Number of successful, failed, and skipped rows
  - The reader, writer, and modifiers used
  - Start and finish timestamps
- Notify result handler
  If a ResultHandlerInterface is provided, it receives:
  - Row success events
  - Row skip events
  - Row failure events
  - The final result event
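The mode selection in the first step can be sketched as a simple decision. This is only an illustration of the rules described above, using strings instead of the package's actual mode classes:

```php
// Decide the writer mode from the offset and the reader state.
function selectWriterMode(int $offset, bool $readerFinished): string
{
    if ($offset === 0) {
        return 'Overwrite'; // starting fresh at offset 0
    }

    if (!$readerFinished) {
        return 'Append'; // continuing, more chunks to come
    }

    return 'Finalize'; // last chunk processed
}

echo selectWriterMode(0, false);   // Overwrite
echo selectWriterMode(500, false); // Append
echo selectWriterMode(900, true);  // Finalize
```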
Time Budget Processor
The TimeBudgetProcessor works like the default processor but adds a strict execution time limit.
It processes rows only as long as the predicted time for the next row stays within the configured time budget.
This makes it ideal for cron jobs, queue workers, or long-running tasks where execution time must be controlled.
The processor estimates future row duration using a moving average of the last N processed rows.
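The prediction can be sketched like this. It is a simplified illustration in seconds; the actual processor works with hrtime() nanoseconds and a bounded window of recent durations:

```php
// Decide whether the next row still fits into the remaining time budget,
// based on a moving average of the last N row durations (in seconds).
function nextRowFits(array $recentDurations, float $elapsed, float $budget): bool
{
    if ($recentDurations === []) {
        return $elapsed < $budget; // no history yet: only check elapsed time
    }

    $average = array_sum($recentDurations) / count($recentDurations);

    // Stop early if the predicted next row would exceed the budget.
    return ($elapsed + $average) <= $budget;
}

$durations = [0.10, 0.12, 0.11]; // recent rows took ~0.11s each

var_dump(nextRowFits($durations, 19.5, 20.0));  // bool(true)
var_dump(nextRowFits($durations, 19.95, 20.0)); // bool(false)
```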
Key Responsibilities
- Processes rows until the time budget is reached or the reader is exhausted.
- Predicts next-row cost using a moving average of recent row processing times.
- Stops early when the next row would exceed the remaining time budget.
- Applies modifiers and handles skippable rows.
- Writes processed rows to the writer.
- Selects writer mode (Overwrite, Append, Finalize) when supported.
- Tracks successful, failed, and skipped rows.
- Wraps unexpected errors in a ProcessException.
- Produces a Result summarizing the operation.
- Supports an optional ResultHandlerInterface for row-level and result-level events.
Example
use Nyholm\Psr7\Factory\Psr17Factory;
use Tobento\Service\ReadWrite\Modifier\Modifiers;
use Tobento\Service\ReadWrite\Processor\TimeBudgetProcessor;
use Tobento\Service\ReadWrite\Reader;
use Tobento\Service\ReadWrite\Writer;

// Reader and writer
$reader = new Reader\CsvStream(
    stream: new Psr17Factory()->createStreamFromFile('/data/input.csv'),
);

$writer = new Writer\CsvResource(
    resource: new Writer\Resource\LocalFile('/data/output.csv'),
);

// Create modifiers (empty for this example)
$modifiers = new Modifiers();

// Create processor with a 20-second time budget
$processor = new TimeBudgetProcessor(
    timeBudget: 20,
    modifiers: $modifiers,
    resultHandler: null,
);

// Process rows starting from the reader's current offset
$result = $processor->process(
    reader: $reader,
    writer: $writer,
    offset: $reader->currentOffset(),
    limit: null,
);

// Inspect result
echo $result->successfulRows();
Processing Flow
- Determine writer mode
  If the writer implements ModeAwareInterface, the processor sets the mode based on the current offset and reader state:
  - Overwrite when starting at offset 0
  - Append when continuing and the reader is not finished
  - Finalize when processing the last chunk
- Start the writer
  The processor calls writer->start() before any rows are processed.
- Initialize time-budget tracking
  - Convert the configured time budget from seconds to nanoseconds
  - Record the start time using hrtime(true)
  - Prepare a moving window of recent row durations (default: last 50 rows)
- Predict next row cost
  Before processing each row:
  - Compute the moving average of recent row processing times
  - Calculate elapsed time and remaining budget
  - Stop early if the predicted next row would exceed the remaining time budget
- Iterate through rows
  For each row returned by reader->read(offset, limit):
  - Skip immediately if the row implements SkippableInterface
  - Apply all modifiers via ModifiersInterface
  - Skip again if modifiers mark the row as skippable
  - Write the row using writer->write()
  - Measure the row's processing time and update the moving average window
- Error handling
  - ModifyException and WriteException increment the failed row count
  - Any other exception is wrapped in a ProcessException
- Finish the writer
  After processing stops (either naturally or due to the time budget), the processor calls writer->finish().
- Build the result
  A Result object is created containing:
  - Number of successful, failed, and skipped rows
  - The reader, writer, and modifiers used
  - Start and finish timestamps
- Notify result handler
  If a ResultHandlerInterface is provided, it receives:
  - Row success events
  - Row skip events
  - Row failure events
  - The final result event
Results
The Results system provides structured feedback about what happened during processing.
Every processor returns a ResultInterface describing how many rows were processed successfully, how many failed, and how many were skipped. This makes it easy to track progress, display summaries, log outcomes, or resume processing later.
Results are split into two parts:
- Result Handler - optional callbacks that receive row-level and final result events during processing.
- Result Object - the final summary returned by the processor, containing counters, timestamps, and references to the reader, writer, and modifiers used.
Together, these components give you full visibility into the processing workflow and allow you to integrate reporting, logging, or UI updates in a clean and consistent way.
Result Handler
A ResultHandlerInterface allows you to react to processing events as they occur.
While the Result object provides a final summary after processing finishes, a result handler gives you real-time hooks for row-level and batch-level events.
Result handlers are especially useful in queue-driven or event-driven architectures, where you may want to:
- Log each processed row
- Dispatch queue jobs
- Track skipped or failed rows
- Update progress indicators
- Emit domain events
- Build monitoring dashboards
Processors call the handler methods synchronously and only if a handler is provided.
Handlers never modify rows or influence the processing flow - they are purely observational.
Event Methods
- handleRowSuccess(RowInterface $row)
  Triggered when a row is processed successfully.
  Useful for logging, progress updates, or dispatching follow-up jobs.
- handleRowSkip(SkippableInterface $row)
  Triggered when a row is skipped by the reader or modifiers.
  Useful for tracking skipped rows or recording data quality issues.
- handleRowFailure(RowInterface $row, Throwable $exception)
  Triggered when a row fails due to a modification or write error.
  Useful for logging errors, queueing retries, or alerting.
- handleResult(ResultInterface $result)
  Triggered once after processing completes.
  Useful for logging summaries, dispatching "batch finished" events, or updating dashboards.
Example: PSR-3 Logging Result Handler
use Psr\Log\LoggerInterface;
use Tobento\Service\ReadWrite\ResultHandlerInterface;
use Tobento\Service\ReadWrite\ResultInterface;
use Tobento\Service\ReadWrite\RowInterface;
use Tobento\Service\ReadWrite\Row\SkippableInterface;
use Throwable;

class LoggingResultHandler implements ResultHandlerInterface
{
    public function __construct(
        private LoggerInterface $logger,
    ) {}

    public function handleRowSuccess(RowInterface $row): void
    {
        $this->logger->info('Row processed successfully', [
            'row' => $row,
        ]);
    }

    public function handleRowSkip(SkippableInterface $row): void
    {
        $this->logger->notice('Row skipped', [
            'row' => $row,
        ]);
    }

    public function handleRowFailure(RowInterface $row, Throwable $exception): void
    {
        $this->logger->error('Row processing failed', [
            'row' => $row,
            'exception' => $exception,
        ]);
    }

    public function handleResult(ResultInterface $result): void
    {
        $this->logger->info('Processing finished', [
            'successful' => $result->successfulRows(),
            'failed' => $result->failedRows(),
            'skipped' => $result->skippedRows(),
            'runtime' => $result->runtimeInSeconds(),
        ]);
    }
}
Result Object
The default Result implementation provides a complete summary of a processing run.
It is returned by every processor and contains counters, timestamps, references to the components involved, and optional metadata.
This makes it easy to display summaries, log outcomes, or resume processing based on the final state.
The Result object is immutable: all values are set at construction time and exposed through read-only accessors.
Features
- Tracks successful, failed, and skipped rows.
- Stores references to the reader, writer, and modifiers used during processing.
- Records start and finish timestamps.
- Computes total runtime in seconds.
- Supports custom metadata for additional context.
- Provides a structured timeline summary for logging or monitoring.
Example
$result = $processor->process($reader, $writer);

echo $result->successfulRows();   // e.g. 42
echo $result->failedRows();       // e.g. 3
echo $result->runtimeInSeconds(); // e.g. 1.52
Available Data
Row Counters
- successfulRows()
  Returns the number of rows that were processed and written successfully.
- failedRows()
  Returns the number of rows that failed during modification or writing.
- skippedRows()
  Returns the number of rows skipped either by the reader or by modifiers.
Components
- reader()
  Returns the ReaderInterface instance used during processing.
- writer()
  Returns the WriterInterface instance used during processing.
- modifiers()
  Returns the ModifiersInterface applied to each row.
These references allow you to inspect configuration, offsets, or writer state after processing.
Metadata
meta()
Returns an array of custom metadata passed into the result.
Useful for storing batch IDs, session information, or additional context.
Timing Information
- startedAt()
  Timestamp when processing began.
- finishedAt()
  Timestamp when processing ended.
- runtimeInSeconds()
  Total processing duration in seconds, based on the difference between start and finish timestamps.
Timeline Summary
timeline()
Returns a structured array containing timestamps, runtime, and row statistics.
Ideal for logging, monitoring, or debugging.
[
'started_at' => '2025-01-01T12:00:00+00:00',
'finished_at' => '2025-01-01T12:00:01+00:00',
'runtime_seconds' => 1.0,
'rows' => [
'successful' => 42,
'failed' => 3,
'skipped' => 1,
'total' => 46,
],
]
Events
The Import/Export workflow provides simple event value objects that describe different stages of processing:
- ProcessStarted - emitted when processing begins
- PartialProcess - emitted during processing (e.g., after a chunk)
- ProcessCompleted - emitted when processing finishes successfully
- ProcessFailed - emitted when processing stops due to an error
These events are not dispatched automatically.
Your application is responsible for dispatching them, which gives you full control over how and when they are used (e.g., for logging, queueing, or UI updates).
Each event contains a ResultInterface so listeners can inspect progress or statistics.
Example: Dispatching Events While Running a Processor
use Psr\EventDispatcher\EventDispatcherInterface;
use Tobento\Service\ReadWrite\Event;
use Tobento\Service\ReadWrite\ProcessorInterface;
use Tobento\Service\ReadWrite\Result;

class ImportRunner
{
    public function __construct(
        private ProcessorInterface $processor,
        private EventDispatcherInterface $events,
    ) {}

    public function run($reader, $writer): void
    {
        // Dispatch: processing started
        $this->events->dispatch(new Event\ProcessStarted(
            result: new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $this->processor->modifiers(),
            ),
        ));

        try {
            // Run the processor
            $result = $this->processor->process(
                reader: $reader,
                writer: $writer,
                offset: $reader->currentOffset()
            );

            // Dispatch: partial progress (optional)
            $this->events->dispatch(new Event\PartialProcess($result));

            // Dispatch: processing completed
            if ($reader->isFinished()) {
                $this->events->dispatch(new Event\ProcessCompleted($result));
            }
        } catch (\Throwable $e) {
            // Build a minimal failure result snapshot
            $failureResult = new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $this->processor->modifiers(),
                meta: ['failed' => true],
            );

            // Dispatch: processing failed
            $this->events->dispatch(new Event\ProcessFailed(
                result: $failureResult,
                exception: $e,
            ));

            throw $e;
        }
    }
}
Just make sure you pass an event dispatcher to your runner!
Learn More
Using Processors with Registries and Queues
This package focuses on reading, writing, modifying, and processing data.
It does not define how readers, writers, or modifiers are registered or configured.
Different applications solve this differently (CRUD fields, JSON config, DI factories, etc.).
Because of this, applications are expected to provide their own registry layer that knows how to create readers, writers, and modifiers from stored job definitions.
Below is a simplified example showing how an application might integrate a processor with a registry and a
queue worker.
The example demonstrates a possible job handler for the queue service.
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Tobento\Service\ReadWrite\Event;
use Tobento\Service\ReadWrite\Processor\TimeBudgetProcessor;
use Tobento\Service\ReadWrite\RegistriesInterface;
use Tobento\Service\ReadWrite\Result;
use Tobento\Service\ReadWrite\ResultHandlerInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;

class TimeBudgetJobHandler implements JobHandlerInterface
{
    public function __construct(
        protected ContainerInterface $container,
        protected RegistriesInterface $registries,
        protected JobRepositoryInterface $jobRepository,
        protected QueuesInterface $queues,
        protected null|ResultHandlerInterface $resultHandler = null,
        protected null|EventDispatcherInterface $eventDispatcher = null,
    ) {}

    public function handleJob(JobInterface $job): void
    {
        $id = (int)($job->getPayload()['job_id'] ?? 0);

        if (is_null($jobEntity = $this->jobRepository->findById($id))) {
            // Optional: dispatch job-not-found event
            return;
        }

        // Resolve reader
        $readerRegistry = $this->registries->get($jobEntity->readerId());

        if (is_null($readerRegistry)) {
            return;
        }

        $reader = $readerRegistry->createReader($this->container, $jobEntity);

        // Resolve writer + modifiers
        $writerRegistry = $this->registries->get($jobEntity->writerId());

        if (is_null($writerRegistry)) {
            return;
        }

        $writer = $writerRegistry->createWriter($this->container, $jobEntity);
        $modifiers = $writerRegistry->createModifiers($this->container, $jobEntity);

        // Job data (offset)
        $data = $job->parameters()->get(Parameter\Data::class) ?? new Parameter\Data(['offset' => 0]);

        if (! $job->parameters()->has(Parameter\Data::class)) {
            $job->parameters()->add($data);
        }

        $offset = $data->get('offset', 0);

        // Optional: dispatch start event
        $this->eventDispatcher?->dispatch(new Event\ProcessStarted(
            result: new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $modifiers,
            ),
        ));

        try {
            // Run processor with time budget
            $processor = new TimeBudgetProcessor(
                timeBudget: $job->getPayload()['timeBudget'] ?? 20,
                modifiers: $modifiers,
                resultHandler: $this->resultHandler,
            );

            $result = $processor->process(
                reader: $reader,
                writer: $writer,
                offset: $offset,
            );

            $this->eventDispatcher?->dispatch(new Event\PartialProcess($result));

            // Requeue if not finished
            if (! $reader->isFinished()) {
                $data->set('offset', $reader->currentOffset());
                $job->parameters()->add($data);

                $this->queues
                    ->queue($job->parameters()->get(Parameter\Queue::class)->name())
                    ->push($job);

                return;
            }

            // Completed
            $this->eventDispatcher?->dispatch(new Event\ProcessCompleted($result));
        } catch (\Throwable $e) {
            $failureResult = new Result(
                successfulRows: 0,
                failedRows: 0,
                skippedRows: 0,
                reader: $reader,
                writer: $writer,
                modifiers: $modifiers,
                meta: ['failed' => true],
            );

            $this->eventDispatcher?->dispatch(new Event\ProcessFailed(
                result: $failureResult,
                exception: $e,
            ));

            throw $e;
        }
    }
}
Full Example
A complete implementation, including registries, CRUD configuration, and job entities, is available in the Import/Export App.