xcgu / amphp-mongodb
Asynchronous MongoDB client for PHP based on Amp.
Installs: 0
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 0
Forks: 0
Open Issues: 0
pkg:composer/xcgu/amphp-mongodb
Requires
- php: >=8.1
- amphp/amp: ^3
- amphp/pipeline: ^1
- amphp/socket: ^2
Requires (Dev)
- phpunit/phpunit: ^9
This package is not auto-updated.
Last update: 2025-12-16 19:46:19 UTC
README
Async MongoDB client for PHP based on AMPHP and Revolt.
This library provides a non-blocking, concurrent MongoDB client that leverages PHP fibers for truly asynchronous database operations.
Features
- Fully Async: Built on AMPHP v3 with Revolt event loop
- Connection Pooling: Efficient connection management with automatic cleanup
- Authentication: SCRAM-SHA-256 and SCRAM-SHA-1 support
- Complete CRUD Operations: Insert, find, update, delete with full options support
- Aggregation Pipeline: Execute complex aggregation queries
- Transactions: Full multi-document transaction support
- Wire Protocol: Native MongoDB wire protocol (OP_MSG) implementation
- Type-Safe: Modern PHP 8.1+ with strict types and readonly properties
- Concurrent Operations: Execute multiple queries in parallel
Installation
composer require xcgu/amphp-mongodb
Requirements
- PHP 8.1 or higher
- MongoDB 3.6+ (for OP_MSG support)
Quick Start
<?php require __DIR__ . '/vendor/autoload.php'; use Amp\Mongodb\MongodbConfig; use Amp\Mongodb\MongodbConnectionPool; use function Amp\async; // Create configuration $config = new MongodbConfig( host: 'localhost', port: 27017, database: 'testdb', user: 'user', password: 'pass' ); // Create connection pool $pool = new MongodbConnectionPool($config); // The pool manages connections internally $result = $pool->insertOne('users', [ 'name' => 'John Doe', 'email' => 'john@example.com', 'age' => 30 ]); echo "Inserted {$result->getInsertedCount()} document(s)\n"; // Find documents $cursor = $pool->find('users', ['age' => ['$gte' => 18]]); foreach ($cursor as $document) { echo "Found: {$document['name']}\n"; } // Close pool when done $pool->close();
Concurrent Operations
Execute multiple operations in parallel:
use function Amp\async; use function Amp\Future\await; $pool = new MongodbConnectionPool($config); // Run multiple queries concurrently $futures = [ async(fn() => $pool->countDocuments('users', [])), async(fn() => $pool->countDocuments('posts', [])), async(fn() => $pool->countDocuments('comments', [])) ]; [$userCount, $postCount, $commentCount] = await($futures); echo "Users: $userCount, Posts: $postCount, Comments: $commentCount\n"; $pool->close();
Transactions
$transaction = $pool->beginTransaction(); try { $transaction->insertOne('accounts', [ 'name' => 'Alice', 'balance' => 1000 ]); $transaction->updateOne('accounts', ['name' => 'Bob'], ['$inc' => ['balance' => 100]] ); $transaction->commit(); echo "Transaction committed successfully\n"; } catch (\Throwable $e) { $transaction->rollback(); echo "Transaction rolled back: {$e->getMessage()}\n"; }
Aggregation Pipeline
$pipeline = [ ['$match' => ['status' => 'active']], ['$group' => [ '_id' => '$category', 'total' => ['$sum' => '$amount'] ]], ['$sort' => ['total' => -1]] ]; $cursor = $pool->aggregate('sales', $pipeline); foreach ($cursor as $result) { echo "Category: {$result['_id']}, Total: {$result['total']}\n"; }
Connection Pooling
The connection pool automatically manages connections with configurable limits:
$config = new MongodbConfig( host: 'localhost', port: 27017, database: 'mydb' ); // Configure pool size and timeout when creating the pool $pool = new MongodbConnectionPool( config: $config, maxConnections: 10, // Maximum pool size idleTimeout: 60 // Close idle connections after 60 seconds );
BSON Types
The library supports all MongoDB BSON types:
use Amp\Mongodb\Internal\BSON\Types\ObjectId; use Amp\Mongodb\Internal\BSON\Types\UTCDateTime; use Amp\Mongodb\Internal\BSON\Types\Binary; use Amp\Mongodb\Internal\BSON\Types\Regex; $pool->insertOne('documents', [ '_id' => new ObjectId(), 'created' => new UTCDateTime(), 'data' => new Binary('binary data', Binary::BINARY_GENERIC), 'pattern' => new Regex('^test', 'i') ]);
Error Handling
use Amp\Mongodb\MongodbException; use Amp\Mongodb\MongodbQueryError; use Amp\Mongodb\MongodbConnectionException; try { $result = $pool->insertOne('users', ['name' => 'Test']); } catch (MongodbQueryError $e) { // Query execution errors (duplicate key, validation, etc.) echo "Query error: {$e->getMessage()}\n"; } catch (MongodbConnectionException $e) { // Connection/network errors echo "Connection error: {$e->getMessage()}\n"; } catch (MongodbException $e) { // General MongoDB errors echo "MongoDB error: {$e->getMessage()}\n"; }
API Reference
Connection Methods
find(string $collection, array $filter, array $options = []): MongodbResultfindOne(string $collection, array $filter, array $options = []): ?arrayinsertOne(string $collection, array $document, array $options = []): MongodbCommandResultinsertMany(string $collection, array $documents, array $options = []): MongodbCommandResultupdateOne(string $collection, array $filter, array $update, array $options = []): MongodbCommandResultupdateMany(string $collection, array $filter, array $update, array $options = []): MongodbCommandResultreplaceOne(string $collection, array $filter, array $replacement, array $options = []): MongodbCommandResultdeleteOne(string $collection, array $filter, array $options = []): MongodbCommandResultdeleteMany(string $collection, array $filter, array $options = []): MongodbCommandResultcountDocuments(string $collection, array $filter, array $options = []): intaggregate(string $collection, array $pipeline, array $options = []): MongodbResultfindOneAndUpdate(string $collection, array $filter, array $update, array $options = []): ?arrayfindOneAndDelete(string $collection, array $filter, array $options = []): ?arrayfindOneAndReplace(string $collection, array $filter, array $replacement, array $options = []): ?arraycreateIndex(string $collection, array $keys, array $options = []): arraydropIndex(string $collection, string $indexName): arraylistIndexes(string $collection): MongodbResultlistCollections(array $filter = []): MongodbResultdropCollection(string $collection): arraycreateCollection(string $collection, array $options = []): arraybeginTransaction(array $options = []): MongodbTransactioncommand(array $command, ?string $databaseName = null): arrayclose(): void
Performance Tips
- Use Connection Pooling: Reuse connections instead of creating new ones
- Concurrent Operations: Use
async()andawait()for parallel queries - Batch Operations: Use
insertMany()andupdateMany()when possible - Projection: Limit returned fields with projection option
- Indexes: Create appropriate indexes for frequent queries
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
The MIT License (MIT). Please see LICENSE for more information.