phalanx / postgres
Async PostgreSQL for Phalanx via amphp/postgres
Requires
- php: ^8.4
- amphp/postgres: ^2.2
- phalanx/core: ^1.0
- phalanx/stream: ^1.0
- revolt/event-loop-adapter-react: ^1.1
This package is auto-updated.
Last update: 2026-03-27 13:53:25 UTC
README
phalanx/postgres
Async PostgreSQL with connection pooling, prepared statements, transactions, and LISTEN/NOTIFY — all non-blocking, all composable with Phalanx's concurrency primitives.
Installation
composer require phalanx/postgres
Requires ext-pgsql and PHP 8.4+.
Setup
Register the service bundle when building your application:
<?php use Phalanx\Postgres\PgServiceBundle; $app = Application::starting() ->providers(new PgServiceBundle()) ->compile();
PgServiceBundle registers PgPool and PgListener as singletons with automatic shutdown hooks. Connections close cleanly when the scope tears down.
Configuration
PgConfig accepts a DSN string or individual parameters:
<?php // DSN PgConfig::fromDsn('postgresql://user:pass@localhost:5432/mydb'); // Individual params new PgConfig( host: 'localhost', port: 5432, user: 'app', password: 'secret', database: 'mydb', maxConnections: 10, idleTimeout: 30, );
Environment variables work out of the box — PgConfig resolves PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DATABASE when no explicit values are provided.
Queries and Execution
PgPool wraps Amphp's PostgresConnectionPool with a clean async interface:
<?php $pg = $scope->service(PgPool::class); // Parameterized SELECT — returns rows as associative arrays $users = $pg->query( 'SELECT * FROM users WHERE active = $1 LIMIT $2', [true, 50] ); // Non-SELECT statements — returns affected row count $pg->execute( 'UPDATE users SET last_seen = NOW() WHERE id = $1', [$userId] ); // Prepared statements — parse once, execute many $stmt = $pg->prepare('INSERT INTO events (type, payload) VALUES ($1, $2)'); foreach ($events as $event) { $stmt->execute([$event->type, json_encode($event->data)]); }
Every query uses parameterized placeholders ($1, $2, ...). No string interpolation, no injection surface.
Concurrent Queries
Because PgPool manages multiple connections, you can run queries concurrently through Phalanx's concurrency primitives:
<?php [$users, $orders, $stats] = $scope->concurrent([ Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM users')), Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM orders')), Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT count(*) FROM events')), ]);
Three queries, three connections, one await. The pool handles connection checkout and return automatically.
Transactions
<?php $pg = $scope->service(PgPool::class); $tx = $pg->beginTransaction(); try { $tx->execute('INSERT INTO orders (user_id, total) VALUES ($1, $2)', [$userId, $total]); $tx->execute('UPDATE inventory SET qty = qty - $1 WHERE sku = $2', [$qty, $sku]); $tx->commit(); } catch (\Throwable $e) { $tx->rollback(); throw $e; }
The transaction holds a dedicated connection from the pool until committed or rolled back.
LISTEN/NOTIFY
PostgreSQL's LISTEN/NOTIFY turns your database into a lightweight message broker. PgListener provides a dedicated connection for subscriptions so notifications never compete with query traffic.
<?php $listener = $scope->service(PgListener::class); // Subscribe — returns an async iterable foreach ($listener->listen('order_created') as $notification) { $order = json_decode($notification->payload, true); processOrder($order); }
Publishing from any connection:
<?php $pg = $scope->service(PgPool::class); $pg->notify('order_created', json_encode(['id' => $orderId, 'total' => $total]));
Combine LISTEN with Phalanx's concurrency to fan out event handling across multiple channels without blocking:
<?php $scope->concurrent([ Task::of(static fn($s) => handleChannel($s, 'order_created')), Task::of(static fn($s) => handleChannel($s, 'payment_received')), Task::of(static fn($s) => handleChannel($s, 'inventory_low')), ]);
Shutdown
PgServiceBundle registers disposal hooks on the scope. When the scope tears down — whether from normal completion, cancellation, or error — open connections drain and close. No manual cleanup required.