jardiscore/messaging

A powerful, unified PHP messaging library for Redis, Kafka, and RabbitMQ with automatic serialization, consumer groups, and production-ready features

1.0.0 2026-01-24 10:07 UTC

Last update: 2026-01-24 10:09:09 UTC



A powerful, unified PHP messaging library that makes working with Redis, Kafka, and RabbitMQ effortless.


✨ Why Choose Jardis Async-Messaging?

🎯 One API, Multiple Transports

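Switch between Redis Streams, Apache Kafka, RabbitMQ, and InMemory (for testing) by changing only the transport setup call (setRedis(), setKafka(), setRabbitMq(), setInMemory()). Your publish() and consume() calls stay the same, so business logic stays clean while the library handles the broker-specific complexity.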

💪 Built for Modern PHP

  • PHP 8.2+ with full type safety and strict types
  • Named arguments for crystal-clear configuration
  • Dependency injection ready - perfect for frameworks

🛡️ Production-Ready Features

  • ✅ Layered architecture - automatic fallback & broadcast
  • ✅ InMemory transport - deterministic testing without infrastructure
  • ✅ Fluent API - 2-line setup, zero boilerplate
  • ✅ Automatic JSON serialization/deserialization
  • ✅ Connection pooling and graceful reconnection
  • ✅ Consumer groups (Redis & Kafka)
  • ✅ Message acknowledgement (RabbitMQ & Kafka)
  • ✅ Metadata support for tracing and debugging
  • ✅ Lazy connections - connect only when needed

🎨 Developer Experience First

  • Intuitive, fluent API - publish in 2 lines
  • Comprehensive validation - catch errors before they hit production
  • Detailed exceptions with context
  • Fully PHPStan Level 8 compatible

🔬 Battle-Tested

  • 374 tests with 84.13% coverage
  • Extensive unit and integration test suite
  • CI/CD ready with Docker Compose
  • Used in production DDD/Event-Driven systems

📦 Installation

composer require jardiscore/messaging

Requirements:

  • PHP 8.2 or higher
  • Choose your broker extension(s):
    • ext-redis for Redis Streams/Pub-Sub
    • ext-rdkafka for Apache Kafka
    • ext-amqp for RabbitMQ

🚀 Quick Start

The Basic Concept

Your Web-App                    Redis/Kafka/RabbitMQ              Worker Process
     │                                 │                                │
     │  publish('orders', data)        │                                │
     │────────────────────────────────▶│  [Message sits in queue]       │
     │                                 │                                │
     │                                 │   consume('orders', handler)   │
     │                                 │◀───────────────────────────────│
     │                                 │                                │
     │                                 │   handler processes message    │
     │                                 │   return true (ACK = done)     │

Two separate processes:

  • Your Web-App publishes events (e.g., when a customer places an order)
  • A Worker (daemon, cron, supervisor) runs separately and processes the queue

Step 1: Publishing (in your Web-App)

// File: src/Controller/OrderController.php
use JardisCore\Messaging\MessagePublisher;

class OrderController
{
    private MessagePublisher $publisher;

    public function __construct()
    {
        $this->publisher = (new MessagePublisher())->setRedis('localhost');
    }

    public function createOrder(array $orderData): void
    {
        // 1. Save order to database
        $orderId = $this->saveToDatabase($orderData);

        // 2. Publish event to queue "orders" - another process will handle it
        $this->publisher->publish('orders', [
            'order_id' => $orderId,
            'customer_email' => $orderData['email'],
            'total' => $orderData['total']
        ]);

        // Web request returns immediately - email sending happens async!
    }
}

Step 2: Consuming (separate Worker process)

// File: bin/worker.php (run via: php bin/worker.php)
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->enableGracefulShutdown();  // Clean stop on Ctrl+C or SIGTERM

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $message = Your data from publish(): ['order_id' => 123, 'customer_email' => '...', 'total' => 99.99]
    // $metadata = Broker info: ['id' => '...', 'stream' => 'orders', 'timestamp' => ..., 'type' => 'redis']

    echo "Processing order #{$message['order_id']}\n";

    // Your async work: send email, generate PDF, call external API...
    sendOrderConfirmationEmail($message['customer_email'], $message['order_id']);

    return true;   // ACK = success, remove from queue
    // return false; // NACK = failed, keep in queue for retry
});

// This runs forever, processing messages as they arrive
$consumer->consume('orders', $handler);

Run the worker:

# Development
php bin/worker.php

# Production (via Supervisor, systemd, Docker, etc.)
supervisorctl start order-worker

That's it! Your web-app stays fast (no waiting for emails), and workers process tasks in the background.
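
The handler contract above (return true to ACK, false to NACK) does not say what happens when the callback throws. A minimal sketch, assuming you want any exception to count as a failed attempt: wrap the callback so exceptions are logged and converted to false. The helper name nackOnException is hypothetical and not part of the library; the closure it returns has the same signature as the callback passed to CallbackHandler above.

```php
<?php
/**
 * Wraps a handler callback so a thrown exception is reported as a NACK (false)
 * instead of propagating. Hypothetical helper, not part of jardiscore/messaging.
 */
function nackOnException(callable $callback): Closure
{
    return function (array|string $message, array $metadata) use ($callback): bool {
        try {
            return (bool) $callback($message, $metadata);
        } catch (Throwable $e) {
            error_log('Handler failed: ' . $e->getMessage());
            return false; // signal failure so the message can be retried
        }
    };
}

// Two sample callbacks: one succeeds, one simulates a failing dependency
$ok = nackOnException(fn (array|string $message, array $metadata): bool => true);
$failing = nackOnException(function (array|string $message, array $metadata): bool {
    throw new RuntimeException('SMTP server unreachable');
});

$okResult = $ok(['order_id' => 123], []);
$failingResult = $failing(['order_id' => 123], []);
```

The wrapped closure could then be handed to new CallbackHandler(...) in place of the raw callback. Whether a NACKed message is redelivered, and how often, depends on the broker and is not covered by this sketch.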

🔥 Advanced: Layered Messaging (High Availability)

Stack multiple brokers for automatic fallback:

// Try Redis first, fallback to Kafka if Redis fails
$publisher = (new MessagePublisher())
    ->setRedis('localhost', priority: 0)      // Fastest - try first
    ->setKafka('kafka:9092', priority: 1);    // Fallback

$publisher->publish('orders', ['order_id' => 123]);
// → Publishes to Redis, or to Kafka if Redis is down

Or broadcast to ALL brokers:

$publisher = (new MessagePublisher())
    ->setRedis('redis-cache')     // Hot cache
    ->setKafka('kafka:9092')      // Event log
    ->setRabbitMq('rabbitmq');    // Task queue

$publisher->publishToAll('critical-event', ['status' => 'down']);
// → Published to Redis, Kafka, AND RabbitMQ

📖 All Brokers with Fluent API

Kafka

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Publish to Kafka
$publisher = (new MessagePublisher())->setKafka('kafka:9092');
$publisher->publish('user-events', [
    'event' => 'user.registered',
    'user_id' => 'abc123'
]);

// Consume from Kafka
$consumer = (new MessageConsumer())
    ->setKafka('kafka:9092', groupId: 'order-processors')  // groupId required for Kafka
    ->autoDeserialize();  // Automatically decode JSON messages to arrays

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $metadata contains: partition, offset, timestamp, key, topic
    echo "Partition: " . $metadata['partition'] . ", Offset: " . $metadata['offset'] . "\n";
    return true;
});

$consumer->consume('user-events', $handler, ['timeout' => 1000]);  // same topic as published above; timeout in ms

RabbitMQ

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Publish to RabbitMQ
$publisher = (new MessagePublisher())
    ->setRabbitMq('rabbitmq', username: 'admin', password: 'secret');

$publisher->publish('notifications', [
    'type' => 'email',
    'to' => 'user@example.com'
]);

// Consume from RabbitMQ
$consumer = (new MessageConsumer())
    ->setRabbitMq('rabbitmq', queueName: 'notifications', username: 'admin', password: 'secret');

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $metadata contains: routing_key, delivery_tag, exchange, headers
    echo "Routing key: " . $metadata['routing_key'] . "\n";
    echo "Email to: " . $message['to'] . "\n";

    // Process the notification...

    return true; // ACK - removes message from queue
});

$consumer->consume('email.*', $handler);  // Wildcard routing key
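
For readers unfamiliar with wildcard routing keys: under standard AMQP topic-exchange semantics, `*` matches exactly one dot-separated word and `#` matches zero or more words. The sketch below implements that matching rule in plain PHP so you can check which routing keys a pattern would receive. The function topicMatches is hypothetical and only illustrates the rule; that the library binds queues through a topic exchange is an assumption here.

```php
<?php
/**
 * Returns true if $routingKey matches $pattern under AMQP topic rules:
 * '*' = exactly one word, '#' = zero or more words. Illustrative helper only.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $match = function (array $p, array $k) use (&$match): bool {
        if ($p === []) {
            return $k === []; // pattern exhausted: key must be exhausted too
        }
        $head = array_shift($p);
        if ($head === '#') {
            // '#' may absorb zero, one, or many words
            for ($i = 0; $i <= count($k); $i++) {
                if ($match($p, array_slice($k, $i))) {
                    return true;
                }
            }
            return false;
        }
        if ($k === []) {
            return false; // pattern still expects a word
        }
        $word = array_shift($k);
        return ($head === '*' || $head === $word) && $match($p, $k);
    };

    return $match(explode('.', $pattern), explode('.', $routingKey));
}

var_dump(topicMatches('email.*', 'email.welcome'));       // bool(true)
var_dump(topicMatches('email.*', 'email.welcome.retry')); // bool(false)
var_dump(topicMatches('email.*', 'notifications'));       // bool(false)
var_dump(topicMatches('orders.#', 'orders'));             // bool(true)
```

Under these rules a message published with the routing key notifications would not reach a consumer bound with email.*, so it is worth checking that the publish key and the consume pattern line up.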

With Options

// Redis with Pub/Sub (not Streams)
$publisher = (new MessagePublisher())
    ->setRedis('localhost', options: ['useStreams' => false]);

// Kafka with SASL auth
$publisher = (new MessagePublisher())
    ->setKafka('kafka:9092', username: 'user', password: 'pass');

// RabbitMQ with custom port
$publisher = (new MessagePublisher())
    ->setRabbitMq('localhost', port: 5673);

🎨 Advanced Features

🛡️ Graceful Shutdown (NEW!)

Enable automatic cleanup when your consumer receives termination signals:

use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->enableGracefulShutdown();  // Registers SIGTERM & SIGINT handlers

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // Your processing logic here
    return true;
});

$consumer->consume('orders', $handler);
// When you press Ctrl+C or send SIGTERM, consumer stops gracefully

Features:

  • Automatically stops on SIGTERM (Docker/Kubernetes stop)
  • Automatically stops on SIGINT (Ctrl+C)
  • Works on Unix/Linux systems with pcntl extension
  • No manual signal handling needed
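
For context, the general technique behind signal-based shutdown can be sketched with PHP's pcntl functions: a signal handler flips a flag, and the worker loop checks that flag between messages so the current message is never cut off. This is a generic illustration, not the library's implementation; the in-memory $queue is a stand-in for a broker.

```php
<?php
// Sketch of a signal-aware worker loop. Real signal handling requires ext-pcntl;
// without it, the loop simply runs until the queue is drained.
$running = true;

if (function_exists('pcntl_async_signals')) {
    pcntl_async_signals(true); // deliver signals without declare(ticks)
    $stop = function (int $signal) use (&$running): void {
        $running = false;      // finish the current message, then leave the loop
    };
    pcntl_signal(SIGTERM, $stop); // e.g. docker stop, Kubernetes pod termination
    pcntl_signal(SIGINT, $stop);  // Ctrl+C
}

$queue = [['order_id' => 1], ['order_id' => 2], ['order_id' => 3]]; // stand-in for a broker
$processed = [];

while ($running && $queue !== []) {
    $message = array_shift($queue);
    $processed[] = $message['order_id']; // "process" the message
}

echo 'Processed ' . count($processed) . " message(s) before stopping\n";
```

Checking the flag only between messages is the design choice that makes the stop graceful: a message is either fully handled or not started.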

🔌 MessagingService Wrapper with Lazy Loading (NEW!)

Unified interface for DDD/Application Services with lazy instantiation:

use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessagingService;

// Setup with lazy loading - Publisher/Consumer created only when needed!
$messaging = new MessagingService(
    publisherFactory: fn() => (new MessagePublisher())->setRedis('localhost'),
    consumerFactory: fn() => (new MessageConsumer())->setRedis('localhost')->enableGracefulShutdown()
);

// Publisher is NOT created yet - no Redis connection established!

// Use in your Domain/Application layer
class OrderService
{
    public function __construct(private MessagingService $messaging) {}

    public function placeOrder(Order $order): void
    {
        // ... business logic

        // Publisher is created HERE on first publish() call
        $this->messaging->publish('orders.placed', [
            'order_id' => $order->id,
            'total' => $order->total
        ]);
    }

    public function startOrderProcessor(): void
    {
        // Consumer is created HERE on first consume() call
        $this->messaging->consume('orders.placed', new OrderHandler());
    }
}

// Perfect for DDD contexts like $domain->getMessage()

Benefits:

  • ✅ Zero overhead if service is injected but never used
  • ✅ No connections established until actually needed
  • ✅ Memory efficient - only instantiate what you use
  • ✅ Perfect for DI containers - inject everywhere, pay only when used
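
The lazy-instantiation idea behind these benefits can be sketched in a few lines: hold a factory closure, run it on first access, and reuse the result afterwards. The Lazy class below is hypothetical and only illustrates the pattern; MessagingService's internals may differ.

```php
<?php
/**
 * Minimal lazy holder: the factory runs on first access and the result is reused.
 * Hypothetical class for illustration only.
 */
final class Lazy
{
    private mixed $instance = null;
    private bool $created = false;

    public function __construct(private Closure $factory) {}

    public function get(): mixed
    {
        if (!$this->created) {
            $this->instance = ($this->factory)();
            $this->created = true;
        }
        return $this->instance;
    }
}

$factoryCalls = 0;
$publisher = new Lazy(function () use (&$factoryCalls): object {
    $factoryCalls++;          // stands in for opening a broker connection
    return new stdClass();
});

echo $factoryCalls, "\n";     // 0: nothing has been created yet
$first = $publisher->get();
$second = $publisher->get();
echo $factoryCalls, "\n";     // 1: created once, then reused
```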

🏗️ Layered Architecture

Build resilient, high-availability messaging systems by stacking multiple brokers:

Use Case 1: Automatic Fallback

use JardisCore\Messaging\MessagePublisher;

// Production setup: Primary + Backup
$publisher = (new MessagePublisher())
    ->setRedis('redis-primary', priority: 0)     // Try first
    ->setRedis('redis-replica', priority: 1)     // Fallback #1
    ->setKafka('kafka:9092', priority: 2);       // Fallback #2

// Your order data
$order = ['order_id' => 123, 'customer' => 'John', 'total' => 99.99];

$publisher->publish('orders', $order);
// Automatically tries redis-primary → redis-replica → kafka

Use Case 2: Broadcast Critical Events

// Send to ALL systems simultaneously
$publisher = (new MessagePublisher())
    ->setRedis('cache')       // For real-time processing
    ->setKafka('kafka:9092')  // For audit log
    ->setRabbitMq('rabbitmq'); // For async workers

$results = $publisher->publishToAll('system.alert', [
    'severity' => 'critical',
    'message' => 'Database connection lost'
]);
// Returns: ['redis' => true, 'kafka' => true, 'rabbitmq' => true]
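
Since a broadcast can succeed on some brokers and fail on others, it is worth inspecting the result. A small sketch, assuming the result shape shown above (broker name mapped to a bool); failedBrokers is a hypothetical helper and the $results array is sample data, not real output.

```php
<?php
/**
 * Returns the broker names whose publish attempt did not succeed.
 * Assumes the shape: broker name => bool.
 */
function failedBrokers(array $results): array
{
    return array_keys(array_filter($results, fn ($ok): bool => $ok !== true));
}

$results = ['redis' => true, 'kafka' => false, 'rabbitmq' => true]; // sample data
$failed = failedBrokers($results);

if ($failed !== []) {
    // Decide here whether to retry, alert, or accept a partial broadcast
    error_log('Broadcast incomplete, failed brokers: ' . implode(', ', $failed));
}
```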

Use Case 3: Performance Tiering

use JardisCore\Messaging\MessagePublisher;

// Hot path + Cold storage
$publisher = (new MessagePublisher())
    ->setRedis('localhost', priority: 0)      // Fast cache (ms)
    ->setKafka('kafka:9092', priority: 1);    // Persistent (s)

// Analytics event data
$event = ['type' => 'page_view', 'user_id' => 42, 'page' => '/products'];

$publisher->publish('analytics', $event);
// Writes to Redis for instant queries,
// and falls back to Kafka if the Redis publish fails

Use Case 4: Consumer Fallback

use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Try Redis first, fallback to Kafka
$consumer = (new MessageConsumer())
    ->setRedis('localhost', priority: 0)
    ->setKafka('kafka:9092', groupId: 'backup-group', priority: 1);

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing from: " . $metadata['type'] . "\n";  // 'redis' or 'kafka'
    return true;
});

$consumer->consume('orders', $handler);
// Reads from Redis, switches to Kafka if Redis fails

Priority System:

  • Lower priority number = tried first
  • Default: Redis (0) → Kafka (1) → RabbitMQ (2)
  • Customize with priority: parameter
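
The priority rule can be pictured as a sorted loop: order the transports by ascending priority, try each in turn, and stop at the first success. The sketch below is a plain-PHP illustration under that assumption; publishWithFallback and the transport array shape are hypothetical, and the library's actual failover logic may differ.

```php
<?php
/**
 * Tries each transport in ascending priority order and returns the name of the
 * first one that succeeds. Each transport is
 * ['name' => string, 'priority' => int, 'publish' => callable].
 */
function publishWithFallback(array $transports, string $topic, string $payload): string
{
    usort($transports, fn (array $a, array $b): int => $a['priority'] <=> $b['priority']);

    $errors = [];
    foreach ($transports as $transport) {
        try {
            $transport['publish']($topic, $payload);
            return $transport['name'];
        } catch (Throwable $e) {
            $errors[] = $transport['name'] . ': ' . $e->getMessage();
        }
    }
    throw new RuntimeException('All transports failed: ' . implode('; ', $errors));
}

$transports = [
    ['name' => 'kafka', 'priority' => 1, 'publish' => function (string $t, string $p): void {}],
    ['name' => 'redis', 'priority' => 0, 'publish' => function (string $t, string $p): void {
        throw new RuntimeException('connection refused'); // simulate an outage
    }],
];

$usedTransport = publishWithFallback($transports, 'orders', json_encode(['order_id' => 123]));
echo $usedTransport, "\n"; // kafka
```

Collecting the per-transport errors before giving up keeps the final exception useful for debugging when every broker is unavailable.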

🧪 InMemory Transport (Testing)

Test your messaging code without Docker or external brokers:

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;
use JardisCore\Messaging\Transport\InMemoryTransport;

// Setup - no broker needed!
$publisher = (new MessagePublisher())->setInMemory();
$consumer = (new MessageConsumer())->setInMemory();

// Publish
$publisher->publish('orders', ['orderId' => 123]);
$publisher->publish('orders', ['orderId' => 456]);

// Consume (SYNC - processed immediately!)
$processed = [];
$consumer->consume('orders', new CallbackHandler(function($message, $metadata) use (&$processed) {
    $processed[] = $message;
    return true;
}));

// Assert
assert(count($processed) === 2);
assert($processed[0]['orderId'] === 123);

Test Isolation:

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Transport\InMemoryTransport;
use PHPUnit\Framework\TestCase;

class OrderTest extends TestCase
{
    protected function setUp(): void
    {
        // Reset the singleton before each test for isolation
        InMemoryTransport::reset();
    }

    protected function tearDown(): void
    {
        InMemoryTransport::reset();
    }

    public function testOrderPublishesEvent(): void
    {
        // Arrange
        $publisher = (new MessagePublisher())->setInMemory();

        // Act - your code that publishes events
        $publisher->publish('order-events', ['order_id' => 123, 'status' => 'created']);

        // Assert - verify message was published
        $transport = InMemoryTransport::getInstance();
        $this->assertEquals(1, $transport->getMessageCount('order-events'));

        // You can also peek at the messages
        $messages = $transport->peek('order-events');
        $this->assertEquals(123, json_decode($messages[0]['message'], true)['order_id']);
    }
}

Benefits:

  • ✅ No infrastructure - no Docker, no Redis, no Kafka, no RabbitMQ
  • ✅ Deterministic - no race conditions, no timing issues
  • ✅ Synchronous - messages processed immediately
  • ✅ Fast - instant in-memory operations
  • ✅ Perfect for unit tests and CI/CD pipelines

Custom Message Handlers

Instead of using CallbackHandler, you can create your own handler class:

use JardisPsr\Messaging\MessageHandlerInterface;
use JardisCore\Messaging\MessageConsumer;

/**
 * Custom handler class for processing orders
 * Must implement MessageHandlerInterface
 */
class OrderHandler implements MessageHandlerInterface
{
    public function __construct(
        private OrderService $orderService,  // Your business service
        private \Psr\Log\LoggerInterface $logger  // Any PSR-3 logger
    ) {}

    /**
     * Handle an incoming message
     *
     * @param string|array $message  The message payload (decoded JSON = array, raw = string)
     * @param array $metadata        Broker-specific metadata (timestamps, IDs, etc.)
     * @return bool                  true = ACK (success), false = NACK (retry/reject)
     */
    public function handle(string|array $message, array $metadata): bool
    {
        $this->logger->info('Processing order', [
            'order_id' => $message['order_id'],
            'stream_id' => $metadata['id']       // Redis stream ID
        ]);

        $this->orderService->process($message);

        return true;  // Message processed successfully
    }
}

// Usage
$consumer = (new MessageConsumer())->setRedis('localhost');
$orderService = new OrderService();  // Your service
$logger = new Logger();              // Your PSR-3 logger

$consumer->consume('orders', new OrderHandler($orderService, $logger));

Redis Consumer Groups (Horizontal Scaling)

use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())->setRedis('localhost');

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Worker " . gethostname() . " processing order\n";
    return true;
});

// Multiple consumers with same group = work distribution
// Run this script on multiple servers - messages are distributed among them
$consumer->consume('orders', $handler, [
    'group' => 'order-workers',       // Consumer group name
    'consumer' => gethostname(),      // Unique consumer name (per instance)
    'count' => 10,                    // Batch size - process 10 messages at a time
    'block' => 2000                   // Block for 2000ms waiting for messages
]);

Kafka with Custom Configuration

$consumer = (new MessageConsumer())->setKafka(
    brokers: 'kafka:9092',
    groupId: 'my-group',
    options: [
        'session.timeout.ms' => 6000,
        'max.poll.interval.ms' => 300000,
        'enable.auto.offset.store' => 'false'
    ]
);

RabbitMQ Queue Options

use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())->setRabbitMq(
    host: 'localhost',
    queueName: 'high-priority-orders',
    options: [
        'flags' => AMQP_DURABLE,              // Queue survives broker restart
        'arguments' => [
            'x-message-ttl' => 60000,         // 60 seconds TTL
            'x-max-priority' => 10,           // Priority queue (0-10)
            'x-max-length' => 10000           // Max queue size
        ]
    ]
);

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing high-priority order\n";
    return true;
});

$consumer->consume('orders.*', $handler, [
    'prefetch_count' => 5  // QoS - process 5 messages at a time
]);

Publisher Options

use JardisCore\Messaging\MessagePublisher;

// ============ REDIS OPTIONS ============
$redisPublisher = (new MessagePublisher())->setRedis('localhost');

// Add custom fields to Redis stream entry
$redisPublisher->publish('orders', ['order_id' => 123], [
    'fields' => [
        'priority' => 'high',
        'region' => 'EU'
    ]
]);

// ============ KAFKA OPTIONS ============
$kafkaPublisher = (new MessagePublisher())->setKafka('kafka:9092');

$order = ['order_id' => 456, 'customer_id' => 'cust_789', 'total' => 199.99];

// Use partition key for ordering (same customer = same partition = ordered delivery)
$kafkaPublisher->publish('orders', $order, [
    'key' => $order['customer_id'],  // Messages with same key go to same partition
    'partition' => 2                  // Or specify explicit partition number
]);

// ============ RABBITMQ OPTIONS ============
$rabbitPublisher = (new MessagePublisher())->setRabbitMq('rabbitmq');

$notification = ['type' => 'email', 'to' => 'user@example.com', 'subject' => 'Hello'];

// Set message attributes (priority, persistence, expiration)
$rabbitPublisher->publish('notifications', $notification, [
    'attributes' => [
        'priority' => 9,              // 0-9, higher = more important
        'delivery_mode' => 2,         // 2 = persistent (survives restart)
        'expiration' => '60000'       // Message expires after 60 seconds
    ]
]);
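
Why does a shared key give ordered delivery? Keyed messages are assigned to a partition by hashing the key, so the same key always lands on the same partition, and Kafka preserves order within a partition. The sketch below illustrates the idea with CRC32; it is not the partitioner your Kafka client actually uses, and partitionForKey is a hypothetical helper.

```php
<?php
/**
 * Maps a message key to a partition with a stable hash.
 * Illustration only: the real partition is chosen by the Kafka client's partitioner.
 */
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    return crc32($key) % $partitionCount; // crc32() is non-negative on 64-bit PHP
}

$a = partitionForKey('cust_789', 6);
$b = partitionForKey('cust_789', 6);
var_dump($a === $b); // bool(true): same key, same partition
```

If you pass an explicit partition number instead, you take over this decision yourself, so pick one approach per topic.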

🔌 External Connection Support

If your application already has established connections (e.g., in a legacy system), you can wrap them for reuse instead of creating new connections. This is perfect for integrating the messaging library into existing applications without duplicating connection resources.

Why Use External Connections?

  • Resource Efficiency: Reuse existing connections instead of creating new ones
  • Legacy Integration: Seamlessly integrate with existing infrastructure
  • Connection Management: Let your existing system manage connection lifecycle
  • Zero Migration Cost: Add messaging capabilities without refactoring existing code

Redis

Wrap an existing Redis connection:

use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;

// Your existing Redis connection
$legacyRedis = new Redis();
$legacyRedis->connect('localhost', 6379);

// Wrap it for reuse
$connection = new ExternalRedisConnection($legacyRedis);
$publisher = new RedisPublisher($connection);

// Use it normally
$publisher->publish('orders', json_encode(['order_id' => 123]));

// Connection lifecycle is managed by your legacy system

Lifecycle Management:

// Default: Don't close external connection on disconnect
$connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: false);

// Or: Allow messaging library to close it
$connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: true);

Kafka

Wrap an existing Kafka producer or consumer:

use JardisCore\Messaging\Connection\ExternalKafkaConnection;
use JardisCore\Messaging\Publisher\KafkaPublisher;
use JardisCore\Messaging\Consumer\KafkaConsumer;

// For Publishing - wrap existing producer
$legacyProducer = new \RdKafka\Producer();
$legacyProducer->addBrokers('kafka1:9092');

$connection = new ExternalKafkaConnection($legacyProducer);
$publisher = new KafkaPublisher($connection);
$publisher->publish('user-events', json_encode(['event' => 'user.registered']));

// For Consuming - wrap existing consumer
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'my-group');
$conf->set('metadata.broker.list', 'kafka1:9092');

$legacyConsumer = new \RdKafka\KafkaConsumer($conf);

$connection = new ExternalKafkaConnection($legacyConsumer);
$consumer = new KafkaConsumer($connection, 'my-group');

// Handler for processing messages
$handler = function (string $message, array $metadata): bool {
    echo "Received: $message\n";
    return true;
};

$consumer->consume('user-events', $handler);

Flush on Disconnect (for Producers):

// Default: Don't flush on disconnect
$connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: false);

// Or: Flush producer when disconnecting
$connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: true);

Important: ExternalKafkaConnection supports both \RdKafka\Producer and \RdKafka\KafkaConsumer instances. Make sure to pass the correct type for your use case.

RabbitMQ

Wrap an existing AMQP connection:

use JardisCore\Messaging\Connection\ExternalRabbitMqConnection;
use JardisCore\Messaging\Publisher\RabbitMqPublisher;

// Your existing RabbitMQ connection
$legacyConnection = new \AMQPConnection([
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
]);
$legacyConnection->connect();

// Wrap it for reuse
$connection = new ExternalRabbitMqConnection($legacyConnection);
$publisher = new RabbitMqPublisher($connection);

// Use it normally
$publisher->publish('notifications', json_encode(['type' => 'email']));

Custom Exchange Configuration:

$connection = new ExternalRabbitMqConnection(
    connection: $legacyConnection,
    exchangeName: 'custom-exchange',
    exchangeType: AMQP_EX_TYPE_TOPIC,
    manageLifecycle: false
);

Adding Pre-Configured Publishers/Consumers

Use addPublisher() and addConsumer() methods to add already instantiated broker instances with external connections:

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;
use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Connection\ExternalKafkaConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;
use JardisCore\Messaging\Consumer\KafkaConsumer;

// Legacy app setup
$legacyRedis = new Redis();
$legacyRedis->connect('localhost', 6379);

// Create external connection and publisher
$redisConnection = new ExternalRedisConnection($legacyRedis);
$redisPublisher = new RedisPublisher($redisConnection, useStreams: true);

// Add to MessagePublisher with custom type and priority
$messagePublisher = (new MessagePublisher())
    ->addPublisher($redisPublisher, 'redis-external', priority: 0)
    ->setKafka('kafka:9092', priority: 1);  // Mix with new connections

$messagePublisher->publish('events', ['type' => 'order.created']);

// Same pattern for consumers
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'my-group');
$conf->set('metadata.broker.list', 'kafka:9092');

$legacyKafkaConsumer = new \RdKafka\KafkaConsumer($conf);
$kafkaConnection = new ExternalKafkaConnection($legacyKafkaConsumer);
$kafkaConsumer = new KafkaConsumer($kafkaConnection, 'my-group');

$messageConsumer = (new MessageConsumer())
    ->addConsumer($kafkaConsumer, 'kafka-external', priority: 0);

// Handler for processing messages
$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing order from external Kafka\n";
    return true;
});

$messageConsumer->consume('orders', $handler);

Use Cases:

  • Legacy Integration: Reuse existing connections from legacy systems
  • Custom Configuration: Add publishers/consumers with advanced setup
  • Mixed Environments: Combine external and new connections with priority control
  • Testing: Inject mock publishers/consumers for unit tests

Use with Fluent API (Legacy Pattern)

External connections work seamlessly with publishers created via factories:

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;
use Redis;

// Legacy app setup
$redis = new Redis();
$redis->connect('localhost', 6379);

// Create publisher with external connection
$externalConnection = new ExternalRedisConnection($redis);
$publisher = new RedisPublisher($externalConnection, useStreams: true);

// Now wrap it in MessagePublisher if needed
$messagePublisher = new MessagePublisher($publisher);
$messagePublisher->publish('events', ['type' => 'order.created']);

Best Practices

  1. Lifecycle Management: Use manageLifecycle: false (default) when external system owns the connection
  2. Health Checks: External connections perform health checks (e.g., Redis ping)
  3. Error Handling: If external connection dies, appropriate exceptions are thrown
  4. Thread Safety: Ensure your external connection is thread-safe if used concurrently

🔗 Connection Access (SharedResource Integration)

Access internally created connections for resource sharing with other components:

Accessing Connections

use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;

// Setup publisher with brokers
$publisher = (new MessagePublisher())
    ->setRedis('localhost')
    ->setKafka('kafka:9092')
    ->setRabbitMq('rabbitmq');

// Check if connection exists
if ($publisher->hasConnection('redis')) {
    // Get the connection instance
    $redisConnection = $publisher->getConnection('redis');

    // Access underlying client for SharedResource
    $redisClient = $redisConnection->getClient();
}

// Same pattern for consumers
$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->setKafka('kafka:9092', groupId: 'my-group');

if ($consumer->hasConnection('kafka')) {
    $kafkaConnection = $consumer->getConnection('kafka');
}

Integration with SharedResource (Foundation)

// In Foundation after kernel initialization
$messaging = $kernel->getMessage();
$publisher = $messaging->getPublisher();

// Share Redis connection with SharedResource
if ($publisher->hasConnection('redis')) {
    $redis = $publisher->getConnection('redis')->getClient();
    SharedResource::setRedisMessaging($redis);
}

// Share Kafka producer
if ($publisher->hasConnection('kafka')) {
    $producer = $publisher->getConnection('kafka')->getClient();
    SharedResource::setKafkaProducer($producer);
}

// Share RabbitMQ connection
if ($publisher->hasConnection('rabbitmq')) {
    $amqp = $publisher->getConnection('rabbitmq')->getConnection();
    SharedResource::setAmqp($amqp);
}

Available Methods

| Method | Description |
|---|---|
| hasConnection(string $type): bool | Check if connection exists for broker type |
| getConnection(string $type): ?ConnectionInterface | Get connection instance (null if not configured) |

Broker Types: 'redis', 'kafka', 'rabbitmq'

Connection Client Methods

| Connection Class | Method | Returns |
|---|---|---|
| RedisConnection | getClient() | Redis |
| KafkaConnection | getClient() | Producer |
| RabbitMqConnection | getConnection() | AMQPConnection |

🧪 Testing

Run All Tests

make phpunit

Run Only Unit Tests

make phpunit-unit

Run Integration Tests (requires Docker)

make start              # Start Redis, Kafka, RabbitMQ
make phpunit-integration

Code Quality

make phpstan            # Static analysis
make phpcs              # Coding standards
make phpunit-coverage   # Coverage report

🏗️ Architecture

Clean, Interface-Driven Design

MessagingService (DDD/Application Layer)
    ├── MessagePublisher (Facade)
    │   ├── Factory/
    │   │   └── PublisherFactory
    │   └── PublisherInterface (contract)
    │       ├── RedisPublisher
    │       ├── KafkaPublisher
    │       ├── RabbitMqPublisher
    │       └── InMemoryPublisher (testing)
    │
    └── MessageConsumer (Facade)
        ├── Factory/
        │   └── ConsumerFactory
        └── ConsumerInterface (contract)
            ├── RedisConsumer
            ├── KafkaConsumer
            ├── RabbitMqConsumer
            └── InMemoryConsumer (testing)

Architecture Highlights:

  • Factory Pattern: Centralized factory classes for creating publisher/consumer instances (SRP compliance)
  • Facade Pattern: MessagePublisher/MessageConsumer provide simple APIs
  • Strategy Pattern: Pluggable publisher/consumer implementations
  • Layered Fallback: Priority-based automatic failover

Dependency Injection Ready

// In your DI container - MessagingService with Lazy Loading (Recommended)
$container->singleton(MessagingService::class, function() {
    return new MessagingService(
        publisherFactory: fn() => (new MessagePublisher())
            ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
            ->setKafka(env('KAFKA_BROKERS'), priority: 1),
        consumerFactory: fn() => (new MessageConsumer())
            ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
            ->enableGracefulShutdown()
    );
});

// Or register separately
$container->singleton(MessagePublisher::class, function() {
    return (new MessagePublisher())
        ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'));
});

$container->singleton(MessageConsumer::class, function() {
    return (new MessageConsumer())
        ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
        ->enableGracefulShutdown();
});

Classic API (For Advanced Users)

If you need explicit control over connections and publishers:

use JardisCore\Messaging\Config\ConnectionConfig;
use JardisCore\Messaging\Connection\RedisConnection;
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Publisher\RedisPublisher;

// Create connection configuration
$config = new ConnectionConfig(
    host: 'localhost',
    port: 6379,
    password: 'secret'
);

// Create connection
$connection = new RedisConnection($config);

// Create specific publisher
$redisPublisher = new RedisPublisher($connection, useStreams: true);

// Use with MessagePublisher
$publisher = new MessagePublisher($redisPublisher);
$publisher->publish('orders', ['order_id' => 123]);

This approach gives you full control but requires more setup. The fluent API is recommended for most use cases.

🎯 Use Cases

✅ Perfect For:

  • Event-Driven Architecture - Decouple your services
  • Domain-Driven Design - Domain events and command handling
  • Microservices Communication - Async messaging between services
  • Job Queues - Background processing
  • Real-time Updates - WebSocket backends, live dashboards
  • CQRS - Command/Query separation with event sourcing

🏒 Production Scenarios:

  • E-commerce order processing
  • User notification systems
  • Payment processing pipelines
  • Log aggregation and monitoring
  • IoT data ingestion
  • Analytics event tracking

📚 Documentation

Key Concepts

Auto-Serialization

  • Arrays → Automatically encoded to JSON
  • Objects → Must implement JsonSerializable
  • Strings → Passed through as-is

Auto-Deserialization

  • Valid JSON → Decoded to array
  • Invalid JSON → Returned as string
  • Can be disabled with autoDeserialize: false
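
The serialization rules above can be sketched as two small functions. This is an illustration of the stated rules, not the library's code: encodePayload and decodePayload are hypothetical names, and treating a JSON scalar such as 42 as a plain string (rather than decoding it) is an assumption made here.

```php
<?php
/** Encodes a payload following the rules listed above. Hypothetical helper. */
function encodePayload(array|string|JsonSerializable $payload): string
{
    if (is_string($payload)) {
        return $payload; // strings pass through untouched
    }
    // Arrays and JsonSerializable objects are encoded; throws JsonException on failure
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

/** Decodes a raw message: JSON objects/arrays become PHP arrays, anything else stays a string. */
function decodePayload(string $raw): array|string
{
    $decoded = json_decode($raw, true);
    return is_array($decoded) ? $decoded : $raw;
}

$wire = encodePayload(['order_id' => 123]);
echo $wire, "\n";                          // {"order_id":123}

$roundTrip = decodePayload($wire);         // ['order_id' => 123]
$plain = decodePayload('not json at all'); // returned unchanged as a string
```

This is also why handlers in this README are typed as array|string: a handler may receive either form, so check with is_array() before indexing into the message.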

Connection Management

  • Lazy connection - Connects on first use
  • Auto-reconnect - Handles connection drops
  • Graceful shutdown - Kafka flush on disconnect

Message Metadata

Each message handler receives metadata:

  • Redis: id, stream, timestamp, type
  • Kafka: partition, offset, timestamp, key, topic
  • RabbitMQ: routing_key, delivery_tag, exchange, headers, etc.
  • InMemory: topic, timestamp, type, index, plus custom metadata
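
Because the metadata keys differ per broker, a handler that runs behind more than one transport should probe for keys instead of assuming them. A minimal sketch using only the keys listed above; describeSource is a hypothetical helper and the sample arrays are made-up values.

```php
<?php
/**
 * Builds a short log label from handler metadata, using only the keys listed above.
 * Hypothetical helper; which keys are present depends on the broker.
 */
function describeSource(array $metadata): string
{
    if (isset($metadata['partition'], $metadata['offset'])) {
        return sprintf('kafka %s[%d]@%d', $metadata['topic'] ?? '?', $metadata['partition'], $metadata['offset']);
    }
    if (isset($metadata['routing_key'])) {
        return 'rabbitmq ' . $metadata['routing_key'];
    }
    if (isset($metadata['stream'], $metadata['id'])) {
        return sprintf('redis %s#%s', $metadata['stream'], $metadata['id']);
    }
    return 'unknown source';
}

echo describeSource(['topic' => 'orders', 'partition' => 2, 'offset' => 41]), "\n"; // kafka orders[2]@41
echo describeSource(['stream' => 'orders', 'id' => '1700000000000-0']), "\n";       // redis orders#1700000000000-0
```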

Development Setup

git clone https://github.com/jardiscore/messaging.git
cd messaging
make install
make start
make phpunit

📄 License

PolyForm Noncommercial License 1.0.0 - see LICENSE for details

  • ✅ Free for noncommercial use (personal projects, education, research, open source)
  • ✅ Free for nonprofits (charities, educational institutions, government)
  • ⚠️ Commercial use requires a separate license - contact jardiscore@headgent.dev

🙏 Acknowledgments

Built with ❤️ by Jardis Core Development

Part of the JardisCore ecosystem for building robust, scalable PHP applications.

📞 Support

Ready to simplify your messaging infrastructure? composer require jardiscore/messaging 🚀