phalanx/postgres

Async PostgreSQL for Phalanx via amphp/postgres

Maintainers

Package info

github.com/havy-tech/phalanx-postgres

pkg:composer/phalanx/postgres

Statistics

Installs: 0

Dependents: 0

Suggesters: 1

Stars: 0

Open Issues: 0

v0.2.0 2026-03-27 04:00 UTC

This package is auto-updated.

Last update: 2026-03-27 13:53:25 UTC


README

Phalanx

phalanx/postgres

Async PostgreSQL with connection pooling, prepared statements, transactions, and LISTEN/NOTIFY — all non-blocking, all composable with Phalanx's concurrency primitives.

Installation

composer require phalanx/postgres

Requires ext-pgsql and PHP 8.4+.

Setup

Register the service bundle when building your application:

<?php

use Phalanx\Postgres\PgServiceBundle;

$app = Application::starting()
    ->providers(new PgServiceBundle())
    ->compile();

PgServiceBundle registers PgPool and PgListener as singletons with automatic shutdown hooks. Connections close cleanly when the scope tears down.

Configuration

PgConfig accepts a DSN string or individual parameters:

<?php

// DSN
PgConfig::fromDsn('postgresql://user:pass@localhost:5432/mydb');

// Individual params
new PgConfig(
    host: 'localhost',
    port: 5432,
    user: 'app',
    password: 'secret',
    database: 'mydb',
    maxConnections: 10,
    idleTimeout: 30,
);

Environment variables work out of the box — PgConfig resolves PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DATABASE when no explicit values are provided.

Queries and Execution

PgPool wraps Amphp's PostgresConnectionPool with a clean async interface:

<?php

$pg = $scope->service(PgPool::class);

// Parameterized SELECT — returns rows as associative arrays
$users = $pg->query(
    'SELECT * FROM users WHERE active = $1 LIMIT $2',
    [true, 50]
);

// Non-SELECT statements — returns affected row count
$pg->execute(
    'UPDATE users SET last_seen = NOW() WHERE id = $1',
    [$userId]
);

// Prepared statements — parse once, execute many
$stmt = $pg->prepare('INSERT INTO events (type, payload) VALUES ($1, $2)');
foreach ($events as $event) {
    $stmt->execute([$event->type, json_encode($event->data)]);
}

Every query uses parameterized placeholders ($1, $2, ...). No string interpolation, no injection surface.

Concurrent Queries

Because PgPool manages multiple connections, you can run queries concurrently through Phalanx's concurrency primitives:

<?php

[$users, $orders, $stats] = $scope->concurrent([
    Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM users')),
    Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM orders')),
    Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT count(*) FROM events')),
]);

Three queries, three connections, one await. The pool handles connection checkout and return automatically.

Transactions

<?php

$pg = $scope->service(PgPool::class);

$tx = $pg->beginTransaction();
try {
    $tx->execute('INSERT INTO orders (user_id, total) VALUES ($1, $2)', [$userId, $total]);
    $tx->execute('UPDATE inventory SET qty = qty - $1 WHERE sku = $2', [$qty, $sku]);
    $tx->commit();
} catch (\Throwable $e) {
    $tx->rollback();
    throw $e;
}

The transaction holds a dedicated connection from the pool until committed or rolled back.

LISTEN/NOTIFY

PostgreSQL's LISTEN/NOTIFY turns your database into a lightweight message broker. PgListener provides a dedicated connection for subscriptions so notifications never compete with query traffic.

<?php

$listener = $scope->service(PgListener::class);

// Subscribe — returns an async iterable
foreach ($listener->listen('order_created') as $notification) {
    $order = json_decode($notification->payload, true);
    processOrder($order);
}

Publishing from any connection:

<?php

$pg = $scope->service(PgPool::class);
$pg->notify('order_created', json_encode(['id' => $orderId, 'total' => $total]));

Combine LISTEN with Phalanx's concurrency to fan out event handling across multiple channels without blocking:

<?php

$scope->concurrent([
    Task::of(static fn($s) => handleChannel($s, 'order_created')),
    Task::of(static fn($s) => handleChannel($s, 'payment_received')),
    Task::of(static fn($s) => handleChannel($s, 'inventory_low')),
]);

Shutdown

PgServiceBundle registers disposal hooks on the scope. When the scope tears down — whether from normal completion, cancellation, or error — open connections drain and close. No manual cleanup required.