mroosz/php-cassandra

Cassandra client library with support for protocol v5 and asynchronous requests

v0.8.1 2024-06-17 22:01 UTC

Last update: 2025-09-15 22:14:25 UTC



php-cassandra is a pure-PHP client for Apache Cassandra with support for CQL binary protocol v3, v4 and v5 (Cassandra 4.x/5.x), synchronous and asynchronous APIs, prepared statements, batches, result iterators, object mapping, SSL/TLS, and LZ4 compression.

Packagist: mroosz/php-cassandra
Repository: GitHub – MichaelRoosz/php-cassandra

Introduction

php-cassandra is a modern PHP client for Apache Cassandra that prioritizes correctness, performance, and developer experience. This library aims to provide full protocol coverage and advanced features while maintaining simplicity.

Why choose php-cassandra?

🚀 Modern Architecture

  • Pure PHP implementation with no external dependencies
  • Support for latest Cassandra protocol versions (v3/v4/v5)
  • Built for PHP 8.1+ with modern language features

⚡ High Performance

  • Asynchronous request pipelining for maximum throughput
  • LZ4 compression support for reduced bandwidth
  • Prepared statement caching and reuse

🎯 Developer Friendly

  • Complete data type coverage including complex nested structures
  • Rich configuration options with sensible defaults
  • Object mapping with customizable row classes

Key Features

  • Protocol Support: v3/v4/v5 with automatic negotiation
  • Transports: Sockets and PHP streams (SSL/TLS, persistent connections)
  • Request Types: Synchronous, Asynchronous
  • Statements: Prepared statements with positional/named binding, auto-prepare
  • Data Types: Full coverage including collections, tuples, UDTs, custom types, vectors
  • Results: Iterators, multiple fetch styles, object mapping
  • Events: Schema/status/topology change notifications
  • Advanced: LZ4 compression, server overload signaling, tracing support

Requirements

System Requirements

Component     Minimum        Recommended  Notes
PHP Version   8.1.0          8.3+         Latest stable version recommended
Architecture  32-bit/64-bit  64-bit       64-bit required for Date/Duration/Time/Timestamp types and the defaultTimestamp request option; on 32-bit, Bigint/Counter are limited to values within the 32-bit range

PHP Extensions

Extension      Required  Purpose                                               Notes
sockets        Optional  Socket transport                                      Required for socket connections configured with SocketNodeConfig
bcmath or gmp  Optional  Performance improvement for large integer operations  Used by Varint and Decimal types
openssl        Optional  SSL/TLS connections                                   Required for encrypted connections
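Since all three extensions are optional, it can help to check at startup which ones are present. The following is a minimal sketch using only PHP's built-in extension_loaded(); the function name checkOptionalExtensions() is hypothetical and not part of php-cassandra.

```php
<?php

/**
 * Report which of the optional extensions listed above are loaded.
 * Hypothetical helper, not part of php-cassandra.
 *
 * @return array<string, bool>
 */
function checkOptionalExtensions(): array
{
    return [
        // Needed only for the socket transport (SocketNodeConfig).
        'sockets' => extension_loaded('sockets'),
        // Either one speeds up large integer operations (Varint, Decimal).
        'bcmath_or_gmp' => extension_loaded('bcmath') || extension_loaded('gmp'),
        // Needed only for SSL/TLS connections.
        'openssl' => extension_loaded('openssl'),
    ];
}

foreach (checkOptionalExtensions() as $name => $loaded) {
    echo str_pad($name, 14), $loaded ? 'available' : 'missing', PHP_EOL;
}
```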

Data Type Compatibility

Some Cassandra data types require 64-bit PHP; on 32-bit PHP they are either unsupported or only partially supported:

Type       32-bit PHP      64-bit PHP  Notes
Bigint     ⚠️ Partial      ✅ Full     Supported if the value is within 32-bit range
Counter    ⚠️ Partial      ✅ Full     Supported if the value is within 32-bit range
Date       ❌ Unsupported  ✅ Full     Requires 64-bit PHP
Duration   ❌ Unsupported  ✅ Full     Requires 64-bit PHP
Time       ❌ Unsupported  ✅ Full     Requires 64-bit PHP
Timestamp  ❌ Unsupported  ✅ Full     Requires 64-bit PHP

Additionally, the defaultTimestamp request option (in QueryOptions and BatchOptions) requires 64-bit PHP and is unsupported on 32-bit.
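To find out which side of this table your runtime falls on, you can inspect PHP_INT_SIZE. The sketch below is plain PHP and independent of the library; both function names are hypothetical.

```php
<?php

/** True when PHP integers are 64-bit (PHP_INT_SIZE is 8 bytes). */
function isSixtyFourBitPhp(): bool
{
    return PHP_INT_SIZE === 8;
}

/**
 * True when $value fits a signed 32-bit integer, the range in which
 * Bigint and Counter are listed as supported on 32-bit PHP.
 */
function fitsSigned32Bit(int $value): bool
{
    return $value >= -2147483648 && $value <= 2147483647;
}

if (!isSixtyFourBitPhp()) {
    error_log('32-bit PHP: Date, Duration, Time, Timestamp and defaultTimestamp are unavailable.');
}
```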

Installation

Using Composer (Recommended)

composer require mroosz/php-cassandra

Then include Composer's autoloader in your application entrypoint (if not already):

<?php
require __DIR__ . '/vendor/autoload.php';

Without Composer

If you can't use Composer, you can load the library's own autoloader:

<?php
require __DIR__ . '/php-cassandra/php-cassandra.php';

Quick start

Basic Connection and Query

<?php

use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;
use Cassandra\Consistency;

// Connect to Cassandra
$nodes = [
    new StreamNodeConfig(
        host: '127.0.0.1', 
        port: 9042, 
        username: 'cassandra', 
        password: 'cassandra'
    ),
];

$conn = new Connection($nodes, keyspace: 'my_keyspace');
$conn->connect();
$conn->setConsistency(Consistency::QUORUM);

// Simple query
$result = $conn->query('SELECT * FROM system.local')->asRowsResult();
foreach ($result as $row) {
    echo "Cluster: " . $row['cluster_name'] . "\n";
}

Prepared statements

<?php
use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Value\Uuid;
use Cassandra\Consistency;

// Prepare a statement
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ? AND status = ?');

// Execute with positional parameters
$result = $conn->execute(
    $prepared, 
    [
        Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000'),
        'active'
    ],
    consistency: Consistency::LOCAL_QUORUM,
    options: new ExecuteOptions(pageSize: 100)
)->asRowsResult();

foreach ($result as $user) {
    echo "User: {$user['name']} ({$user['email']})\n";
}

// Execute with named parameters
$namedPrepared = $conn->prepare('SELECT * FROM users WHERE email = :email AND org_id = :org_id');
$result = $conn->execute(
    $namedPrepared,
    ['email' => 'john@example.com', 'org_id' => 123],
    options: new ExecuteOptions(namesForValues: true)
)->asRowsResult();

Async Operations Example

<?php
use Cassandra\Request\Options\QueryOptions;

// Fire multiple queries concurrently
$statements = [];
$statements[] = $conn->queryAsync(
    'SELECT COUNT(*) FROM users', 
    options: new QueryOptions(pageSize: 1000)
);
$statements[] = $conn->queryAsync(
    'SELECT * FROM users LIMIT 10',
    options: new QueryOptions(pageSize: 10)
);

// Process results as they become available
$userCount = $statements[0]->getRowsResult()->fetch()['count'];
$recentUsers = $statements[1]->getRowsResult()->fetchAll();

echo "Total users: {$userCount}\n";
echo "Recent users: " . count($recentUsers) . "\n";

Error Handling Example

<?php
use Cassandra\Consistency;
use Cassandra\Exception\ServerException;
use Cassandra\Exception\ConnectionException;
use Cassandra\Exception\CassandraException;

try {
    $result = $conn->query(
        'SELECT * FROM users WHERE email = ?',
        ['john.doe@example.com'],
        Consistency::LOCAL_QUORUM
    )->asRowsResult();
    
    foreach ($result as $user) {
        echo "Found user: {$user['name']}\n";
    }
    
} catch (ServerException $e) {
    echo "Server error: " . $e->getMessage() . "\n";
    // Inspect $e->getContext() and $e->getErrorContext() for server-provided details
    
} catch (ConnectionException $e) {
    echo "Connection error: " . $e->getMessage() . "\n";
    
} catch (CassandraException $e) {
    echo "Client error: " . $e->getMessage() . "\n";
}

SSL/TLS Connection Example

<?php
use Cassandra\Connection\StreamNodeConfig;

// Secure connection with TLS
$secureNode = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'secure_user',
    password: 'secure_password',
    sslOptions: [
        'cafile' => '/path/to/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

$conn = new Connection([$secureNode], keyspace: 'production_app');
$conn->connect();

echo "Secure connection established!\n";
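The sslOptions keys used above (cafile, verify_peer, verify_peer_name) are standard PHP SSL context options. As a library-independent sketch, the same array can be placed in a PHP stream context to see how PHP stores it. Whether php-cassandra builds its context exactly this way internally is an assumption.

```php
<?php

$sslOptions = [
    'cafile' => '/path/to/ca.pem',
    'verify_peer' => true,
    'verify_peer_name' => true,
];

// PHP groups these options under the "ssl" wrapper name in a stream context.
// Creating the context does not open the CA file, so the path may be a placeholder.
$context = stream_context_create(['ssl' => $sslOptions]);
$stored = stream_context_get_options($context);

var_dump($stored['ssl']['verify_peer']); // bool(true)
```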

Connecting

Create NodeConfig instances and pass them to Connection:

use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection;

// Stream transport
$streamNode = new StreamNodeConfig(
    host: 'cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 10,
    timeoutInSeconds: 30,
);

// Stream transport with SSL/TLS
$streamTlsNode = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 10,
    timeoutInSeconds: 30,
    sslOptions: [
        // See PHP SSL context options: https://www.php.net/manual/en/context.ssl.php
        'cafile' => '/etc/ssl/certs/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

// Socket transport
$socketNode = new SocketNodeConfig(
    host: '10.0.0.10',
    port: 9042,
    username: 'user',
    password: 'secret',
    // See PHP socket_get_option documentation: https://www.php.net/manual/en/function.socket-get-option.php
    socketOptions: [SO_RCVTIMEO => ['sec' => 10, 'usec' => 0]]
);

$conn = new Connection([$streamNode, $streamTlsNode, $socketNode], keyspace: 'app');
$conn->connect();

Connection options are provided via ConnectionOptions:

  • enableCompression = use LZ4 if enabled on server
  • throwOnOverload = true to ask server to throw on overload (v4+)
  • nodeSelectionStrategy = Random (default) or RoundRobin
  • preparedResultCacheSize = cache size for prepared metadata (default 100)

Keyspace selection:

  • v5: can also be sent per-request via Query/Execute options (see below)
  • v3/v4: call $conn->setKeyspace('ks') or run USE ks

Consistency levels

Use the Consistency enum:

  • ALL, ANY, EACH_QUORUM, LOCAL_ONE, LOCAL_QUORUM, LOCAL_SERIAL, ONE, QUORUM, SERIAL, THREE, TWO

Apply per call or as default via setConsistency().

Queries

Synchronous:

use Cassandra\Value\Uuid;
use Cassandra\Consistency;
use Cassandra\Request\Options\QueryOptions;

$rowsResult = $conn->query(
    'SELECT id, name FROM ks.users WHERE id = ?',
    [Uuid::fromValue($id)],
    consistency: Consistency::ONE,
    options: new QueryOptions(pageSize: 100)
)->asRowsResult();

Asynchronous:

use Cassandra\Request\Options\QueryOptions;

$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));

$r2 = $s2->getResult()->asRowsResult();
$r1 = $s1->getResult()->asRowsResult();

Query options (QueryOptions):

  • autoPrepare (bool, default true): transparently prepare+execute when needed
  • pageSize (int, min 100 enforced by client)
  • pagingState (string)
  • serialConsistency (SerialConsistency::SERIAL or SerialConsistency::LOCAL_SERIAL)
  • defaultTimestamp (microseconds since epoch)
    • Requires 64-bit PHP; unsupported on 32-bit
  • namesForValues (bool): true to use associative binds; if not explicitly set, it is auto-detected for queries and executes
  • keyspace (string; protocol v5 only)
  • nowInSeconds (int; protocol v5 only)
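The two time-related options use different units, which is easy to get wrong. The sketch below computes both with plain PHP: a microsecond timestamp for defaultTimestamp and a value in seconds for nowInSeconds. The function name currentTimestampMicros() is hypothetical, and treating nowInSeconds as Unix-epoch seconds is an assumption based on its name.

```php
<?php

/** Current time in microseconds since the Unix epoch (needs 64-bit integers). */
function currentTimestampMicros(): int
{
    $now = new DateTimeImmutable();

    // 'U' = whole seconds since the epoch, 'u' = microseconds within that second.
    return (int) $now->format('U') * 1_000_000 + (int) $now->format('u');
}

$defaultTimestamp = currentTimestampMicros(); // microseconds
$nowInSeconds = time();                       // seconds (assumed unit)
```

These values could then be passed as the defaultTimestamp and nowInSeconds options described above.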

Notes:

  • If you supply non-Value\* PHP values with QueryOptions(autoPrepare: true), the driver auto-prepares + executes for correct typing.
  • Always use fully-qualified table names (including keyspace) for PREPARE statements to avoid ambiguity, e.g. SELECT ... FROM ks.users WHERE ....

Fetch all pages helpers:

// For simple queries
$pages = $conn->queryAll('SELECT * FROM ks.users WHERE org_id = ?', [$orgId]);
foreach ($pages as $page) {
    foreach ($page as $row) {
        // ...
    }
}
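If you prefer a single loop over rows, the nested iteration above can be wrapped in a small generator. This is a sketch in plain PHP; iterateRows() is a hypothetical helper, and it assumes only what the loop above shows: the pages are iterable, and each page is iterable over rows.

```php
<?php

/**
 * Flatten an iterable of pages into one stream of rows.
 * Hypothetical helper, not part of php-cassandra.
 *
 * @param iterable<iterable<mixed>> $pages
 * @return Generator<int, mixed>
 */
function iterateRows(iterable $pages): Generator
{
    foreach ($pages as $page) {
        foreach ($page as $row) {
            yield $row;
        }
    }
}

// Plain arrays stand in for the pages a helper such as queryAll() yields.
$pages = [
    [['id' => 1], ['id' => 2]],
    [['id' => 3]],
];

foreach (iterateRows($pages) as $row) {
    echo $row['id'], PHP_EOL;
}
```

Because the generator is lazy, rows from later pages are only touched once the earlier ones have been consumed.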

Prepared statements

use Cassandra\Request\Options\ExecuteOptions;

$prepared = $conn->prepare('SELECT * FROM ks.users WHERE email = :email');

$rowsResult = $conn->execute(
    $prepared,
    ['email' => 'jane@example.com'],
    options: new ExecuteOptions(
        namesForValues: true,
        pageSize: 50
    )
)->asRowsResult();

Pagination with prepared statements:

use Cassandra\Request\Options\ExecuteOptions;

$options = new ExecuteOptions(pageSize: 100, namesForValues: true);
$result = $conn->execute($prepared, ['org_id' => 1], options: $options)->asRowsResult();

do {
    foreach ($result as $row) {
        // process row
    }

    $pagingState = $result->getRowsMetadata()->pagingState;
    if ($pagingState === null) break;

    $options = new ExecuteOptions(
        pageSize: 100,
        namesForValues: true,
        pagingState: $pagingState
    );
    $result = $conn->execute($result, [], options: $options)->asRowsResult(); // reuse previous RowsResult for metadata id
} while (true);

Execute all pages helper:

use Cassandra\Request\Options\ExecuteOptions;

$pages = $conn->executeAll($prepared, ['org_id' => 1], options: new ExecuteOptions(namesForValues: true));

Additional notes:

  • For PREPARE and EXECUTE, if namesForValues is not set explicitly, it is auto-detected from the array keys (associative vs. indexed).
  • Always use fully-qualified table names in prepared statements.
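To illustrate what "associative vs. indexed" means in practice, here is a minimal sketch based on PHP's array_is_list() (PHP 8.1+). The helper looksLikeNamedValues() is hypothetical; the library's own detection may use different rules, for example for empty or mixed-key arrays.

```php
<?php

/**
 * Guess whether bind values are named (associative) or positional (list).
 * Hypothetical helper; the library's own detection may differ.
 */
function looksLikeNamedValues(array $values): bool
{
    // An empty array carries no names, so treat it as positional.
    return $values !== [] && !array_is_list($values);
}

var_dump(looksLikeNamedValues(['email' => 'jane@example.com'])); // bool(true)
var_dump(looksLikeNamedValues(['jane@example.com', 123]));       // bool(false)
```

Setting namesForValues explicitly, as in the examples above, avoids relying on any such heuristic.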

Batches

use Cassandra\Consistency;
use Cassandra\Request\Batch;
use Cassandra\Request\BatchType;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;

$batch = new Batch(type: BatchType::LOGGED, consistency: Consistency::QUORUM);

// Prepared in batch (namesForValues: use associative array)
$prepared = $conn->prepare('UPDATE ks.users SET age = :age WHERE id = :id');
$batch->appendPreparedStatement($prepared, ['age' => 21, 'id' => 'c5419d81-499e-4c9c-ac0c-fa6ba3ebc2bc']);

// Simple query in batch (positional)
$batch->appendQuery(
    'INSERT INTO ks.users (id, name, age) VALUES (?, ?, ?)',
    [
        Uuid::fromValue('c5420d81-499e-4c9c-ac0c-fa6ba3ebc2bc'),
        Varchar::fromValue('Mark'),
        20,
    ]
);

$conn->batch($batch);

Batch notes:

  • BATCH does not support names for values at the protocol level for simple queries; use positional values for appendQuery. For prepared entries, provide values consistent with the prepared statement (associative for named markers).
  • BatchOptions: serialConsistency, defaultTimestamp (64-bit PHP only), keyspace (v5), nowInSeconds (v5).

Results and fetching

query()/execute() return a Result; call asRowsResult() for row-returning queries. Supported RowsResult methods:

  • fetch(FetchType::ASSOC|NUM|BOTH) returns next row or false
  • fetchAll(FetchType) returns all remaining rows
  • fetchColumn(int $index) / fetchAllColumns(int $index)
  • fetchKeyPair(int $keyIndex, int $valueIndex) / fetchAllKeyPairs(...)
  • getIterator() returns a ResultIterator so you can foreach ($rowsResult as $row)

Example:

use Cassandra\Response\Result\FetchType;

$r = $conn->query('SELECT role FROM system_auth.roles')->asRowsResult();
foreach ($r as $i => $row) {
    echo $row['role'], "\n";
}

Advanced fetching examples:

// Fetch single row
$result = $conn->query('SELECT id, name, email FROM users WHERE id = ?', [$userId])->asRowsResult();
$user = $result->fetch(FetchType::ASSOC);
if ($user) {
    echo "User: {$user['name']} <{$user['email']}>\n";
}

// Fetch all remaining rows at once
$allUsers = $result->fetchAll(FetchType::ASSOC);
foreach ($allUsers as $user) {
    echo "User: {$user['name']}\n";
}

// Fetch specific column values
$result = $conn->query('SELECT name FROM users WHERE org_id = ?', [123])->asRowsResult();
$names = $result->fetchAllColumns(0); // Get all values from first column
print_r($names);

// Fetch key-value pairs
$result = $conn->query('SELECT id, name FROM users WHERE active = true')->asRowsResult();
$userMap = $result->fetchAllKeyPairs(0, 1); // id => name mapping
print_r($userMap);

// Different fetch types
$result = $conn->query('SELECT id, name, email FROM users LIMIT 5')->asRowsResult();

// Associative array (default)
$row = $result->fetch(FetchType::ASSOC);
// Returns: ['id' => '...', 'name' => '...', 'email' => '...']

// Numeric array
$row = $result->fetch(FetchType::NUM);
// Returns: [0 => '...', 1 => '...', 2 => '...']

// Both associative and numeric
$row = $result->fetch(FetchType::BOTH);
// Returns: ['id' => '...', 0 => '...', 'name' => '...', 1 => '...', ...]
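The row shapes in the comments above can be reproduced with plain arrays, which may help when writing tests without a cluster. This is only a sketch: shapeRow() is a hypothetical helper, strings stand in for the FetchType cases, and the key order of the "both" shape may differ from what the library returns.

```php
<?php

/**
 * Build the three row shapes from column names and values.
 * Hypothetical helper; strings stand in for the FetchType cases.
 */
function shapeRow(array $columns, array $values, string $style): array
{
    $values = array_values($values);
    $assoc = array_combine($columns, $values);

    return match ($style) {
        'assoc' => $assoc,
        'num' => $values,
        // Array union: column-name keys first, then numeric keys.
        'both' => $assoc + $values,
        default => throw new InvalidArgumentException("Unknown style: {$style}"),
    };
}

print_r(shapeRow(['id', 'name'], [7, 'Ann'], 'both'));

// A key/value mapping like the id => name example can be sketched with array_column():
$rows = [[1, 'Ann'], [2, 'Bob']];
$pairs = array_column($rows, 1, 0); // [1 => 'Ann', 2 => 'Bob']
```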

Pagination example:

use Cassandra\Request\Options\QueryOptions;

$pageSize = 100;
$options = new QueryOptions(pageSize: $pageSize);
$result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();

$totalProcessed = 0;
do {
    foreach ($result as $row) {
        // Process each row
        echo "Processing: {$row['id']}\n";
        $totalProcessed++;
    }
    
    $pagingState = $result->getRowsMetadata()->pagingState;
    if ($pagingState === null) {
        break; // No more pages
    }
    
    // Fetch next page
    $options = new QueryOptions(pageSize: $pageSize, pagingState: $pagingState);
    $result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();
    
} while (true);

echo "Total processed: {$totalProcessed} rows\n";

Object mapping

You can fetch rows into objects by implementing RowClassInterface or by using the default RowClass:

use Cassandra\Response\Result\RowClassInterface;

final class UserRow implements RowClassInterface {
    public function __construct(private array $row, array $args = []) {}
    public function id(): string { return (string) $this->row['id']; }
    public function name(): string { return (string) $this->row['name']; }
}

$rows = $conn->query('SELECT id, name FROM ks.users')->asRowsResult();
$rows->configureFetchObject(UserRow::class);

foreach ($rows as $user) {
    echo $user->name(), "\n";
}

Data types

All native Cassandra types are supported via classes in Cassandra\Value\*. You may pass either:

  • A concrete Value\... instance, or
  • A PHP scalar/array matching the type; the driver will convert it when metadata is available

Examples:

use Cassandra\Value\Ascii;
use Cassandra\Value\Bigint;
use Cassandra\Value\Blob;
use Cassandra\Value\Boolean;
use Cassandra\Value\Counter;
use Cassandra\Value\Custom;
use Cassandra\Value\Date;
use Cassandra\Value\Decimal;
use Cassandra\Value\Double;
use Cassandra\Value\Duration;
use Cassandra\Value\Float32;
use Cassandra\Value\Inet;
use Cassandra\Value\Int32;
use Cassandra\Value\ListCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\SetCollection;
use Cassandra\Value\Smallint;
use Cassandra\Value\Time;
use Cassandra\Value\Timestamp;
use Cassandra\Value\Timeuuid;
use Cassandra\Value\Tinyint;
use Cassandra\Value\Tuple;
use Cassandra\Value\UDT;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;
use Cassandra\Value\Varint;
use Cassandra\Value\Vector;
use Cassandra\Type;

// Scalars
Ascii::fromValue('hello');
Bigint::fromValue(10_000_000_000);
Blob::fromValue("\x01\x02");
Boolean::fromValue(true);
Counter::fromValue(1000);
Custom::fromValue('custom_data', 'my.custom.Type');
Decimal::fromValue('123.456');
Double::fromValue(2.718281828459);
Float32::fromValue(2.718);
Inet::fromValue('192.168.0.1');
Int32::fromValue(-123);
Smallint::fromValue(2048);
Timeuuid::fromValue('8db96410-8dba-11f0-b0eb-325096b39f47');
Tinyint::fromValue(12);
Uuid::fromValue('78b58041-06dd-4181-a14f-ce0c1979f51c');
Varchar::fromValue('hello ✅');
Varint::fromValue(10000000000);

// Temporal
Date::fromValue('2011-02-03');
Duration::fromValue('89h4m48s');
Time::fromValue('08:12:54.123456789');
Timestamp::fromValue('2011-02-03T04:05:00.000+0000');

// Collections / Tuples / UDT / Vector
ListCollection::fromValue([1, 2, 3], Type::INT);
MapCollection::fromValue(['a' => 1], Type::ASCII, Type::INT);
SetCollection::fromValue([1, 2, 3], Type::INT);
Tuple::fromValue([1, 'x'], [Type::INT, Type::VARCHAR]);
UDT::fromValue(['id' => 1, 'name' => 'n'], ['id' => Type::INT, 'name' => Type::VARCHAR]);
Vector::fromValue([0.12, -0.3, 0.9], Type::FLOAT, dimensions: 3);

Type definition syntax for complex values

For complex types, the driver needs a type definition to encode PHP values. Wherever you see a parameter like \Cassandra\Type|(array{ type: \Cassandra\Type }&array<mixed>), you can either pass a scalar Type::... (for simple elements) or a definition array with nested types for complex structures. The common shapes are:

  • List: ['type' => Type::LIST, 'valueType' => <elementType>, 'isFrozen' => bool]
  • Set: ['type' => Type::SET, 'valueType' => <elementType>, 'isFrozen' => bool]
  • Map: ['type' => Type::MAP, 'keyType' => <keyType>, 'valueType' => <valueType>, 'isFrozen' => bool]
  • Tuple: ['type' => Type::TUPLE, 'valueTypes' => [<t1>, <t2>, ...]]
  • UDT: ['type' => Type::UDT, 'valueTypes' => ['field' => <type>, ...], 'isFrozen' => bool, 'keyspace' => 'ks', 'name' => 'udt_name']
  • Vector: ['type' => Type::VECTOR, 'valueType' => <elementType>, 'dimensions' => int]
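One way to sanity-check a nested definition array is to render it as a CQL-like type string. The sketch below does that for the list, set, map, tuple and UDT shapes listed above. To stay runnable without the library, it uses a stand-in enum DemoType in place of \Cassandra\Type; both DemoType and describeType() are hypothetical and not part of php-cassandra.

```php
<?php

// Stand-in for \Cassandra\Type so the sketch runs without the library (hypothetical).
enum DemoType: string
{
    case INT = 'int';
    case VARCHAR = 'varchar';
    case LIST = 'list';
    case SET = 'set';
    case MAP = 'map';
    case TUPLE = 'tuple';
    case UDT = 'udt';
}

/** Render a scalar type or a definition array as a CQL-like string. */
function describeType(DemoType|array $def): string
{
    if ($def instanceof DemoType) {
        return $def->value;
    }

    $inner = match ($def['type']) {
        DemoType::LIST, DemoType::SET => describeType($def['valueType']),
        DemoType::MAP => describeType($def['keyType']) . ', ' . describeType($def['valueType']),
        DemoType::TUPLE => implode(', ', array_map(describeType(...), $def['valueTypes'])),
        DemoType::UDT => implode(', ', array_map(
            fn (string $field, DemoType|array $type): string => $field . ': ' . describeType($type),
            array_keys($def['valueTypes']),
            $def['valueTypes']
        )),
        default => throw new InvalidArgumentException('Scalar types need no definition array'),
    };

    $text = $def['type']->value . '<' . $inner . '>';

    // Wrap in frozen<...> only when the definition asks for it.
    return ($def['isFrozen'] ?? false) ? 'frozen<' . $text . '>' : $text;
}

$definition = [
    'type' => DemoType::LIST,
    'isFrozen' => true,
    'valueType' => [
        'type' => DemoType::UDT,
        'valueTypes' => [
            'id' => DemoType::INT,
            'friends' => ['type' => DemoType::LIST, 'valueType' => DemoType::VARCHAR],
        ],
    ],
];

echo describeType($definition), PHP_EOL;
```

The vector shape is left out for brevity; it would follow the same pattern with an extra dimensions value.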

Examples

use Cassandra\Type;
use Cassandra\Value\ListCollection;
use Cassandra\Value\SetCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\Tuple;
use Cassandra\Value\UDT;

// List<int>
ListCollection::fromValue([1,2,3], Type::INT);

// Set<text>
SetCollection::fromValue(['a','b'], Type::VARCHAR);

// Map<ascii,int>
MapCollection::fromValue(['a' => 1], Type::ASCII, Type::INT);

// Tuple<int,text,boolean>
Tuple::fromValue([1, 'x', true], [Type::INT, Type::VARCHAR, Type::BOOLEAN]);

// UDT<id:int, name:text>
UDT::fromValue(['id' => 1, 'name' => 'n'], ['id' => Type::INT, 'name' => Type::VARCHAR]);

// Frozen list<udt<id:int, friends:list<text>>>
ListCollection::fromValue(
    [
        ['id' => 1, 'friends' => ['a','b']],
        ['id' => 2, 'friends' => []],
    ],
    [
        'type' => Type::LIST,
        'valueType' => [
            'type' => Type::UDT,
            'isFrozen' => true,
            'valueTypes' => [
                'id' => Type::INT,
                'friends' => [
                    'type' => Type::LIST,
                    'valueType' => Type::VARCHAR,
                ],
            ],
        ],
        'isFrozen' => true,
    ]
);

// Map<text, tuple<int, udt<code:int, tags:set<text>>>>
MapCollection::fromValue(
    [
        'a' => [1, ['code' => 7, 'tags' => ['x','y']]],
    ],
    [
        'type' => Type::MAP,
        'keyType' => Type::VARCHAR,
        'valueType' => [
            'type' => Type::TUPLE,
            'valueTypes' => [
                Type::INT,
                [
                    'type' => Type::UDT,
                    'valueTypes' => [
                        'code' => Type::INT,
                        'tags' => [
                            'type' => Type::SET,
                            'valueType' => Type::VARCHAR,
                        ],
                    ],
                ],
            ],
        ],
    ]
);

// UDT with nested list<map<text, tuple<int,text>>>
UDT::fromValue(
    [
        'id' => 1,
        'items' => [
            ['a' => [1, 'one']],
            ['b' => [2, 'two']],
        ],
    ],
    [
        'id' => Type::INT,
        'items' => [
            'type' => Type::LIST,
            'valueType' => [
                'type' => Type::MAP,
                'keyType' => Type::VARCHAR,
                'valueType' => [
                    'type' => Type::TUPLE,
                    'valueTypes' => [Type::INT, Type::VARCHAR],
                ],
            ],
        ],
    ]
);

// Vector<float> with 3 dimensions
Vector::fromValue([0.1, -0.5, 0.8], Type::FLOAT, dimensions: 3);

// Vector<double> with 128 dimensions (common for embeddings)
Vector::fromValue(array_fill(0, 128, 0.0), Type::DOUBLE, dimensions: 128);

Nested complex example (Set inside a row):

use Cassandra\Value\SetCollection;
use Cassandra\Type;

SetCollection::fromValue([
    [
        'id' => 1,
        'name' => 'string',
        'active' => true,
        'friends' => ['a', 'b'],
        'drinks' => [['qty' => 5, 'brand' => 'Pepsi']],
    ],
], [
    [
        'type' => Type::UDT,
        'valueTypes' => [
            'id' => Type::INT,
            'name' => Type::VARCHAR,
            'active' => Type::BOOLEAN,
            'friends' => [
                'type' => Type::LIST, 
                'valueType' => Type::VARCHAR
            ],
            'drinks' => [
                'type' => Type::LIST, 
                'valueType' => [
                    'type' => Type::UDT,
                    'valueTypes' => [
                        'qty' => Type::INT,
                        'brand' => Type::VARCHAR
                    ],
                ]
            ],
        ],
    ],
]);

Special values:

  • new \Cassandra\Value\NotSet() encodes a bind variable as NOT SET (distinct from NULL)

Events

Register a listener and subscribe for events on the connection:

use Cassandra\EventListener;
use Cassandra\Response\Event;
use Cassandra\Request\Register;
use Cassandra\EventType;

$conn->registerEventListener(new class () implements EventListener {
    public function onEvent(Event $event): void {
        // Inspect $event->getType() and $event->getData()
    }
});

$conn->syncRequest(new Register([
    EventType::TOPOLOGY_CHANGE,
    EventType::STATUS_CHANGE,
    EventType::SCHEMA_CHANGE,
]));

// process events (simplest possible loop)
while (true) {
    $conn->waitForNextEvent();
    sleep(1);
}

Non-blocking event polling:

// In your app loop, poll without blocking
if ($event = $conn->tryReadNextEvent()) {
    // handle $event
}

// Or drain all currently available events
while ($event = $conn->tryReadNextEvent()) {
    // handle $event
}

Tracing and custom payloads (advanced)

You can enable tracing and set a custom payload on any request:

use Cassandra\Request\Query;

$req = new Query('SELECT now() FROM system.local');
$req->enableTracing();
$req->setPayload(['my-key' => 'my-value']);

$result = $conn->syncRequest($req);

Asynchronous API

The async API lets you pipeline multiple requests without blocking. Each async method returns a Cassandra\Statement handle that you can resolve later.

Both blocking and non-blocking control are available:

  • Blocking per statement: getResult() / getRowsResult() / waitForResponse()
  • Blocking for sets:
    • waitForStatements(array $statements) and waitForAllPendingStatements()
    • waitForAnyStatement(array $statements): Statement: blocks until any of the given statements completes
  • Non-blocking/polling:
    • drainAvailableResponses(int $max = PHP_INT_MAX): int: processes up to $max responses if available
    • tryResolveStatement(Statement $statement): bool: resolves a specific statement if possible
    • tryResolveStatements(array $statements, int $max = PHP_INT_MAX): int: resolves from a set without blocking

Basics:

use Cassandra\Request\Options\QueryOptions;
use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Consistency;

// Fire two queries concurrently
$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));

// Do other work here...

// Resolve in any order
$r2 = $s2->getRowsResult();
$r1 = $s1->getRowsResult();

Waiting for all responses:

// Issue several statements
$handles = [];
for ($i = 0; $i < 10; $i++) {
    $handles[] = $conn->queryAsync('SELECT now() FROM system.local');
}

$conn->waitForStatements($handles);

foreach ($handles as $h) {
    $rows = $h->getRowsResult();
    // process
}

Non-blocking draining and polling:

// Fire off work in various places...

// Later in your loop: non-blocking drain up to 32 available responses
$processed = $conn->drainAvailableResponses(32);
if ($processed > 0) {
    // some statements just became ready; you can consume their results now
}

// Or: non-blocking check for a specific statement
if ($conn->tryResolveStatement($s1)) {
    $rows = $s1->getRowsResult();
}

// Or: wait until any of several statements completes
$ready = $conn->waitForAnyStatement([$s1, $s2]);
// $ready is whichever completed first

Prepared + async:

use Cassandra\Consistency;
use Cassandra\Request\Options\ExecuteOptions;

// Prepare asynchronously
$pStmt = $conn->prepareAsync('SELECT id, name FROM ks.users WHERE org_id = ?');
$prepared = $pStmt->getPreparedResult();

// Execute asynchronously with paging
$s = $conn->executeAsync(
    $prepared,
    [123],
    consistency: Consistency::LOCAL_QUORUM,
    options: new ExecuteOptions(pageSize: 200)
);

// Block for rows when you need them
$rows = $s->getRowsResult();

Advanced waiting:

// Block until any statement completes:
$stmt = $conn->waitForAnyStatement([$s1, $s2, $s3]);

// Block until the next event arrives:
$event = $conn->waitForNextEvent();

Compression

Enable LZ4 compression (if supported by the server) via ConnectionOptions:

use Cassandra\Connection;
use Cassandra\Connection\ConnectionOptions;

$conn = new Connection(
    $nodes,
    keyspace: 'app',
    options: new ConnectionOptions(enableCompression: true)
);

Notes:

  • Compression is negotiated during STARTUP. When enabled, the client accepts server-compressed frames and transparently decompresses them.
  • The client may still send some frames uncompressed depending on size/heuristics; this is allowed by the protocol.

Error handling

php-cassandra reports failures through a structured exception hierarchy rooted at \Cassandra\Exception\CassandraException. Catch specific subclasses where you can recover, and fall back to the base class for everything else.

Exception Hierarchy

\Exception
└── \Cassandra\Exception\CassandraException (base exception)
    ├── \Cassandra\Exception\CompressionException (compression errors)
    ├── \Cassandra\Exception\ConnectionException (connection/transport errors)
    ├── \Cassandra\Exception\NodeException (node I/O errors)
    │   ├── \Cassandra\Exception\SocketException
    │   └── \Cassandra\Exception\StreamException
    ├── \Cassandra\Exception\RequestException (request errors)
    ├── \Cassandra\Exception\ResponseException (response errors)
    ├── \Cassandra\Exception\ServerException (server-side errors)
    │   ├── \Cassandra\Exception\ServerException\AlreadyExistsException
    │   ├── \Cassandra\Exception\ServerException\AuthenticationErrorException
    │   ├── \Cassandra\Exception\ServerException\CasWriteUnknownException
    │   ├── \Cassandra\Exception\ServerException\CdcWriteFailureException
    │   ├── \Cassandra\Exception\ServerException\ConfigErrorException
    │   ├── \Cassandra\Exception\ServerException\FunctionFailureException
    │   ├── \Cassandra\Exception\ServerException\InvalidException
    │   ├── \Cassandra\Exception\ServerException\IsBootstrappingException
    │   ├── \Cassandra\Exception\ServerException\OverloadedException
    │   ├── \Cassandra\Exception\ServerException\ProtocolErrorException
    │   ├── \Cassandra\Exception\ServerException\ReadFailureException
    │   ├── \Cassandra\Exception\ServerException\ReadTimeoutException
    │   ├── \Cassandra\Exception\ServerException\ServerErrorException
    │   ├── \Cassandra\Exception\ServerException\SyntaxErrorException
    │   ├── \Cassandra\Exception\ServerException\TruncateErrorException
    │   ├── \Cassandra\Exception\ServerException\UnauthorizedException
    │   ├── \Cassandra\Exception\ServerException\UnavailableException
    │   ├── \Cassandra\Exception\ServerException\UnpreparedException
    │   ├── \Cassandra\Exception\ServerException\WriteFailureException
    │   └── \Cassandra\Exception\ServerException\WriteTimeoutException
    ├── \Cassandra\Exception\StatementException (result type mismatches)
    ├── \Cassandra\Exception\StringMathException (string math backends)
    ├── \Cassandra\Exception\TypeInfoException (type info building errors)
    ├── \Cassandra\Exception\TypeNameParserException (type parsing errors)
    ├── \Cassandra\Exception\ValueException (invalid value inputs)
    ├── \Cassandra\Exception\ValueFactoryException (value factory/type def errors)
    └── \Cassandra\Exception\VIntCodecException (variable integer codec errors)

Error Handling Patterns

Basic Error Handling

use Cassandra\Exception\StatementException;
use Cassandra\Exception\ServerException;
use Cassandra\Exception\ConnectionException;
use Cassandra\Exception\CassandraException;

try {
    $result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])
        ->asRowsResult();
    
    foreach ($result as $row) {
        // Process row
    }
    
} catch (ServerException $e) {
    // Server returned an error response
    error_log("Server error: " . $e->getMessage());
    
} catch (ConnectionException $e) {
    // Network/connection issues
    error_log("Connection error: " . $e->getMessage());
    
} catch (StatementException $e) {
    // Wrong result type access (e.g., calling asRowsResult() on non-rows result)
    error_log("Statement error: " . $e->getMessage());
    
} catch (CassandraException $e) {
    // Other client-side errors
    error_log("Client error: " . $e->getMessage());
}

Specific Server Error Handling

use Cassandra\Exception\ServerException\{
    UnavailableException,
    ReadTimeoutException,
    WriteTimeoutException,
    OverloadedException,
    SyntaxErrorException,
    InvalidException,
    UnauthorizedException,
    AlreadyExistsException
};

try {
    $conn->query('CREATE TABLE users (id UUID PRIMARY KEY, name TEXT)');
    
} catch (AlreadyExistsException $e) {
    // Table already exists - this might be OK
    echo "Table already exists, continuing...\n";
    
} catch (UnauthorizedException $e) {
    // Permission denied
    throw new \RuntimeException("Insufficient permissions: " . $e->getMessage());
    
} catch (SyntaxErrorException $e) {
    // CQL syntax error
    throw new \RuntimeException("Invalid CQL syntax: " . $e->getMessage());
    
} catch (InvalidException $e) {
    // Invalid query (e.g., wrong types)
    throw new \RuntimeException("Invalid query: " . $e->getMessage());
}

Retry Logic with Exponential Backoff

use Cassandra\Exception\ServerException;
use Cassandra\Exception\ServerException\{
    UnavailableException,
    ReadTimeoutException,
    WriteTimeoutException,
    OverloadedException
};

function executeWithRetry(callable $operation, int $maxRetries = 3): mixed
{
    $attempt = 0;
    $delay = 100; // Initial backoff in milliseconds

    // Loop until the operation returns or an exception is thrown
    while (true) {
        try {
            return $operation();

        } catch (UnavailableException | ReadTimeoutException | WriteTimeoutException | OverloadedException $e) {
            $attempt++;

            if ($attempt >= $maxRetries) {
                throw $e; // Out of attempts: re-throw the last error
            }

            error_log("Attempt {$attempt}/{$maxRetries} failed, retrying: " . $e->getMessage());

            // Exponential backoff with up to 50% jitter
            $jitter = random_int(0, intdiv($delay, 2));
            usleep(($delay + $jitter) * 1000);
            $delay *= 2;

        } catch (ServerException $e) {
            // Don't retry non-transient errors
            throw $e;
        }
    }
}

// Usage
$result = executeWithRetry(function() use ($conn, $userId) {
    return $conn->query('SELECT * FROM users WHERE id = ?', [$userId])
        ->asRowsResult();
});
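To see what the backoff in executeWithRetry() amounts to, the following minimal sketch computes the sleep schedule on its own, without touching Cassandra. The function names backoffScheduleMs() and maxSleepMs() are hypothetical helpers for illustration only; they assume the same parameters as the example above (100 ms initial delay, doubling after each failure, up to 50% jitter).

```php
<?php
declare(strict_types=1);

/**
 * Base delays (in ms, without jitter) slept between attempts.
 * With $maxRetries total attempts there are $maxRetries - 1 sleeps.
 *
 * @return list<int>
 */
function backoffScheduleMs(int $maxRetries = 3, int $baseDelayMs = 100): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries - 1; $retry++) {
        $delays[] = $baseDelayMs * (2 ** $retry); // 100, 200, 400, ...
    }
    return $delays;
}

/** Upper bound of one sleep: the base delay plus at most 50% jitter. */
function maxSleepMs(int $delayMs): int
{
    return $delayMs + intdiv($delayMs, 2);
}

// Worst-case total time spent sleeping for five attempts
$worstCaseMs = array_sum(array_map('maxSleepMs', backoffScheduleMs(5)));
```

With the default of three attempts there are only two sleeps (100 ms and 200 ms plus jitter), so the worst-case added latency stays well under half a second. Raising $maxRetries grows the total quickly because each delay doubles.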

Timeout Handling

use Cassandra\Consistency;
use Cassandra\Exception\ServerException\{ReadTimeoutException, WriteTimeoutException};

try {
    $result = $conn->query(
        'SELECT * FROM large_table WHERE complex_condition = ?',
        [$condition],
        Consistency::QUORUM
    )->asRowsResult();
    
} catch (ReadTimeoutException $e) {
    $ctx = $e->getErrorContext();
    $consistency = $ctx->consistency;
    $received = $ctx->nodesAnswered;
    $required = $ctx->nodesRequired;
    $dataPresent = $ctx->dataPresent;
    
    error_log("Read timeout: got {$received}/{$required} responses at {$consistency->name}, data_present: " . 
              ($dataPresent ? 'yes' : 'no'));
    
    // Fall back to a weaker consistency level
    $result = $conn->query(
        'SELECT * FROM large_table WHERE complex_condition = ?',
        [$condition],
        Consistency::ONE
    )->asRowsResult();
    
} catch (WriteTimeoutException $e) {
    $ctx = $e->getErrorContext();
    $consistency = $ctx->consistency;
    $received = $ctx->nodesAcknowledged;
    $required = $ctx->nodesRequired;
    $writeType = $ctx->writeType->value;
    
    error_log("Write timeout: got {$received}/{$required} responses at {$consistency->name}, write_type: {$writeType}");
    
    if ($writeType === 'BATCH_LOG') {
        error_log("Batch log write timeout - operation may have succeeded");
    }
}

Error Information Access

Several server exceptions expose additional context through getErrorContext():

use Cassandra\Exception\ServerException\{UnavailableException, ReadTimeoutException, WriteTimeoutException};

try {
    $conn->query('SELECT * FROM users');
    
} catch (UnavailableException $e) {
    $ctx = $e->getErrorContext();
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
    echo "Alive: " . $ctx->nodesAlive . "\n";
    
} catch (ReadTimeoutException $e) {
    $ctx = $e->getErrorContext();
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Received: " . $ctx->nodesAnswered . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
    echo "Data present: " . ($ctx->dataPresent ? 'yes' : 'no') . "\n";
    
} catch (WriteTimeoutException $e) {
    $ctx = $e->getErrorContext();
    echo "Write type: " . $ctx->writeType->value . "\n";
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Received: " . $ctx->nodesAcknowledged . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
}

Configuration Reference

Connection Configuration

Node Configuration

StreamNodeConfig (supports SSL/TLS, persistent connections)

use Cassandra\Connection\StreamNodeConfig;

$node = new StreamNodeConfig(
    host: 'cassandra.example.com',        // Can include protocol (tls://)
    port: 9042,                           // Port number
    username: 'user',                     // Username (optional)
    password: 'secret',                   // Password (optional)
    connectTimeoutInSeconds: 10,          // Connection timeout (default: 5)
    timeoutInSeconds: 30,                 // I/O timeout (default: 30)
    persistent: true,                     // Use persistent connections
    sslOptions: [                         // SSL/TLS options (see PHP SSL context)
        'verify_peer' => true,            // Verify peer certificate
        'verify_peer_name' => true,       // Verify peer name
        'cafile' => '/path/to/ca.pem',    // CA certificate file
        'local_cert' => '/path/to/cert.pem', // Client certificate
        'local_pk' => '/path/to/key.pem', // Client private key
        'passphrase' => 'cert_password',  // Private key passphrase
        'ciphers' => 'HIGH:!aNULL',       // Cipher list
        'crypto_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT,
    ]
);

SocketNodeConfig (requires ext-sockets)

use Cassandra\Connection\SocketNodeConfig;

$node = new SocketNodeConfig(
    host: '127.0.0.1',                    // Cassandra host
    port: 9042,                           // Cassandra port (default: 9042)
    username: 'cassandra',                // Username (optional)
    password: 'cassandra',                // Password (optional)
    socketOptions: [                      // Socket-specific options
        SO_RCVTIMEO => ['sec' => 10, 'usec' => 0],  // Receive timeout
        SO_SNDTIMEO => ['sec' => 10, 'usec' => 0],  // Send timeout
        SO_KEEPALIVE => 1,                // Keep-alive
    ]
);

Connection Options

use Cassandra\Connection\ConnectionOptions;
use Cassandra\Connection\NodeSelectionStrategy;

$options = new ConnectionOptions(
    enableCompression: true,              // Enable LZ4 compression (default: false)
    throwOnOverload: true,                // Throw on server overload (v4+, default: false)
    nodeSelectionStrategy: NodeSelectionStrategy::RoundRobin, // Node selection (default: Random)
    preparedResultCacheSize: 200,         // Prepared statement cache size (default: 100)
);
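As a conceptual illustration of the nodeSelectionStrategy option, the sketch below shows what round-robin selection means compared with picking a node at random. RoundRobinPicker is a hypothetical class written for this example; it is not part of php-cassandra, and how the library implements the strategy internally is not shown in this README.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: cycles through hosts in order, wrapping at the end. */
final class RoundRobinPicker
{
    private int $next = 0;

    /** @param list<string> $hosts */
    public function __construct(private array $hosts)
    {
        if ($hosts === []) {
            throw new InvalidArgumentException('At least one host is required.');
        }
    }

    public function pick(): string
    {
        $host = $this->hosts[$this->next];
        // Advance the cursor and wrap around after the last host
        $this->next = ($this->next + 1) % count($this->hosts);
        return $host;
    }
}

$picker = new RoundRobinPicker(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
$order = [$picker->pick(), $picker->pick(), $picker->pick(), $picker->pick()];
```

Round-robin spreads successive connections evenly and predictably, while random selection avoids every client starting on the same node.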

Request Options

Query Options

use Cassandra\Request\Options\QueryOptions;
use Cassandra\SerialConsistency;

$queryOptions = new QueryOptions(
    autoPrepare: true,                    // Auto-prepare for type safety (default: true)
    pageSize: 1000,                       // Page size (min 100, default: 5000)
    pagingState: $previousPagingState,    // For pagination (default: null)
    serialConsistency: SerialConsistency::SERIAL, // Serial consistency (default: null)
    defaultTimestamp: 1640995200000000,   // Default timestamp (microseconds, default: null)
    namesForValues: true,                 // Use named parameters (auto-detected if null)
    keyspace: 'my_keyspace',              // Per-request keyspace (v5 only, default: null)
    nowInSeconds: time(),                 // Current time override (v5 only, default: null)
);
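The defaultTimestamp option is expressed in microseconds since the Unix epoch, which is easy to confuse with seconds or milliseconds. The following sketch shows one way to derive such a value from a DateTimeInterface on 64-bit PHP; toEpochMicros() is a hypothetical helper name, not a library function.

```php
<?php
declare(strict_types=1);

/** Converts a point in time to microseconds since the Unix epoch (64-bit PHP). */
function toEpochMicros(DateTimeInterface $time): int
{
    // Whole seconds scaled to microseconds, plus the fractional part ("u" = 000000..999999)
    return $time->getTimestamp() * 1_000_000 + (int) $time->format('u');
}

// 2022-01-01T00:00:00Z is the instant behind the literal used in the options above
$micros = toEpochMicros(new DateTimeImmutable('2022-01-01T00:00:00Z'));
```

The resulting integer can be passed as defaultTimestamp. On 32-bit PHP the multiplication overflows the integer range, which is one reason this option needs a 64-bit build.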

Execute Options

use Cassandra\Request\Options\ExecuteOptions;

$executeOptions = new ExecuteOptions(
    // All QueryOptions properties plus:
    skipMetadata: true,                   // Skip result metadata (default: false)
    autoPrepare: false,                   // Not applicable for execute
    pageSize: 500,
    namesForValues: true,
    // ... other QueryOptions
);

Prepare Options

use Cassandra\Request\Options\PrepareOptions;

$prepareOptions = new PrepareOptions(
    keyspace: 'my_keyspace',              // Keyspace for preparation (v5 only)
);

Batch Options

use Cassandra\Request\Options\BatchOptions;
use Cassandra\SerialConsistency;

$batchOptions = new BatchOptions(
    serialConsistency: SerialConsistency::LOCAL_SERIAL,
    defaultTimestamp: 1640995200000000,   // Microseconds since epoch
    keyspace: 'my_keyspace',              // v5 only
    nowInSeconds: time(),                 // v5 only
);

Advanced Configuration

Value Encoding Configuration

use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\DateEncodeOption;
use Cassandra\Value\EncodeOption\DurationEncodeOption;
use Cassandra\Value\EncodeOption\TimeEncodeOption;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\VarintEncodeOption;

$conn->configureValueEncoding(new ValueEncodeConfig(
    dateEncodeOption: DateEncodeOption::AS_DATETIME_IMMUTABLE,
    durationEncodeOption: DurationEncodeOption::AS_DATEINTERVAL,
    timeEncodeOption: TimeEncodeOption::AS_DATETIME_IMMUTABLE,
    timestampEncodeOption: TimestampEncodeOption::AS_DATETIME_IMMUTABLE,
    varintEncodeOption: VarintEncodeOption::AS_STRING,
));

Event Listeners

use Cassandra\EventListener;
use Cassandra\WarningsListener;

// Event listener
$conn->registerEventListener(new class implements EventListener {
    public function onEvent(\Cassandra\Response\Event $event): void {
        error_log("Cassandra event: " . $event->getType());
    }
});

// Warnings listener
$conn->registerWarningsListener(new class implements WarningsListener {
    public function onWarnings(array $warnings, $request, $response): void {
        foreach ($warnings as $warning) {
            error_log("Cassandra warning: $warning");
        }
    }
});

Notes

  • pageSize is clamped to a minimum of 100 by the client for efficiency.
  • If you supply non-Value\* PHP values with QueryOptions(autoPrepare: true), the driver auto-prepares + executes for correct typing.
  • On UNPREPARED server errors, the driver transparently re-prepares and retries the execution.
  • Always use fully-qualified table names in PREPARE statements.
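The effect of the pageSize minimum can be sketched with a little arithmetic. This assumes the clamp behaves like a plain max() against 100, as the first note describes; effectivePageSize() and pagesNeeded() are hypothetical helpers for illustration, not library functions.

```php
<?php
declare(strict_types=1);

const MIN_PAGE_SIZE = 100; // minimum stated in the notes above

/** Page size after clamping to the client-side minimum. */
function effectivePageSize(int $requested): int
{
    return max(MIN_PAGE_SIZE, $requested);
}

/** Number of pages needed to read $rowCount rows at the clamped page size. */
function pagesNeeded(int $rowCount, int $requestedPageSize): int
{
    $size = effectivePageSize($requestedPageSize);
    return intdiv($rowCount + $size - 1, $size); // ceiling division
}
```

For example, requesting a page size of 10 for a 2,500-row result yields 25 round trips rather than 250, because the page size in effect is 100.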

Frequently Asked Questions (FAQ)

General Questions

Q: What's the difference between this library and the DataStax PHP Driver?

A: The main differences are:

  • Pure PHP: No C extensions required, easier deployment
  • Protocol v5 Support: Full support for latest Cassandra protocol features
  • Active Development: Actively maintained with regular updates
  • Modern PHP: Built for PHP 8.1+ with modern language features

Q: Can I use this with older versions of Cassandra?

A: Yes! The library supports protocol versions v3, v4, and v5:

  • Cassandra 2.1+: Protocol v3
  • Cassandra 2.2+: Protocol v4 (recommended)
  • Cassandra 4.0+: Protocol v5 (recommended for new deployments)
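The version table above can be expressed as a small lookup. This is only a sketch of the mapping in this FAQ answer; highestProtocolFor() is a hypothetical helper, and the library itself negotiates the protocol version with the server rather than inspecting a version string.

```php
<?php
declare(strict_types=1);

/** Highest protocol version per the FAQ table, or null below Cassandra 2.1. */
function highestProtocolFor(string $cassandraVersion): ?int
{
    return match (true) {
        version_compare($cassandraVersion, '4.0', '>=') => 5,
        version_compare($cassandraVersion, '2.2', '>=') => 4,
        version_compare($cassandraVersion, '2.1', '>=') => 3,
        default => null,
    };
}
```

Calling highestProtocolFor('3.11.4') returns 4, which is why per-request keyspace and other v5-only options are unavailable on Cassandra 3.x clusters.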

Installation and Setup

Q: Do I need any PHP extensions?

A: The library works with standard PHP, but some extensions enhance functionality:

  • ext-sockets: Required for SocketNodeConfig (alternative: StreamNodeConfig)
  • ext-bcmath or ext-gmp: Improves performance for large integer operations (Varint, Decimal)
  • ext-openssl: For SSL/TLS connections

Q: Can I run this on 32-bit PHP?

A: Limited support. The following features are unsupported on 32-bit PHP: value types Bigint, Counter, Date, Duration, Time, Timestamp, and the defaultTimestamp request option. Use 64-bit PHP for full compatibility.
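If you are unsure which build you are running, a quick check of PHP_INT_SIZE tells you. The sketch below uses a hypothetical helper, is64BitPhp(), that takes the integer size as a parameter so it can be exercised on any platform.

```php
<?php
declare(strict_types=1);

/** True when PHP integers are at least 8 bytes wide (a 64-bit build). */
function is64BitPhp(int $intSize = PHP_INT_SIZE): bool
{
    return $intSize >= 8;
}

if (!is64BitPhp()) {
    // On 32-bit builds, avoid the value types and options listed above
    error_log('32-bit PHP detected: Bigint, Counter, Date, Duration, Time and Timestamp are unsupported.');
}
```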

Data Types and Modeling

Q: How do I handle complex data structures?

A: Use collections and UDTs:

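use Cassandra\Type;
use Cassandra\Value\ListCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\UDT;

// Map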
$profile = MapCollection::fromValue(['role' => 'admin', 'level' => 'senior'], Type::VARCHAR, Type::VARCHAR);

// List
$tags = ListCollection::fromValue(['php', 'cassandra', 'database'], Type::VARCHAR);

// UDT
$address = UDT::fromValue(
    ['street' => '123 Main St', 'city' => 'New York', 'zip' => '10001'],
    ['street' => Type::VARCHAR, 'city' => Type::VARCHAR, 'zip' => Type::VARCHAR]
);

Q: How do I work with timestamps?

A: Use the Timestamp value class:

use Cassandra\Value\Timestamp;

// Current time
$now = Timestamp::now();

// From string
$timestamp = Timestamp::fromValue('2024-01-15T10:30:00Z');

// From Unix timestamp
$timestamp = Timestamp::fromValue(1705312200000); // milliseconds
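Because the integer form is in milliseconds, values coming from time() or a DateTime object need converting first. The following sketch shows one way to do that on 64-bit PHP; toEpochMillis() is a hypothetical helper name, not part of the library.

```php
<?php
declare(strict_types=1);

/** Converts a point in time to milliseconds since the Unix epoch (64-bit PHP). */
function toEpochMillis(DateTimeInterface $time): int
{
    // Whole seconds scaled to ms, plus the microsecond fraction truncated to ms
    return $time->getTimestamp() * 1000 + intdiv((int) $time->format('u'), 1000);
}

$millis = toEpochMillis(new DateTimeImmutable('2024-01-15T10:30:00Z'));
// $millis can then be handed to Timestamp::fromValue($millis)
```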

Migration Guide

From DataStax PHP Driver

If you're migrating from the DataStax PHP Driver, here are the key differences and migration steps:

Connection Setup

// DataStax Driver (old)
$cluster = Cassandra::cluster()
    ->withContactPoints('127.0.0.1')
    ->withPort(9042)
    ->withCredentials('username', 'password')
    ->build();
$session = $cluster->connect('keyspace_name');

// php-cassandra (new)
use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;

$conn = new Connection([
    new StreamNodeConfig('127.0.0.1', 9042, 'username', 'password')
], keyspace: 'keyspace_name');
$conn->connect();

Query Execution

// DataStax Driver (old)
$statement = new Cassandra\SimpleStatement('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);

// php-cassandra (new)
$result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])->asRowsResult();

Prepared Statements

// DataStax Driver (old)
$statement = $session->prepare('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);

// php-cassandra (new)
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ?');
$result = $conn->execute($prepared, [$userId])->asRowsResult();

Data Types

// DataStax Driver (old)
$uuid = new Cassandra\Uuid('550e8400-e29b-41d4-a716-446655440000');
$timestamp = new Cassandra\Timestamp(time());

// php-cassandra (new)
use Cassandra\Value\Uuid;
use Cassandra\Value\Timestamp;

$uuid = Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000');
$timestamp = Timestamp::fromValue(time() * 1000);

Async Operations

// DataStax Driver (old)
$future = $session->executeAsync($statement);
$result = $future->get();

// php-cassandra (new)
$statement = $conn->queryAsync('SELECT * FROM users');
$result = $statement->getRowsResult();

Migration Checklist

  • Update connection setup - Replace cluster builder with Connection and NodeConfig
  • Update query methods - Replace execute() with query() and asRowsResult()
  • Update data types - Replace Cassandra\* types with Cassandra\Value\* types
  • Update prepared statements - Use new prepare/execute pattern
  • Update async operations - Replace futures with statement handles
  • Update error handling - Use new exception hierarchy
  • Update batch operations - Use new Batch class
  • Test thoroughly - Verify all functionality works as expected

Connection tuning examples

use Cassandra\Connection;
use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;

// Stream with TLS and persistent
$stream = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 5,
    timeoutInSeconds: 15,
    persistent: true,
    sslOptions: [
        'cafile' => '/etc/ssl/certs/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

// Socket with custom timeouts
$socket = new SocketNodeConfig(
    host: '127.0.0.1',
    port: 9042,
    username: 'user',
    password: 'secret',
    socketOptions: [
        SO_RCVTIMEO => ['sec' => 5, 'usec' => 0],
        SO_SNDTIMEO => ['sec' => 5, 'usec' => 0],
    ]
);

$conn = new Connection([$socket, $stream], options: new ConnectionOptions(enableCompression: true));

Configuring value encoding

use Cassandra\Connection;
use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\DateEncodeOption;

$conn = new Connection([$socket]);
$conn->configureValueEncoding(new ValueEncodeConfig(
    timestampEncodeOption: TimestampEncodeOption::AS_INT,
    dateEncodeOption: DateEncodeOption::AS_INT,
));

Warnings listener

use Cassandra\WarningsListener;
use Cassandra\Request\Request;
use Cassandra\Response\Response;

$conn->registerWarningsListener(new class () implements WarningsListener {
    public function onWarnings(array $warnings, Request $request, Response $response): void {
        error_log('Cassandra warnings: ' . implode('; ', $warnings));
    }
});

Event processing patterns

use Cassandra\EventListener;
use Cassandra\Response\Event;

$conn->registerEventListener(new class () implements EventListener {
    public function onEvent(Event $event): void {
        // enqueue to worker, react to topology/status/schema changes
    }
});

// Non-busy loop with backoff
while (true) {
    $conn->waitForNextEvent();
    usleep(200_000); // 200ms
}

v5 keyspace per request

  • When the server negotiates protocol v5, you can set keyspace on QueryOptions, ExecuteOptions, and PrepareOptions.
  • If you also call setKeyspace(), the per-request option takes precedence for that request.
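The precedence rule can be sketched as a one-line resolution. This models only the v5 case described above; effectiveKeyspace() is a hypothetical helper for illustration, not a library function.

```php
<?php
declare(strict_types=1);

/**
 * Keyspace that applies to a single request on a protocol v5 connection:
 * the per-request option wins, otherwise the connection keyspace is used.
 */
function effectiveKeyspace(?string $connectionKeyspace, ?string $requestKeyspace): ?string
{
    return $requestKeyspace ?? $connectionKeyspace;
}
```

The override is scoped to that one request. Later requests without a keyspace option fall back to whatever setKeyspace() selected.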

Tracing notes

  • Use tracing sparingly in production; it adds overhead.
  • Read the trace id from the result to correlate with server logs (if enabled).

Performance tips

  • Prefer prepared statements for hot paths; the driver caches prepared metadata.
  • Iterate results instead of materializing large arrays.
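The second tip is about memory: streaming rows keeps only the current row in memory, whereas collecting them first holds the whole result. The sketch below illustrates the difference with a plain PHP generator; fakeRows() is a hypothetical stand-in for an iterable result and does not use the library.

```php
<?php
declare(strict_types=1);

/** Stand-in for an iterable result: yields rows one at a time. */
function fakeRows(int $count): Generator
{
    for ($i = 1; $i <= $count; $i++) {
        yield ['id' => $i, 'name' => "user{$i}"];
    }
}

// Streaming: each row is processed and discarded before the next is produced
$total = 0;
foreach (fakeRows(10_000) as $row) {
    $total += $row['id'];
}

// Materializing: all 10,000 rows are held in memory at once
$allRows = iterator_to_array(fakeRows(10_000), false);
```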

Version support

  • Tested against Cassandra 4.x and 5.x. Protocols v3, v4, and v5 are supported; features such as per-request keyspace and now_in_seconds require v5.

API reference (essentials)

  • Cassandra\Connection

    • connect(), disconnect(), isConnected(), getVersion()
    • setConsistency(Consistency), withConsistency(Consistency)
    • setKeyspace(string), withKeyspace(string), supportsKeyspaceRequestOption(), supportsNowInSecondsRequestOption()
    • query(string, array = [], ?Consistency, QueryOptions) / queryAsync(...) / queryAll(...)
    • prepare(string, PrepareOptions) / prepareAsync(...)
    • execute(Result $previous, array = [], ?Consistency, ExecuteOptions) / executeAsync(...) / executeAll(...)
    • batch(Batch) / batchAsync(Batch)
    • syncRequest(Request) / asyncRequest(Request) / waitForStatements(array $statements) / waitForAllPendingStatements() / waitForAnyStatement()
    • registerEventListener(EventListener) / unregisterEventListener(EventListener) / waitForNextEvent()
    • registerWarningsListener(WarningsListener) / unregisterWarningsListener(WarningsListener)
    • waitForNextResponse()
    • Non-blocking helpers: drainAvailableResponses(), tryResolveStatement(), tryResolveStatements(), tryReadNextResponse(), tryReadNextEvent()
  • Results

    • RowsResult (iterable): fetch(), fetchAll(), fetchColumn(), fetchAllColumns(), fetchKeyPair(), fetchAllKeyPairs(), configureFetchObject(), fetchObject(), fetchAllObjects(), getRowsMetadata(), hasMorePages()
    • PreparedResult (for execute)
    • SchemaChangeResult, SetKeyspaceResult, VoidResult
  • Types

    • Cassandra\Consistency (enum)
    • Cassandra\SerialConsistency (enum)
    • Cassandra\Type (enum) and Cassandra\Value\* classes (Ascii, Bigint, Blob, Boolean, Counter, Date, Decimal, Double, Duration, Float32, Inet, Int32, ListCollection, MapCollection, NotSet, SetCollection, Smallint, Time, Timestamp, Timeuuid, Tinyint, Tuple, UDT, Uuid, Varchar, Varint, Vector, ...)

Changelog

See CHANGELOG.md for release notes and upgrade considerations.

License

This library is released under the MIT License. See LICENSE for details.

Contributing

Contributions are welcome! Here's how to get started:

Development Setup

  1. Fork and Clone

    git clone https://github.com/MichaelRoosz/php-cassandra.git
    cd php-cassandra

  2. Install Dependencies

    composer install

  3. Start Development Environment

    docker compose up -d
    composer test:integration:init

  4. Run Tests

    composer test:unit
    composer test:integration:run

Docker quickstart for integration tests:

composer test:integration:up           # start Cassandra test container (ports 9142->9042)
composer test:integration:init         # wait until Cassandra is ready
composer test:integration:run          # run integration suite (socket + stream)
composer test:integration:down         # stop and clean up

Contribution Guidelines

Code Standards

  • PHP 8.1+: Use modern PHP features and syntax
  • PSR-12: Follow PHP-FIG coding standards
  • Type Hints: Use strict typing everywhere possible
  • Documentation: Document all public methods and classes
  • Tests: Include tests for all new functionality

Contributors

  • Michael Roosz - Current maintainer and lead developer
  • Shen Zhenyu - Original driver development
  • Evseev Nikolay - Foundation and early development

Special thanks to all contributors who have helped make this library better.

Supporting the Project

If you find this library useful, consider:

  • ⭐ Starring the repository on GitHub
  • 🐛 Reporting bugs and suggesting features
  • 📝 Contributing code or documentation
  • 💬 Sharing your experience with the community
  • 📚 Writing tutorials or blog posts

Your support helps keep this project active and improving!