xcgu/amphp-mongodb

Asynchronous MongoDB client for PHP based on Amp.

Installs: 0

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 0

Forks: 0

Open Issues: 0

pkg:composer/xcgu/amphp-mongodb

v1.1.0 2025-11-30 12:41 UTC

This package is not auto-updated.

Last update: 2025-12-16 19:46:19 UTC


README

MIT License PHP Version

Async MongoDB client for PHP based on AMPHP and Revolt.

This library provides a non-blocking, concurrent MongoDB client that leverages PHP fibers for truly asynchronous database operations.

Features

  • Fully Async: Built on AMPHP v3 with Revolt event loop
  • Connection Pooling: Efficient connection management with automatic cleanup
  • Authentication: SCRAM-SHA-256 and SCRAM-SHA-1 support
  • Complete CRUD Operations: Insert, find, update, delete with full options support
  • Aggregation Pipeline: Execute complex aggregation queries
  • Transactions: Full multi-document transaction support
  • Wire Protocol: Native MongoDB wire protocol (OP_MSG) implementation
  • Type-Safe: Modern PHP 8.1+ with strict types and readonly properties
  • Concurrent Operations: Execute multiple queries in parallel

Installation

composer require xcgu/amphp-mongodb

Requirements

  • PHP 8.1 or higher
  • MongoDB 3.6+ (for OP_MSG support)

Quick Start

<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Mongodb\MongodbConfig;
use Amp\Mongodb\MongodbConnectionPool;
use function Amp\async;

// Create configuration
$config = new MongodbConfig(
    host: 'localhost',
    port: 27017,
    database: 'testdb',
    user: 'user',
    password: 'pass'
);

// Create connection pool
$pool = new MongodbConnectionPool($config);

// The pool manages connections internally
$result = $pool->insertOne('users', [
    'name' => 'John Doe',
    'email' => 'john@example.com',
    'age' => 30
]);

echo "Inserted {$result->getInsertedCount()} document(s)\n";

// Find documents
$cursor = $pool->find('users', ['age' => ['$gte' => 18]]);

foreach ($cursor as $document) {
    echo "Found: {$document['name']}\n";
}

// Close pool when done
$pool->close();

Concurrent Operations

Execute multiple operations in parallel:

use function Amp\async;
use function Amp\Future\await;

$pool = new MongodbConnectionPool($config);

// Run multiple queries concurrently
$futures = [
    async(fn() => $pool->countDocuments('users', [])),
    async(fn() => $pool->countDocuments('posts', [])),
    async(fn() => $pool->countDocuments('comments', []))
];

[$userCount, $postCount, $commentCount] = await($futures);

echo "Users: $userCount, Posts: $postCount, Comments: $commentCount\n";

$pool->close();

Transactions

$transaction = $pool->beginTransaction();

try {
    $transaction->insertOne('accounts', [
        'name' => 'Alice',
        'balance' => 1000
    ]);
    
    $transaction->updateOne('accounts', 
        ['name' => 'Bob'],
        ['$inc' => ['balance' => 100]]
    );
    
    $transaction->commit();
    echo "Transaction committed successfully\n";
    
} catch (\Throwable $e) {
    $transaction->rollback();
    echo "Transaction rolled back: {$e->getMessage()}\n";
}

Aggregation Pipeline

$pipeline = [
    ['$match' => ['status' => 'active']],
    ['$group' => [
        '_id' => '$category',
        'total' => ['$sum' => '$amount']
    ]],
    ['$sort' => ['total' => -1]]
];

$cursor = $pool->aggregate('sales', $pipeline);

foreach ($cursor as $result) {
    echo "Category: {$result['_id']}, Total: {$result['total']}\n";
}

Connection Pooling

The connection pool automatically manages connections with configurable limits:

$config = new MongodbConfig(
    host: 'localhost',
    port: 27017,
    database: 'mydb'
);

// Configure pool size and timeout when creating the pool
$pool = new MongodbConnectionPool(
    config: $config,
    maxConnections: 10,      // Maximum pool size
    idleTimeout: 60          // Close idle connections after 60 seconds
);

BSON Types

The library supports all MongoDB BSON types:

use Amp\Mongodb\Internal\BSON\Types\ObjectId;
use Amp\Mongodb\Internal\BSON\Types\UTCDateTime;
use Amp\Mongodb\Internal\BSON\Types\Binary;
use Amp\Mongodb\Internal\BSON\Types\Regex;

$pool->insertOne('documents', [
    '_id' => new ObjectId(),
    'created' => new UTCDateTime(),
    'data' => new Binary('binary data', Binary::BINARY_GENERIC),
    'pattern' => new Regex('^test', 'i')
]);

Error Handling

use Amp\Mongodb\MongodbException;
use Amp\Mongodb\MongodbQueryError;
use Amp\Mongodb\MongodbConnectionException;

try {
    $result = $pool->insertOne('users', ['name' => 'Test']);
} catch (MongodbQueryError $e) {
    // Query execution errors (duplicate key, validation, etc.)
    echo "Query error: {$e->getMessage()}\n";
} catch (MongodbConnectionException $e) {
    // Connection/network errors
    echo "Connection error: {$e->getMessage()}\n";
} catch (MongodbException $e) {
    // General MongoDB errors
    echo "MongoDB error: {$e->getMessage()}\n";
}

API Reference

Connection Methods

  • find(string $collection, array $filter, array $options = []): MongodbResult
  • findOne(string $collection, array $filter, array $options = []): ?array
  • insertOne(string $collection, array $document, array $options = []): MongodbCommandResult
  • insertMany(string $collection, array $documents, array $options = []): MongodbCommandResult
  • updateOne(string $collection, array $filter, array $update, array $options = []): MongodbCommandResult
  • updateMany(string $collection, array $filter, array $update, array $options = []): MongodbCommandResult
  • replaceOne(string $collection, array $filter, array $replacement, array $options = []): MongodbCommandResult
  • deleteOne(string $collection, array $filter, array $options = []): MongodbCommandResult
  • deleteMany(string $collection, array $filter, array $options = []): MongodbCommandResult
  • countDocuments(string $collection, array $filter, array $options = []): int
  • aggregate(string $collection, array $pipeline, array $options = []): MongodbResult
  • findOneAndUpdate(string $collection, array $filter, array $update, array $options = []): ?array
  • findOneAndDelete(string $collection, array $filter, array $options = []): ?array
  • findOneAndReplace(string $collection, array $filter, array $replacement, array $options = []): ?array
  • createIndex(string $collection, array $keys, array $options = []): array
  • dropIndex(string $collection, string $indexName): array
  • listIndexes(string $collection): MongodbResult
  • listCollections(array $filter = []): MongodbResult
  • dropCollection(string $collection): array
  • createCollection(string $collection, array $options = []): array
  • beginTransaction(array $options = []): MongodbTransaction
  • command(array $command, ?string $databaseName = null): array
  • close(): void

Performance Tips

  1. Use Connection Pooling: Reuse connections instead of creating new ones
  2. Concurrent Operations: Use async() and await() for parallel queries
  3. Batch Operations: Use insertMany() and updateMany() when possible
  4. Projection: Limit returned fields with projection option
  5. Indexes: Create appropriate indexes for frequent queries

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

The MIT License (MIT). Please see LICENSE for more information.

Credits

  • Built with AMPHP
  • Powered by Revolt
  • Inspired by the MongoDB Wire Protocol specification