jardisadapter/messaging

Multi-transport messaging for Redis, Kafka, RabbitMQ, and Database with automatic serialization, consumer groups, and failover

Maintainers

Package info

github.com/jardisAdapter/messaging

pkg:composer/jardisadapter/messaging

Statistics

Installs: 98

Dependents: 1

Suggesters: 0

Stars: 0

Open Issues: 0

v1.0.0 2026-03-31 07:54 UTC

This package is auto-updated.

Last update: 2026-03-31 07:55:49 UTC


README

Build Status License: PolyForm Shield PHP Version PHPStan Level PSR-12

Part of the Jardis Business Platform — Enterprise-grade PHP components for Domain-Driven Design

Multi-transport messaging with a unified API for Redis, Kafka, RabbitMQ, Database, and InMemory. ConnectionFactory creates transport-specific connections; PublisherFactory and ConsumerFactory produce typed publishers and consumers from those connections. MessagePublisher and MessageConsumer are immutable facades that accept one or more transport instances via constructor injection and provide automatic serialization, priority-based failover, and graceful shutdown.

Features

  • 4 Transports + InMemory — Redis (Pub/Sub and Streams), Kafka, RabbitMQ, Database (PDO), InMemory (testing)
  • Unified Publish/Consume APIpublish(topic, message) and consume(topic, handler) across all transports
  • Immutable FacadesMessagePublisher and MessageConsumer via variadic constructor injection
  • Automatic Serialization — Arrays and objects encoded to JSON on publish, decoded transparently on consume
  • Consumer Groups — Redis Streams and Kafka for horizontal scaling
  • Priority Failover — Constructor order determines priority; first healthy transport wins
  • Lazy ConnectionMessagingService defers publisher and consumer creation until first use
  • Database Transport — PDO-based messaging with Point-to-Point and Fan-Out modes, no external broker required
  • External Connections — Wrap existing Redis, PDO, AMQP, or Kafka clients via ConnectionFactory::from*()
  • Message Validation — Payload validation before transmission via MessageValidator
  • Graceful Shutdown — SIGTERM/SIGINT handling enabled automatically on consume()

Installation

composer require jardisadapter/messaging

Optional extensions (install only what you need):

  • ext-redis — Redis Streams/Pub-Sub transport
  • ext-rdkafka — Apache Kafka transport
  • ext-amqp — RabbitMQ transport

PDO (Database transport) is always available via ext-pdo.

Quick Start

use JardisAdapter\Messaging\MessagePublisher;
use JardisAdapter\Messaging\MessageConsumer;
use JardisAdapter\Messaging\Factory\ConnectionFactory;
use JardisAdapter\Messaging\Factory\PublisherFactory;
use JardisAdapter\Messaging\Factory\ConsumerFactory;
use JardisAdapter\Messaging\Handler\CallbackHandler;

$connFactory = new ConnectionFactory();
$pubFactory  = new PublisherFactory();
$conFactory  = new ConsumerFactory();

// Create and share a Redis connection
$redisConn = $connFactory->redis('localhost', 6379);

// Publish
$publisher = new MessagePublisher($pubFactory->redis($redisConn));
$publisher->publish('orders', ['order_id' => 42, 'total' => 99.99]);

// Consume
$consumer = new MessageConsumer($conFactory->redis($redisConn));
$consumer->consume('orders', new CallbackHandler(function (string|array $message, array $metadata): bool {
    // $message = ['order_id' => 42, 'total' => 99.99]  (auto-deserialized)
    return true; // true = ACK, false = reject/requeue
}));

Priority Failover

Constructor order determines priority — first argument is tried first. On MessageException, the next transport is used automatically.

$primary = $connFactory->redis('redis-primary');
$secondary = $connFactory->redis('redis-secondary');

$publisher = new MessagePublisher(
    $pubFactory->redis($primary),     // tried first
    $pubFactory->redis($secondary),   // fallback if primary fails
);
$publisher->publish('orders', ['order_id' => 42]);

Transports

Redis

Supports both Pub/Sub (default) and Streams mode.

$redisConn = $connFactory->redis('localhost', 6379);

// Pub/Sub (default)
$publisher = new MessagePublisher($pubFactory->redis($redisConn));

// Streams
$publisher = new MessagePublisher($pubFactory->redis($redisConn, useStreams: true));

// Consumer groups (Streams only)
$consumer = new MessageConsumer($conFactory->redis($redisConn, useStreams: true));
$consumer->consume('orders', $handler, [
    'group'    => 'order-processors',  // auto-created if missing
    'consumer' => 'worker-1',
    'block'    => 5000,
    'count'    => 1,
]);

Kafka

Separate connection types for producer and consumer. Consumer group ID is configured on the connection.

// Producer
$kafkaConn = $connFactory->kafka('kafka:9092');
$publisher = new MessagePublisher($pubFactory->kafka($kafkaConn));
$publisher->publish('invoices', ['invoice_id' => 7], ['key' => 'partition-key']);

// Consumer (groupId is part of connection)
$kafkaConsumerConn = $connFactory->kafkaConsumer('kafka:9092', 'invoice-processor');
$consumer = new MessageConsumer($conFactory->kafka($kafkaConsumerConn));
$consumer->consume('invoices', $handler);

RabbitMQ

Queue-based messaging with automatic ACK/NACK handling.

$rabbitConn = $connFactory->rabbitMq('localhost', 5672, 'guest', 'guest');

$publisher = new MessagePublisher($pubFactory->rabbitMq($rabbitConn));
$publisher->publish('order.created', ['orderId' => 42]);

$consumer = new MessageConsumer($conFactory->rabbitMq($rabbitConn, 'order-queue'));
$consumer->consume('order.created', $handler);

Database (PDO)

No external broker required — uses the application's existing database. Supports MySQL, PostgreSQL, and SQLite.

use JardisAdapter\Messaging\Config\DatabaseTransportOptions;

$dbConn = $connFactory->database('mysql:host=localhost;dbname=app', 'user', 'pass');

$options = new DatabaseTransportOptions(
    table: 'domain_events',
    deleteAfterProcessing: false,  // soft delete (default)
    pollingIntervalMs: 1000,
    batchSize: 10,
    maxAttempts: 3,
);

$publisher = new MessagePublisher($pubFactory->database($dbConn, $options));
$consumer = new MessageConsumer($conFactory->database($dbConn, $options));

// Point-to-Point (default): one consumer per event
$consumer->consume('OrderCreated', $handler);

// Fan-Out: multiple consumer groups process the same event
$consumer->consume('InvoiceCreated', $handler, ['group' => 'email-service']);
$consumer->consume('InvoiceCreated', $handler, ['group' => 'pdf-service']);

Schema: src/Schema/domain_events.sql

InMemory (Testing)

Synchronous in-memory transport for unit and integration tests.

use JardisAdapter\Messaging\Transport\InMemoryTransport;

$transport = new InMemoryTransport();

$publisher = new MessagePublisher($pubFactory->inMemory($transport));
$consumer  = new MessageConsumer($conFactory->inMemory($transport));

$publisher->publish('test', ['id' => 1]);
$transport->getMessageCount('test');  // 1

$consumer->consume('test', $handler, ['limit' => 5]);

External Connections

Wrap existing connections from legacy systems, DI containers, or frameworks.

// Existing Redis instance
$redisConn = $connFactory->fromRedis($existingRedis, manageLifecycle: false);
$publisher = new MessagePublisher($pubFactory->redis($redisConn));

// Existing PDO instance
$dbConn = $connFactory->fromPdo($existingPdo, manageLifecycle: false);

// Existing AMQP connection
$rabbitConn = $connFactory->fromAmqp($amqpConnection, exchangeName: 'custom');

// Existing Kafka producer/consumer
$kafkaProducerConn = $connFactory->fromKafkaProducer($producer, flushOnDisconnect: true);
$kafkaConsumerConn = $connFactory->fromKafkaConsumer($consumer);

When manageLifecycle: false, disconnect() is a no-op — the external system owns the connection lifecycle.

MessagingService (Lazy Loading)

Defers publisher and consumer creation until first use. Ideal for DI containers.

use JardisAdapter\Messaging\MessagingService;

$messaging = new MessagingService(
    publisherFactory: fn() => new MessagePublisher($pubFactory->redis($redisConn)),
    consumerFactory:  fn() => new MessageConsumer($conFactory->redis($redisConn)),
);

$messaging->publish('notifications', ['type' => 'email', 'to' => 'user@example.com']);
$messaging->consume('notifications', $handler);

$messaging->getPublisher();  // MessagePublisherInterface
$messaging->getConsumer();   // MessageConsumerInterface

Error Handling

All exceptions extend MessageException:

Exception When
ConnectionException Connection fails, getClient() called without connect()
PublishException Publishing fails, serialization error, validation failure
ConsumerException Consumer initialization or polling fails

Publisher fallback only triggers on MessageException — other exceptions propagate immediately.

Consumer state-cleanup is performed before re-throwing: NACK in RabbitMQ, attempt tracking in Database, requeue in InMemory.

Documentation

Full documentation, guides, and API reference:

docs.jardis.io/adapter/messaging

License

This package is licensed under the PolyForm Shield License 1.0.0. Free for all use except building competing frameworks or developer tooling.

Jardis · Documentation · Headgent