entreya / csvquery
High-performance CSV query engine with Go-powered indexing and Yii2-like fluent API
Requires
- php: >=8.1
- ext-json: *
Requires (Dev)
- phpunit/phpunit: ^10.0
Suggests
- ext-posix: For Unix Domain Socket support (recommended)
README
CsvQuery
Query massive CSV files like a database β powered by a Go sidecar engine.
CsvQuery is a high-performance query engine that turns CSV files (10 GBβ1 TB+) into searchable, indexed data stores. A fluent PHP API (inspired by Yii2 ActiveQuery) communicates with a Go sidecar daemon over Unix Domain Sockets, delivering sub-millisecond query latency without the overhead of a traditional database.
π Full documentation β entreya.github.io/csvquery-docs
Table of Contents
- Key Features
- Architecture
- Installation
- Quick Start
- Usage Examples
- API Reference
- Data Flow
- Performance
- CLI Reference
- Project Structure
- Testing
- Contributing
- License & Acknowledgments
β¨ Key Features
| Feature | Description |
|---|---|
| π SIMD-Accelerated Parsing | AVX2 / SSE4.2 instructions scan CSV delimiters at hardware speed (10 GB/s+) |
| π Massive Scale | Benchmarked on 18 M+ rows, 10 GB+ files |
| πΎ Memory Efficient | mmap-based file access with LZ4-compressed indexes |
| π Yii2-like Fluent API | Familiar find()βwhere()βall() query builder for PHP developers |
| β‘ Zero-IO Index Scans | COUNT(*) operations resolve from index metadata alone β the CSV is never opened |
| π Live Updates | Sidecar _updates.json system makes immutable CSVs mutable |
| π Daemon Mode | Persistent Go process over UDS for ~1 ms latency (vs ~200 ms per-process spawn) |
| π Cross-Platform | Pre-compiled binaries for Linux, macOS, and Windows (AMD64 & ARM64) |
ποΈ Architecture
CsvQuery uses a PHP + Go Sidecar design. Your PHP application talks to a long-lived Go daemon through Unix Domain Sockets (UDS), keeping query latency under a millisecond.
graph TB
subgraph "PHP Application Layer"
A["Your PHP App"] --> B["CsvQuery"]
B --> C["ActiveQuery"]
C --> D["GoBridge"]
D --> E["SocketClient"]
end
subgraph "Go Sidecar Engine"
E <== "JSON over UDS" ==> F["UDS Daemon"]
F --> G["Query Engine"]
G --> H["Index Reader"]
G --> I["SIMD Scanner"]
H --> J["LZ4 Decompressor"]
end
subgraph "Disk Storage"
K[("Raw CSV File")]
L[(".cidx Indexes")]
M[(".bloom Filters")]
N[("_updates.json")]
end
G -.- K
H -.- L
G -.- M
G -.- N
Loading
Key Technologies
| Component | Technology | Why |
|---|---|---|
| Parsing | AVX2 / SSE4.2 SIMD | Scan delimiters at hardware speed |
| Compression | LZ4 block codec | 10Γ faster decompression than Gzip |
| File Access | mmap |
Zero-copy reads, OS-managed page cache |
| IPC | Unix Domain Sockets | ~1 ms round-trip vs ~200 ms process spawn |
| Probabilistic Filter | Bloom filters | Reject non-matching index blocks before decompression |
Design Patterns
| Pattern | Where | Purpose |
|---|---|---|
| Sidecar Process | Go daemon | Offload CPU-intensive work from PHP |
| Fluent Builder | ActiveQuery |
Chainable, composable query construction |
| Singleton | SocketClient |
Reuse a single daemon connection per request |
| Generator / Streaming | each(), GoBridge::query() |
Process millions of rows without loading them all into memory |
| External Merge Sort | Indexer | Build indexes on files larger than available RAM |
π¦ Installation
Prerequisites
| Requirement | Version |
|---|---|
| PHP | 8.1+ |
| Composer | 2.x |
| Go (optional β pre-compiled binaries included) | 1.21+ |
Via Composer
composer require entreya/csvquery
The Go binary is compiled automatically on composer install. If Go is not available on your machine, the bundled pre-compiled binary for your platform will be used instead.
Build Commands
composer build # Build for current platform composer build:all # Cross-compile for all platforms composer build:clean # Remove all compiled binaries
Manual Build
php scripts/build.php # Current OS/arch php scripts/build.php --all # All platforms
Platform Notes
| Platform | Notes |
|---|---|
| macOS | ARM64 (Apple Silicon) and AMD64 binaries included |
| Linux | AMD64 and ARM64 binaries included |
| Windows | Statically linked .exe β no MinGW / Cygwin / WSL required |
See CONTRIBUTING.md for per-platform development setup.
π Quick Start
<?php require 'vendor/autoload.php'; use Entreya\CsvQuery\Core\CsvQuery; // 1. Point to your CSV $csv = new CsvQuery('/data/sales.csv', [ 'indexDir' => '/data/indexes', // optional β defaults to CSV directory 'workers' => 8, // optional β parallel indexing workers ]); // 2. Create indexes (one-time, ~400 k rows/sec) $csv->createIndex(['STATUS', 'CATEGORY']); // 3. Query with a fluent API $results = $csv->find() ->select(['ID', 'NAME', 'STATUS']) ->where(['STATUS' => 'active']) ->andWhere(['>', 'SCORE', 80]) ->orderBy(['SCORE' => SORT_DESC]) ->limit(100) ->all(); // 4. Stream large result sets with generators foreach ($csv->find()->where(['CATEGORY' => 'premium'])->each() as $row) { echo $row['NAME'] . "\n"; } // 5. Aggregations $count = $csv->find()->where(['STATUS' => 'active'])->count(); $total = $csv->find()->groupBy('CATEGORY')->sum('AMOUNT');
π Usage Examples
Filtering with Operators
// Comparison operators $csv->find()->where(['>', 'PRICE', 100])->all(); $csv->find()->where(['<=', 'AGE', 30])->all(); $csv->find()->where(['!=', 'STATUS', 'deleted'])->all(); // BETWEEN $csv->find()->where(['BETWEEN', 'AGE', 18, 65])->all(); // IN $csv->find()->where(['IN', 'CATEGORY', ['A', 'B', 'C']])->all(); // LIKE $csv->find()->where(['LIKE', 'NAME', '%john%'])->all();
Complex Nested Conditions
$csv->find() ->where(['OR', ['STATUS' => 'active'], ['AND', ['>', 'SCORE', 90], ['TYPE' => 'vip'] ] ]) ->all();
Aggregations & Group By
// Simple aggregations $avg = $csv->find()->where(['STATUS' => 'active'])->average('SCORE'); $min = $csv->find()->min('PRICE'); $max = $csv->find()->max('PRICE'); $sum = $csv->find()->sum('AMOUNT'); $count = $csv->find()->count(); // Group-by counts $stats = $csv->find()->groupBy('CATEGORY')->count(); // β ['A' => 1200, 'B' => 890, 'C' => 450]
Data Mutation
// Insert a single row $csv->insert(['ID' => '999', 'NAME' => 'Alice', 'STATUS' => 'active']); // Batch insert $csv->batchInsert([ ['ID' => '1000', 'NAME' => 'Bob', 'STATUS' => 'active'], ['ID' => '1001', 'NAME' => 'Carol', 'STATUS' => 'pending'], ]); // Update rows (sidecar β does not rewrite CSV) $csv->update( ['STATUS' => 'inactive'], // SET ['ID' => '123'] // WHERE ); // Add a virtual column $csv->addColumn('REGION', 'UNKNOWN');
Query Debugging
// Inspect the generated SQL-like representation $command = $csv->find() ->select(['ID', 'NAME']) ->where(['STATUS' => 'active']) ->orderBy(['NAME' => SORT_ASC]) ->limit(10) ->createCommand(); echo $command->getQuery(); // β SELECT ID, NAME FROM `sales` WHERE `STATUS` = 'active' ORDER BY NAME ASC LIMIT 10 // Explain the execution plan $plan = $csv->find()->where(['STATUS' => 'active'])->explain(); // β ['strategy' => 'IndexScan', 'index' => 'STATUS', ...]
Efficient Iteration
// each() returns a Generator β only one row is in memory at a time foreach ($csv->find()->where(['STATUS' => 'active'])->each() as $row) { // $row is a Row object with ArrayAccess echo $row['NAME']; // Or use model methods $cell = $row->getCell('SCORE'); echo $cell->asInt(); // type-safe cast echo $cell->isNumeric(); // true/false } // Index results by a column $byId = $csv->find() ->indexBy('ID') ->all(); // β ['42' => Row, '43' => Row, ...] // Return plain arrays instead of Row objects $arrays = $csv->find()->asArray()->all();
π API Reference
CsvQuery β Entry Point
use Entreya\CsvQuery\Core\CsvQuery; $csv = new CsvQuery(string $csvPath, array $options = []);
Constructor Options
| Option | Type | Default | Description |
|---|---|---|---|
indexDir |
string |
CSV directory | Directory to store .cidx / .bloom files |
separator |
string |
, |
CSV column delimiter |
workers |
int |
CPU count | Parallel workers for indexing |
memoryMB |
int |
500 |
Memory limit per worker (MB) |
binaryPath |
string |
Auto-detect | Custom path to the Go binary |
Methods
| Method | Returns | Description |
|---|---|---|
find() |
ActiveQuery |
Start a new query |
where($condition, $value) |
ActiveQuery |
Shorthand for find()->where(...) |
andWhere($condition, $value) |
ActiveQuery |
Shorthand for find()->andWhere(...) |
createIndex(array $columns, bool $verbose, array $options) |
bool |
Create indexes (single or composite) |
hasIndex(string|array $column) |
bool |
Check whether an index exists |
dropIndex(string|array $column) |
bool |
Remove a specific index |
clearIndexes() |
int |
Remove all indexes; returns count deleted |
getHeaders() |
array |
Column names from the CSV header row |
insert(array $row) |
void |
Append a single row |
batchInsert(array $rows) |
void |
Append multiple rows |
update(array $attributes, array $conditions) |
int |
Update matching rows via sidecar; returns count |
addColumn(string $name, string $default) |
void |
Add a virtual column |
getMeta() |
array |
Index metadata (row count, file hash, etc.) |
validateIntegrity() |
bool |
Check if indexes are still valid for the current CSV |
ActiveQuery β Query Builder
$query = $csv->find();
Condition Methods (return $this for chaining)
| Method | Description |
|---|---|
where($condition, $value = null) |
Set the WHERE clause |
andWhere($condition, $value = null) |
Append an AND condition |
orWhere($condition, $value = null) |
Append an OR condition |
filterWhere(array $condition) |
WHERE, ignoring empty/null values |
andFilterWhere(array $condition) |
AND, ignoring empty/null values |
orFilterWhere(array $condition) |
OR, ignoring empty/null values |
Modifier Methods (return $this for chaining)
| Method | Description |
|---|---|
select(array $columns) |
Choose which columns to return |
orderBy(array|string $columns) |
Set ORDER BY |
addOrderBy(array|string $columns) |
Append additional ORDER BY |
groupBy(array|string $columns) |
Set GROUP BY |
addGroupBy(array|string $columns) |
Append additional GROUP BY |
limit(int $n) |
Maximum rows to return |
offset(int $n) |
Skip first n rows |
indexBy(string|callable $column) |
Key results by a column value |
asArray(bool $flag = true) |
Return plain arrays instead of Row objects |
debug(bool $enable = true) |
Enable debug mode on the bridge |
Execution Methods
| Method | Returns | Description |
|---|---|---|
all() |
array |
All matching rows |
one() |
?Row |
First matching row |
count(string $q = '*') |
int |
Count of matching rows |
sum(string $column) |
float |
Sum of column values |
average(string $column) |
float |
Average of column values |
min(string $column) |
mixed |
Minimum column value |
max(string $column) |
mixed |
Maximum column value |
each() |
Generator |
Stream rows one at a time |
aggregate(string $col, array $fns) |
array |
Multiple aggregations in one pass |
explain() |
array |
Query execution plan |
createCommand() |
Command |
SQL-like debug representation |
Condition Syntax
// Hash format (implicit AND) ->where(['STATUS' => 'active', 'TYPE' => 'premium']) // Operator format ->where(['>', 'SCORE', 80]) ->where(['BETWEEN', 'AGE', 18, 65]) ->where(['IN', 'CATEGORY', ['A', 'B', 'C']]) ->where(['LIKE', 'NAME', '%john%']) // Nested logical groups ->where(['OR', ['STATUS' => 'active'], ['AND', ['>', 'SCORE', 90], ['TYPE' => 'vip'] ] ])
Row β Result Object
Implements ArrayAccess, IteratorAggregate, and JsonSerializable.
$row = $csv->find()->one(); // Array access echo $row['NAME']; // Magic property echo $row->NAME; // Model methods $row->getColumn('NAME'); // β Column object $row->getCell('NAME'); // β Cell object $row->toAssociativeArray(); // β ['NAME' => 'Alice', ...] $row->getLineNumber(); // β int|null $row->toJson(); // β JSON string
Cell β Value Wrapper
$cell = $row->getCell('SCORE'); $cell->getValue(); // raw value $cell->asInt(); // (int) cast with default $cell->asFloat(); // (float) cast with default $cell->asBool(); // 1/true/yes/on/y β true $cell->asString(); // (string) cast $cell->isEmpty(); // null or '' $cell->isNumeric(); // is_numeric() // Inline validation $result = $cell->validate(['required', 'numeric', 'min:1', 'max:100']); // β ['valid' => true, 'errors' => []]
Column β Column View
$col = $row->getColumn('NAME'); $col->getValue(); $col->getName(); // 'NAME' $col->getIndex(); // column position (0-based) $col->getCell(); // β Cell $col->trim(); $col->toUpper(); $col->toLower();
π Data Flow
Query Lifecycle
sequenceDiagram
participant App as PHP App
participant CQ as CsvQuery
participant AQ as ActiveQuery
participant Bridge as GoBridge
participant Socket as SocketClient
participant Daemon as Go Daemon
participant Engine as QueryEngine
participant Disk as Filesystem
App->>CQ: find()->where(['ID' => '123'])->one()
CQ->>AQ: new ActiveQuery
AQ->>Bridge: query(...)
alt Socket Mode (default)
Bridge->>Socket: getInstance()
Socket->>Socket: ensureConnected()
Socket->>Daemon: {"action":"select","where":{"ID":"123"}}
else CLI Fallback
Bridge->>Bridge: spawn csvquery query ...
end
Daemon->>Engine: evaluate query
Engine->>Engine: findBestIndex()
alt Index Found
Engine->>Disk: Read .cidx sparse index
Engine->>Engine: Binary search for key
Engine->>Disk: Decompress LZ4 block
Engine->>Disk: fseek() to CSV offset
else No Index
Engine->>Disk: mmap entire CSV
Engine->>Engine: Parallel SIMD scan
end
Engine-->>Daemon: matching offsets
Daemon-->>Socket: JSON response
Socket-->>Bridge: parsed result
Bridge->>Disk: Read rows at offsets
Bridge-->>AQ: hydrate Row objects
AQ-->>App: Row | array
Loading
Index Creation
flowchart LR
A["createIndex(['STATUS'])"] --> B[GoBridge]
B --> C["Go: csvquery index"]
C --> D["SIMD Scanner<br/>(parallel workers)"]
D --> E["External Merge Sort"]
E --> F[".cidx Binary Index"]
E --> G[".bloom Filter"]
E --> H["_meta.json"]
Loading
Query Strategy Decision
flowchart TD
A[Incoming Query] --> B{Index exists<br/>for WHERE columns?}
B -- Yes --> C{Index valid?<br/>validateIntegrity}
C -- Yes --> D["IndexScan<br/>(binary search + LZ4)"]
C -- No --> E["Rebuild index or<br/>fall back to scan"]
B -- No --> F{Bloom filter<br/>available?}
F -- Yes --> G["Bloom-assisted<br/>FullScan"]
F -- No --> H["FullScan<br/>(parallel SIMD)"]
D --> I[Return offsets]
G --> I
H --> I
E --> I
Loading
β‘ Performance
Benchmarks on 1,000,000 rows (Darwin ARM64, Apple M-series):
Indexing Throughput
| Task | Throughput | Wall Time |
|---|---|---|
| Single-column index | ~400,000 rows/sec | ~2.5 s |
| Composite index (2 cols) | ~350,000 rows/sec | ~2.9 s |
Query Latency
| Query Type | Matching Rows | Latency |
|---|---|---|
COUNT(*) β no filter |
all | ~10 ms |
COUNT β indexed filter (0 hits) |
0 | ~14 ms |
COUNT β indexed filter |
150 K | ~25 ms |
SELECT β indexed filter |
1 K | ~50 ms |
| Full table scan (no index) | all | ~2,000 ms |
Tip
Zero-IO Index Scans β If the query can be satisfied entirely by index metadata (e.g. COUNT(*)), the engine never opens the CSV file.
π οΈ CLI Reference
The Go binary can be invoked directly for maintenance, debugging, or scripting:
./bin/csvquery <command> [flags]
Commands
index β Create indexes from a CSV file
./bin/csvquery index \
--input data.csv \
--columns '["STATUS", "CATEGORY"]' \
--output /path/to/indexes \
--workers 8 \
--memory 500 \
--bloom 0.01 \
--verbose
| Flag | Default | Description |
|---|---|---|
--input |
(required) | Path to CSV file |
--output |
CSV directory | Output directory for index files |
--columns |
[] |
JSON array of columns to index |
--separator |
, |
CSV delimiter |
--workers |
CPU count | Parallel workers |
--memory |
500 |
Memory limit per worker (MB) |
--bloom |
0.01 |
Bloom filter false-positive rate |
--verbose |
false |
Print progress |
query β Execute queries
./bin/csvquery query \
--csv data.csv \
--index-dir /path/to/indexes \
--where '{"STATUS":"active"}' \
--limit 100 \
--count
| Flag | Default | Description |
|---|---|---|
--csv |
Path to CSV file | |
--index-dir |
CSV directory | Index directory |
--where |
{} |
JSON conditions |
--limit |
0 (unlimited) |
Max results |
--offset |
0 |
Skip first n results |
--count |
false |
Output only the count |
--explain |
false |
Print query execution plan |
--group-by |
Column to group by | |
--agg-col |
Column to aggregate | |
--agg-func |
Aggregation function |
daemon β Start the UDS server
./bin/csvquery daemon \ --socket /tmp/csvquery.sock \ --index-dir /path/to/indexes \ --workers 50
| Flag | Default | Description |
|---|---|---|
--socket |
/tmp/csvquery.sock |
Unix socket path |
--host |
127.0.0.1 |
TCP host (if --port is set) |
--port |
0 |
TCP port (0 = use Unix socket) |
--csv |
Default CSV path | |
--index-dir |
Default index directory | |
--workers |
50 |
Max concurrent handlers |
write β Append rows to a CSV file
./bin/csvquery write \ --csv data.csv \ --data '[["1001","Alice","active"]]' \ --headers '["ID","NAME","STATUS"]'
| Flag | Default | Description |
|---|---|---|
--csv |
(required) | Target CSV file |
--headers |
[] |
JSON array of headers (new file only) |
--data |
[] |
JSON array of row arrays |
--separator |
, |
CSV delimiter |
version β Print version
./bin/csvquery version
# β CsvQuery v1.0.0 (2026-02-07)
π Project Structure
csvquery/
βββ src/
β βββ php/ # PHP source (Entreya\CsvQuery\)
β β βββ Core/
β β β βββ CsvQuery.php # Entry point, index lifecycle
β β βββ Query/
β β β βββ ActiveQuery.php # Fluent query builder
β β β βββ Command.php # SQL-like debug output
β β βββ Bridge/
β β β βββ GoBridge.php # Go binary wrapper
β β β βββ SocketClient.php # UDS daemon client (singleton)
β β βββ Models/
β β βββ Row.php # Row with ArrayAccess
β β βββ Cell.php # Type-safe cell wrapper
β β βββ Column.php # Column metadata view
β βββ go/ # Go source
β βββ main.go # CLI entry point
β βββ go.mod
β βββ internal/
β βββ common/ # Shared types (IndexRecord, Meta)
β βββ indexer/ # CSV indexing pipeline
β βββ query/ # Query engine, index selection
β βββ server/ # Unix socket daemon
β βββ simd/ # AVX2/SSE4.2 scanning
β βββ alter/ # Schema modifications
β βββ update/ # Row update operations
β βββ updatemgr/ # Update file management
β βββ writer/ # CSV write operations
β βββ schema/ # Virtual columns
βββ bin/ # Pre-compiled Go binaries
βββ benchmarks/ # Performance benchmarks
βββ examples/
β βββ quick_start.php
βββ tests/ # PHPUnit tests
βββ scripts/
β βββ build.php # Cross-platform build script
βββ ARCHITECTURE.md
βββ CHANGELOG.md
βββ CONTRIBUTING.md
βββ composer.json
βββ LICENSE
βββ README.md
See ARCHITECTURE.md for detailed module documentation.
π§ͺ Testing
PHP
composer install composer test # or: ./vendor/bin/phpunit tests/
Go
cd src/go go test -v ./internal/... go test -bench=. -benchmem ./internal/query/
π€ Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
# 1. Fork & clone git clone https://github.com/<you>/csvquery.git && cd csvquery # 2. Create a feature branch git checkout -b feature/amazing-feature # 3. Make changes, run tests composer test # 4. Commit (conventional commits encouraged) git commit -m 'feat: add amazing feature' # 5. Push & open a Pull Request git push origin feature/amazing-feature
π License & Acknowledgments
This project is licensed under the MIT License β see the LICENSE file for details.
Acknowledgments:
- Query API inspired by Yii2 ActiveQuery
- LZ4 compression via pierrec/lz4
- SIMD optimizations inspired by simdjson
