jardiscore / messaging
A powerful, unified PHP messaging library for Redis, Kafka, and RabbitMQ with automatic serialization, consumer groups, and production-ready features
pkg:composer/jardiscore/messaging
Requires
- php: >=8.2
- ext-amqp: *
- ext-pcntl: *
- ext-rdkafka: *
- ext-redis: *
- jardiscore/dotenv: ^1.0.0
- jardispsr/messaging: ^1.0.0
Requires (Dev)
- phpstan/phpstan: ^2.0.4
- phpunit/phpunit: ^10.5
- squizlabs/php_codesniffer: ^3.11.2
README
A powerful, unified PHP messaging library that makes working with Redis, Kafka, and RabbitMQ effortless.
Why Choose Jardis Async-Messaging?
One API, Multiple Transports
Switch between Redis Streams, Apache Kafka, RabbitMQ, and InMemory (for testing) with zero code changes. Your business logic stays clean while we handle the complexity.
Built for Modern PHP
- PHP 8.2+ with full type safety and strict types
- Named arguments for crystal-clear configuration
- Dependency injection ready - perfect for frameworks
Production-Ready Features
- Layered architecture - automatic fallback & broadcast
- InMemory transport - deterministic testing without infrastructure
- Fluent API - 2-line setup, zero boilerplate
- Automatic JSON serialization/deserialization
- Connection pooling and graceful reconnection
- Consumer groups (Redis & Kafka)
- Message acknowledgement (RabbitMQ & Kafka)
- Metadata support for tracing and debugging
- Lazy connections - connect only when needed
Developer Experience First
- Intuitive, fluent API - publish in 2 lines
- Comprehensive validation - catch errors before they hit production
- Detailed exceptions with context
- Full PHPStan Level 8 compatible
Battle-Tested
- 374 tests with 84.13% coverage
- Extensive unit and integration test suite
- CI/CD ready with Docker Compose
- Used in production DDD/Event-Driven systems
Installation
composer require jardiscore/messaging
Requirements:
- PHP 8.2 or higher
- Choose your broker extension(s):
  - ext-redis for Redis Streams/Pub-Sub
  - ext-rdkafka for Apache Kafka
  - ext-amqp for RabbitMQ
Quick Start
The Basic Concept
Your Web-App                   Redis/Kafka/RabbitMQ               Worker Process
     │                                   │                               │
     │  publish('orders', data)          │                               │
     ├──────────────────────────────────▶│  [Message sits in queue]      │
     │                                   │                               │
     │                                   │  consume('orders', handler)   │
     │                                   │◀──────────────────────────────┤
     │                                   │                               │
     │                                   │  handler processes message    │
     │                                   │  return true (ACK = done)     │
Two separate processes:
- Your Web-App publishes events (e.g., when a customer places an order)
- A Worker (daemon, cron, supervisor) runs separately and processes the queue
Step 1: Publishing (in your Web-App)
// File: src/Controller/OrderController.php
use JardisCore\Messaging\MessagePublisher;

class OrderController
{
    private MessagePublisher $publisher;

    public function __construct()
    {
        $this->publisher = (new MessagePublisher())->setRedis('localhost');
    }

    public function createOrder(array $orderData): void
    {
        // 1. Save order to database
        $orderId = $this->saveToDatabase($orderData);

        // 2. Publish event to queue "orders" - another process will handle it
        $this->publisher->publish('orders', [
            'order_id' => $orderId,
            'customer_email' => $orderData['email'],
            'total' => $orderData['total']
        ]);

        // Web request returns immediately - email sending happens async!
    }
}
Step 2: Consuming (separate Worker process)
// File: bin/worker.php (run via: php bin/worker.php)
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->enableGracefulShutdown(); // Clean stop on Ctrl+C or SIGTERM

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $message = Your data from publish(): ['order_id' => 123, 'customer_email' => '...', 'total' => 99.99]
    // $metadata = Broker info: ['id' => '...', 'stream' => 'orders', 'timestamp' => ..., 'type' => 'redis']
    echo "Processing order #{$message['order_id']}\n";

    // Your async work: send email, generate PDF, call external API...
    sendOrderConfirmationEmail($message['customer_email'], $message['order_id']);

    return true; // ACK = success, remove from queue
    // return false; // NACK = failed, keep in queue for retry
});

// This runs forever, processing messages as they arrive
$consumer->consume('orders', $handler);
Run the worker:
# Development
php bin/worker.php

# Production (via Supervisor, systemd, Docker, etc.)
supervisorctl start order-worker
That's it! Your web-app stays fast (no waiting for emails), and workers process tasks in the background.
Advanced: Layered Messaging (High Availability)
Stack multiple brokers for automatic fallback:
// Try Redis first, fallback to Kafka if Redis fails
$publisher = (new MessagePublisher())
    ->setRedis('localhost', priority: 0)   // Fastest - try first
    ->setKafka('kafka:9092', priority: 1); // Fallback

$publisher->publish('orders', ['order_id' => 123]);
// Publishes to Redis, OR Kafka if Redis is down!
Or broadcast to ALL brokers:
$publisher = (new MessagePublisher())
    ->setRedis('redis-cache')   // Hot cache
    ->setKafka('kafka:9092')    // Event log
    ->setRabbitMq('rabbitmq');  // Task queue

$publisher->publishToAll('critical-event', ['status' => 'down']);
// Published to Redis, Kafka, AND RabbitMQ simultaneously!
All Brokers with Fluent API
Kafka
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Publish to Kafka
$publisher = (new MessagePublisher())->setKafka('kafka:9092');
$publisher->publish('user-events', [
    'event' => 'user.registered',
    'user_id' => 'abc123'
]);

// Consume from Kafka
$consumer = (new MessageConsumer())
    ->setKafka('kafka:9092', groupId: 'order-processors') // groupId required for Kafka
    ->autoDeserialize(); // Automatically decode JSON messages to arrays

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $metadata contains: partition, offset, timestamp, key, topic
    echo "Partition: " . $metadata['partition'] . ", Offset: " . $metadata['offset'] . "\n";
    return true;
});

$consumer->consume('orders', $handler, ['timeout' => 1000]); // timeout in ms
RabbitMQ
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Publish to RabbitMQ
$publisher = (new MessagePublisher())
    ->setRabbitMq('rabbitmq', username: 'admin', password: 'secret');
$publisher->publish('notifications', [
    'type' => 'email',
    'to' => 'user@example.com'
]);

// Consume from RabbitMQ
$consumer = (new MessageConsumer())
    ->setRabbitMq('rabbitmq', queueName: 'notifications', username: 'admin', password: 'secret');

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // $metadata contains: routing_key, delivery_tag, exchange, headers
    echo "Routing key: " . $metadata['routing_key'] . "\n";
    echo "Email to: " . $message['to'] . "\n";
    // Process the notification...
    return true; // ACK - removes message from queue
});

$consumer->consume('email.*', $handler); // Wildcard routing key
With Options
// Redis with Pub/Sub (not Streams)
$publisher = (new MessagePublisher())
    ->setRedis('localhost', options: ['useStreams' => false]);

// Kafka with SASL auth
$publisher = (new MessagePublisher())
    ->setKafka('kafka:9092', username: 'user', password: 'pass');

// RabbitMQ with custom port
$publisher = (new MessagePublisher())
    ->setRabbitMq('localhost', port: 5673);
Advanced Features
Graceful Shutdown (NEW!)
Enable automatic cleanup when your consumer receives termination signals:
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->enableGracefulShutdown(); // Registers SIGTERM & SIGINT handlers

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    // Your processing logic here
    return true;
});

$consumer->consume('orders', $handler);
// When you press Ctrl+C or send SIGTERM, consumer stops gracefully
Features:
- Automatically stops on SIGTERM (Docker/Kubernetes stop)
- Automatically stops on SIGINT (Ctrl+C)
- Works on Unix/Linux systems with the pcntl extension
- No manual signal handling needed
MessagingService Wrapper with Lazy Loading (NEW!)
Unified interface for DDD/Application Services with lazy instantiation:
use JardisCore\Messaging\MessagingService;
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;

// Setup with lazy loading - Publisher/Consumer created only when needed!
$messaging = new MessagingService(
    publisherFactory: fn() => (new MessagePublisher())->setRedis('localhost'),
    consumerFactory: fn() => (new MessageConsumer())->setRedis('localhost')->enableGracefulShutdown()
);
// Publisher is NOT created yet - no Redis connection established!

// Use in your Domain/Application layer
class OrderService
{
    public function __construct(private MessagingService $messaging) {}

    public function placeOrder(Order $order): void
    {
        // ... business logic

        // Publisher is created HERE on first publish() call
        $this->messaging->publish('orders.placed', [
            'order_id' => $order->id,
            'total' => $order->total
        ]);
    }

    public function startOrderProcessor(): void
    {
        // Consumer is created HERE on first consume() call
        $this->messaging->consume('orders.placed', new OrderHandler());
    }
}

// Perfect for DDD contexts like $domain->getMessage()
Benefits:
- Zero overhead if service is injected but never used
- No connections established until actually needed
- Memory efficient - only instantiate what you use
- Perfect for DI containers - inject everywhere, pay only when used
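To make the lazy-loading idea concrete, here is a minimal, library-independent sketch of the pattern: a factory closure that runs only on first access and whose result is then cached. LazyHolder is a hypothetical name used for illustration; it is not a class from this package, and MessagingService may well be implemented differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of lazy instantiation: the factory closure runs only
 * on first access and its result is cached for later calls.
 */
final class LazyHolder
{
    private ?object $instance = null;

    /** @var Closure(): object */
    private Closure $factory;

    public function __construct(Closure $factory)
    {
        $this->factory = $factory;
    }

    public function get(): object
    {
        // Build the object on first use, then reuse it
        return $this->instance ??= ($this->factory)();
    }
}

$created = 0;
$holder = new LazyHolder(function () use (&$created): object {
    $created++;            // stands in for opening a broker connection
    return new stdClass();
});

echo $created, PHP_EOL;    // 0 (nothing built yet)
$holder->get();
$holder->get();
echo $created, PHP_EOL;    // 1 (built once, reused afterwards)
```

The counter shows why injecting such a service everywhere is cheap: the expensive work happens at most once, and only if something actually calls get().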
Layered Architecture
Build resilient, high-availability messaging systems by stacking multiple brokers:
Use Case 1: Automatic Fallback
use JardisCore\Messaging\MessagePublisher;

// Production setup: Primary + Backup
$publisher = (new MessagePublisher())
    ->setRedis('redis-primary', priority: 0)  // Try first
    ->setRedis('redis-replica', priority: 1)  // Fallback #1
    ->setKafka('kafka:9092', priority: 2);    // Fallback #2

// Your order data
$order = ['order_id' => 123, 'customer' => 'John', 'total' => 99.99];

$publisher->publish('orders', $order);
// Automatically tries redis-primary → redis-replica → kafka
Use Case 2: Broadcast Critical Events
// Send to ALL systems simultaneously
$publisher = (new MessagePublisher())
    ->setRedis('cache')         // For real-time processing
    ->setKafka('kafka:9092')    // For audit log
    ->setRabbitMq('rabbitmq');  // For async workers

$results = $publisher->publishToAll('system.alert', [
    'severity' => 'critical',
    'message' => 'Database connection lost'
]);
// Returns: ['redis' => true, 'kafka' => true, 'rabbitmq' => true]
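The per-broker result array is what makes a broadcast actionable. The following sketch shows one way such a result map could be produced and inspected. The broadcast() function and the closures standing in for brokers are hypothetical, the loop is sequential rather than truly simultaneous, and none of this is the library's internal code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: send one payload to every publisher and record
 * per-broker success instead of stopping at the first failure.
 *
 * @param array<string, callable(string, string): bool> $publishers keyed by broker name
 * @return array<string, bool>
 */
function broadcast(array $publishers, string $topic, string $payload): array
{
    $results = [];
    foreach ($publishers as $name => $publish) {
        try {
            $results[$name] = $publish($topic, $payload);
        } catch (Throwable $e) {
            $results[$name] = false; // one broker failing does not block the others
        }
    }

    return $results;
}

// Stand-in brokers: the "kafka" closure simulates an outage
$results = broadcast([
    'redis'    => fn (string $t, string $p): bool => true,
    'kafka'    => fn (string $t, string $p): bool => throw new RuntimeException('broker down'),
    'rabbitmq' => fn (string $t, string $p): bool => true,
], 'system.alert', '{"severity":"critical"}');

// Collect the brokers that did not accept the message
$failed = array_keys($results, false, true);
echo implode(',', $failed), PHP_EOL; // kafka
```

Checking the returned map for false entries, as in the last lines, is a reasonable way to decide whether a critical event needs to be retried or escalated.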
Use Case 3: Performance Tiering
use JardisCore\Messaging\MessagePublisher;

// Hot path + Cold storage
$publisher = (new MessagePublisher())
    ->setRedis('localhost', priority: 0)   // Fast cache (ms)
    ->setKafka('kafka:9092', priority: 1); // Persistent (s)

// Analytics event data
$event = ['type' => 'page_view', 'user_id' => 42, 'page' => '/products'];

$publisher->publish('analytics', $event);
// Writes to Redis for instant queries,
// Falls back to Kafka if Redis is saturated
Use Case 4: Consumer Fallback
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

// Try Redis first, fallback to Kafka
$consumer = (new MessageConsumer())
    ->setRedis('localhost', priority: 0)
    ->setKafka('kafka:9092', groupId: 'backup-group', priority: 1);

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing from: " . $metadata['type'] . "\n"; // 'redis' or 'kafka'
    return true;
});

$consumer->consume('orders', $handler);
// Reads from Redis, switches to Kafka if Redis fails
Priority System:
- Lower priority number = tried first
- Default: Redis (0) → Kafka (1) → RabbitMQ (2)
- Customize with the priority: parameter
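The priority rule above can be sketched in a few lines of plain PHP. This is an illustration of the general technique (sort by priority, stop at the first success), not the library's actual implementation; publishWithFallback() and the closures standing in for brokers are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: try publishers in ascending priority order and
 * stop at the first one that succeeds.
 *
 * @param array<int, array{priority: int, name: string, publish: callable(string, string): bool}> $publishers
 * @return string name of the publisher that accepted the message
 */
function publishWithFallback(array $publishers, string $topic, string $payload): string
{
    // Lower priority number = tried first
    usort($publishers, fn (array $a, array $b): int => $a['priority'] <=> $b['priority']);

    foreach ($publishers as $publisher) {
        try {
            if (($publisher['publish'])($topic, $payload)) {
                return $publisher['name'];
            }
        } catch (Throwable $e) {
            // This broker failed; fall through to the next priority level
        }
    }

    throw new RuntimeException("All publishers failed for topic '{$topic}'");
}

// Usage: the "redis" stand-in fails, so the "kafka" stand-in takes over
$used = publishWithFallback([
    ['priority' => 1, 'name' => 'kafka', 'publish' => fn (string $t, string $p): bool => true],
    ['priority' => 0, 'name' => 'redis', 'publish' => function (string $t, string $p): bool {
        throw new RuntimeException('connection refused');
    }],
], 'orders', '{"order_id":123}');

echo $used, PHP_EOL; // kafka
```

Note that the registration order does not matter in this sketch; only the priority numbers decide which broker is tried first.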
InMemory Transport (Testing)
Test your messaging code without Docker or external brokers:
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;
use JardisCore\Messaging\Transport\InMemoryTransport;

// Setup - no broker needed!
$publisher = (new MessagePublisher())->setInMemory();
$consumer = (new MessageConsumer())->setInMemory();

// Publish
$publisher->publish('orders', ['orderId' => 123]);
$publisher->publish('orders', ['orderId' => 456]);

// Consume (SYNC - processed immediately!)
$processed = [];
$consumer->consume('orders', new CallbackHandler(function($message, $metadata) use (&$processed) {
    $processed[] = $message;
    return true;
}));

// Assert
assert(count($processed) === 2);
assert($processed[0]['orderId'] === 123);
Test Isolation:
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Transport\InMemoryTransport;
use PHPUnit\Framework\TestCase;

class OrderTest extends TestCase
{
    protected function setUp(): void
    {
        // Reset the singleton before each test for isolation
        InMemoryTransport::reset();
    }

    protected function tearDown(): void
    {
        InMemoryTransport::reset();
    }

    public function testOrderPublishesEvent(): void
    {
        // Arrange
        $publisher = (new MessagePublisher())->setInMemory();

        // Act - your code that publishes events
        $publisher->publish('order-events', ['order_id' => 123, 'status' => 'created']);

        // Assert - verify message was published
        $transport = InMemoryTransport::getInstance();
        $this->assertEquals(1, $transport->getMessageCount('order-events'));

        // You can also peek at the messages
        $messages = $transport->peek('order-events');
        $this->assertEquals(123, json_decode($messages[0]['message'], true)['order_id']);
    }
}
Benefits:
- No infrastructure - no Docker, no Redis, no Kafka, no RabbitMQ
- Deterministic - no race conditions, no timing issues
- Synchronous - messages processed immediately
- Fast - instant in-memory operations
- Perfect for unit tests and CI/CD pipelines
Custom Message Handlers
Instead of using CallbackHandler, you can create your own handler class:
use JardisPsr\Messaging\MessageHandlerInterface;
use JardisCore\Messaging\MessageConsumer;

/**
 * Custom handler class for processing orders
 * Must implement MessageHandlerInterface
 */
class OrderHandler implements MessageHandlerInterface
{
    public function __construct(
        private OrderService $orderService, // Your business service
        private Logger $logger              // PSR-3 logger
    ) {}

    /**
     * Handle an incoming message
     *
     * @param string|array $message The message payload (decoded JSON = array, raw = string)
     * @param array $metadata Broker-specific metadata (timestamps, IDs, etc.)
     * @return bool true = ACK (success), false = NACK (retry/reject)
     */
    public function handle(string|array $message, array $metadata): bool
    {
        $this->logger->info('Processing order', [
            'order_id' => $message['order_id'],
            'stream_id' => $metadata['id'] // Redis stream ID
        ]);

        $this->orderService->process($message);

        return true; // Message processed successfully
    }
}

// Usage
$consumer = (new MessageConsumer())->setRedis('localhost');
$orderService = new OrderService(); // Your service
$logger = new Logger();             // Your PSR-3 logger
$consumer->consume('orders', new OrderHandler($orderService, $logger));
Redis Consumer Groups (Horizontal Scaling)
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())->setRedis('localhost');

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Worker " . gethostname() . " processing order\n";
    return true;
});

// Multiple consumers with same group = work distribution
// Run this script on multiple servers - messages are distributed among them
$consumer->consume('orders', $handler, [
    'group' => 'order-workers',   // Consumer group name
    'consumer' => gethostname(),  // Unique consumer name (per instance)
    'count' => 10,                // Batch size - process 10 messages at a time
    'block' => 2000               // Block for 2000ms waiting for messages
]);
Kafka with Custom Configuration
$consumer = (new MessageConsumer())->setKafka(
    brokers: 'kafka:9092',
    groupId: 'my-group',
    options: [
        'session.timeout.ms' => 6000,
        'max.poll.interval.ms' => 300000,
        'enable.auto.offset.store' => 'false'
    ]
);
RabbitMQ Queue Options
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;

$consumer = (new MessageConsumer())->setRabbitMq(
    host: 'localhost',
    queueName: 'high-priority-orders',
    options: [
        'flags' => AMQP_DURABLE, // Queue survives broker restart
        'arguments' => [
            'x-message-ttl' => 60000,  // 60 seconds TTL
            'x-max-priority' => 10,    // Priority queue (0-10)
            'x-max-length' => 10000    // Max queue size
        ]
    ]
);

$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing high-priority order\n";
    return true;
});

$consumer->consume('orders.*', $handler, [
    'prefetch_count' => 5 // QoS - process 5 messages at a time
]);
Publisher Options
use JardisCore\Messaging\MessagePublisher;

// ============ REDIS OPTIONS ============
$redisPublisher = (new MessagePublisher())->setRedis('localhost');

// Add custom fields to Redis stream entry
$redisPublisher->publish('orders', ['order_id' => 123], [
    'fields' => [
        'priority' => 'high',
        'region' => 'EU'
    ]
]);

// ============ KAFKA OPTIONS ============
$kafkaPublisher = (new MessagePublisher())->setKafka('kafka:9092');

$order = ['order_id' => 456, 'customer_id' => 'cust_789', 'total' => 199.99];

// Use partition key for ordering (same customer = same partition = ordered delivery)
$kafkaPublisher->publish('orders', $order, [
    'key' => $order['customer_id'], // Messages with same key go to same partition
    'partition' => 2                // Or specify explicit partition number
]);

// ============ RABBITMQ OPTIONS ============
$rabbitPublisher = (new MessagePublisher())->setRabbitMq('rabbitmq');

$notification = ['type' => 'email', 'to' => 'user@example.com', 'subject' => 'Hello'];

// Set message attributes (priority, persistence, expiration)
$rabbitPublisher->publish('notifications', $notification, [
    'attributes' => [
        'priority' => 9,         // 0-9, higher = more important
        'delivery_mode' => 2,    // 2 = persistent (survives restart)
        'expiration' => '60000'  // Message expires after 60 seconds
    ]
]);
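Why does the same key land on the same partition? The sketch below illustrates the general idea with a stable hash. It is a simplification for illustration only: partitionFor() is a hypothetical helper, the partition count of 6 is made up, and real Kafka clients apply their own partitioner, so the actual partition numbers will differ.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative only: map a message key to a partition with a stable hash.
 * The result depends only on the key and the partition count, which is
 * why all messages for one customer stay in order on one partition.
 * (Assumes 64-bit PHP, where crc32() is never negative.)
 */
function partitionFor(string $key, int $partitionCount): int
{
    return crc32($key) % $partitionCount;
}

$first  = partitionFor('cust_789', 6);
$second = partitionFor('cust_789', 6);

var_dump($first === $second); // bool(true)
```

The flip side of this design is that changing the number of partitions changes the mapping, so per-key ordering only holds while the partition count stays fixed.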
External Connection Support
If your application already has established connections (e.g., in a legacy system), you can wrap them for reuse instead of creating new connections. This is perfect for integrating the messaging library into existing applications without duplicating connection resources.
Why Use External Connections?
- Resource Efficiency: Reuse existing connections instead of creating new ones
- Legacy Integration: Seamlessly integrate with existing infrastructure
- Connection Management: Let your existing system manage connection lifecycle
- Zero Migration Cost: Add messaging capabilities without refactoring existing code
Redis
Wrap an existing Redis connection:
use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;

// Your existing Redis connection
$legacyRedis = new Redis();
$legacyRedis->connect('localhost', 6379);

// Wrap it for reuse
$connection = new ExternalRedisConnection($legacyRedis);
$publisher = new RedisPublisher($connection);

// Use it normally
$publisher->publish('orders', json_encode(['order_id' => 123]));

// Connection lifecycle is managed by your legacy system
Lifecycle Management:
// Default: Don't close external connection on disconnect
$connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: false);

// Or: Allow messaging library to close it
$connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: true);
Kafka
Wrap an existing Kafka producer or consumer:
use JardisCore\Messaging\Connection\ExternalKafkaConnection;
use JardisCore\Messaging\Publisher\KafkaPublisher;
use JardisCore\Messaging\Consumer\KafkaConsumer;

// For Publishing - wrap existing producer
$legacyProducer = new \RdKafka\Producer();
$legacyProducer->addBrokers('kafka1:9092');

$connection = new ExternalKafkaConnection($legacyProducer);
$publisher = new KafkaPublisher($connection);
$publisher->publish('user-events', json_encode(['event' => 'user.registered']));

// For Consuming - wrap existing consumer
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'my-group');
$conf->set('metadata.broker.list', 'kafka1:9092');
$legacyConsumer = new \RdKafka\KafkaConsumer($conf);

$connection = new ExternalKafkaConnection($legacyConsumer);
$consumer = new KafkaConsumer($connection, 'my-group');

// Handler for processing messages
$handler = function (string $message, array $metadata): bool {
    echo "Received: $message\n";
    return true;
};

$consumer->consume('user-events', $handler);
Flush on Disconnect (for Producers):
// Default: Don't flush on disconnect
$connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: false);

// Or: Flush producer when disconnecting
$connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: true);
Important: ExternalKafkaConnection supports both \RdKafka\Producer and \RdKafka\KafkaConsumer instances. Make sure to pass the correct type for your use case.
RabbitMQ
Wrap an existing AMQP connection:
use JardisCore\Messaging\Connection\ExternalRabbitMqConnection;
use JardisCore\Messaging\Publisher\RabbitMqPublisher;

// Your existing RabbitMQ connection
$legacyConnection = new \AMQPConnection([
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
]);
$legacyConnection->connect();

// Wrap it for reuse
$connection = new ExternalRabbitMqConnection($legacyConnection);
$publisher = new RabbitMqPublisher($connection);

// Use it normally
$publisher->publish('notifications', json_encode(['type' => 'email']));
Custom Exchange Configuration:
$connection = new ExternalRabbitMqConnection(
    connection: $legacyConnection,
    exchangeName: 'custom-exchange',
    exchangeType: AMQP_EX_TYPE_TOPIC,
    manageLifecycle: false
);
Adding Pre-Configured Publishers/Consumers
Use addPublisher() and addConsumer() methods to add already instantiated broker instances with external connections:
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;
use JardisCore\Messaging\Handler\CallbackHandler;
use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Connection\ExternalKafkaConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;
use JardisCore\Messaging\Consumer\KafkaConsumer;

// Legacy app setup
$legacyRedis = new Redis();
$legacyRedis->connect('localhost', 6379);

// Create external connection and publisher
$redisConnection = new ExternalRedisConnection($legacyRedis);
$redisPublisher = new RedisPublisher($redisConnection, useStreams: true);

// Add to MessagePublisher with custom type and priority
$messagePublisher = (new MessagePublisher())
    ->addPublisher($redisPublisher, 'redis-external', priority: 0)
    ->setKafka('kafka:9092', priority: 1); // Mix with new connections

$messagePublisher->publish('events', ['type' => 'order.created']);

// Same pattern for consumers
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'my-group');
$conf->set('metadata.broker.list', 'kafka:9092');
$legacyKafkaConsumer = new \RdKafka\KafkaConsumer($conf);

$kafkaConnection = new ExternalKafkaConnection($legacyKafkaConsumer);
$kafkaConsumer = new KafkaConsumer($kafkaConnection, 'my-group');

$messageConsumer = (new MessageConsumer())
    ->addConsumer($kafkaConsumer, 'kafka-external', priority: 0);

// Handler for processing messages
$handler = new CallbackHandler(function (array|string $message, array $metadata): bool {
    echo "Processing order from external Kafka\n";
    return true;
});

$messageConsumer->consume('orders', $handler);
Use Cases:
- Legacy Integration: Reuse existing connections from legacy systems
- Custom Configuration: Add publishers/consumers with advanced setup
- Mixed Environments: Combine external and new connections with priority control
- Testing: Inject mock publishers/consumers for unit tests
Use with Fluent API (Legacy Pattern)
External connections work seamlessly with publishers created via factories:
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Connection\ExternalRedisConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;
use Redis;

// Legacy app setup
$redis = new Redis();
$redis->connect('localhost', 6379);

// Create publisher with external connection
$externalConnection = new ExternalRedisConnection($redis);
$publisher = new RedisPublisher($externalConnection, useStreams: true);

// Now wrap it in MessagePublisher if needed
$messagePublisher = new MessagePublisher($publisher);
$messagePublisher->publish('events', ['type' => 'order.created']);
Best Practices
- Lifecycle Management: Use manageLifecycle: false (default) when the external system owns the connection
- Health Checks: External connections perform health checks (e.g., Redis ping)
- Error Handling: If external connection dies, appropriate exceptions are thrown
- Thread Safety: Ensure your external connection is thread-safe if used concurrently
Connection Access (SharedResource Integration)
Access internally created connections for resource sharing with other components:
Accessing Connections
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\MessageConsumer;

// Setup publisher with brokers
$publisher = (new MessagePublisher())
    ->setRedis('localhost')
    ->setKafka('kafka:9092')
    ->setRabbitMq('rabbitmq');

// Check if connection exists
if ($publisher->hasConnection('redis')) {
    // Get the connection instance
    $redisConnection = $publisher->getConnection('redis');

    // Access underlying client for SharedResource
    $redisClient = $redisConnection->getClient();
}

// Same pattern for consumers
$consumer = (new MessageConsumer())
    ->setRedis('localhost')
    ->setKafka('kafka:9092', 'my-group');

if ($consumer->hasConnection('kafka')) {
    $kafkaConnection = $consumer->getConnection('kafka');
}
Integration with SharedResource (Foundation)
// In Foundation after kernel initialization
$messaging = $kernel->getMessage();
$publisher = $messaging->getPublisher();

// Share Redis connection with SharedResource
if ($publisher->hasConnection('redis')) {
    $redis = $publisher->getConnection('redis')->getClient();
    SharedResource::setRedisMessaging($redis);
}

// Share Kafka producer
if ($publisher->hasConnection('kafka')) {
    $producer = $publisher->getConnection('kafka')->getClient();
    SharedResource::setKafkaProducer($producer);
}

// Share RabbitMQ connection
if ($publisher->hasConnection('rabbitmq')) {
    $amqp = $publisher->getConnection('rabbitmq')->getConnection();
    SharedResource::setAmqp($amqp);
}
Available Methods
| Method | Description |
|---|---|
| hasConnection(string $type): bool | Check if connection exists for broker type |
| getConnection(string $type): ?ConnectionInterface | Get connection instance (null if not configured) |
Broker Types: 'redis', 'kafka', 'rabbitmq'
Connection Client Methods
| Connection Class | Method | Returns |
|---|---|---|
| RedisConnection | getClient() | Redis |
| KafkaConnection | getClient() | Producer |
| RabbitMqConnection | getConnection() | AMQPConnection |
Testing
Run All Tests
make phpunit
Run Only Unit Tests
make phpunit-unit
Run Integration Tests (requires Docker)
make start # Start Redis, Kafka, RabbitMQ (Wiremock removed)
make phpunit-integration
Code Quality
make phpstan            # Static analysis
make phpcs              # Coding standards
make phpunit-coverage   # Coverage report
Architecture
Clean, Interface-Driven Design
MessagingService (DDD/Application Layer)
├── MessagePublisher (Facade)
│   ├── Factory/
│   │   └── PublisherFactory
│   └── PublisherInterface (contract)
│       ├── RedisPublisher
│       ├── KafkaPublisher
│       ├── RabbitMqPublisher
│       └── InMemoryPublisher (testing)
│
└── MessageConsumer (Facade)
    ├── Factory/
    │   └── ConsumerFactory
    └── ConsumerInterface (contract)
        ├── RedisConsumer
        ├── KafkaConsumer
        ├── RabbitMqConsumer
        └── InMemoryConsumer (testing)
Architecture Highlights:
- Factory Pattern: Centralized factory classes for creating publisher/consumer instances (SRP compliance)
- Facade Pattern: MessagePublisher/MessageConsumer provide simple APIs
- Strategy Pattern: Pluggable publisher/consumer implementations
- Layered Fallback: Priority-based automatic failover
Dependency Injection Ready
// In your DI container - MessagingService with Lazy Loading (Recommended)
$container->singleton(MessagingService::class, function() {
    return new MessagingService(
        publisherFactory: fn() => (new MessagePublisher())
            ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
            ->setKafka(env('KAFKA_BROKERS'), priority: 1),
        consumerFactory: fn() => (new MessageConsumer())
            ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
            ->enableGracefulShutdown()
    );
});

// Or register separately
$container->singleton(MessagePublisher::class, function() {
    return (new MessagePublisher())
        ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'));
});

$container->singleton(MessageConsumer::class, function() {
    return (new MessageConsumer())
        ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT'))
        ->enableGracefulShutdown();
});
Classic API (For Advanced Users)
If you need explicit control over connections and publishers:
use JardisCore\Messaging\MessagePublisher;
use JardisCore\Messaging\Config\ConnectionConfig;
use JardisCore\Messaging\Connection\RedisConnection;
use JardisCore\Messaging\Publisher\RedisPublisher;

// Create connection configuration
$config = new ConnectionConfig(
    host: 'localhost',
    port: 6379,
    password: 'secret'
);

// Create connection
$connection = new RedisConnection($config);

// Create specific publisher
$redisPublisher = new RedisPublisher($connection, useStreams: true);

// Use with MessagePublisher
$publisher = new MessagePublisher($redisPublisher);
$publisher->publish('orders', ['order_id' => 123]);
This approach gives you full control but requires more setup. The fluent API is recommended for most use cases.
Use Cases
Perfect For:
- Event-Driven Architecture - Decouple your services
- Domain-Driven Design - Domain events and command handling
- Microservices Communication - Async messaging between services
- Job Queues - Background processing
- Real-time Updates - WebSocket backends, live dashboards
- CQRS - Command/Query separation with event sourcing
Production Scenarios:
- E-commerce order processing
- User notification systems
- Payment processing pipelines
- Log aggregation and monitoring
- IoT data ingestion
- Analytics event tracking
Documentation
Key Concepts
Auto-Serialization
- Arrays → Automatically encoded to JSON
- Objects → Must implement JsonSerializable
- Strings → Passed through as-is
Auto-Deserialization
- Valid JSON → Decoded to array
- Invalid JSON → Returned as string
- Can be disabled with autoDeserialize: false
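The serialization and deserialization rules can be expressed as two small functions. This is a sketch of the documented behavior using PHP's built-in JSON functions, not the library's own code: encodePayload() and decodePayload() are hypothetical helpers, and treating valid JSON scalars (such as 42) as raw strings is an assumption made here because only arrays are mentioned above.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: arrays and JsonSerializable objects become JSON, strings pass through. */
function encodePayload(array|string|JsonSerializable $message): string
{
    return is_string($message)
        ? $message
        : json_encode($message, JSON_THROW_ON_ERROR);
}

/** Hypothetical helper: JSON that decodes to an array is returned as an array, anything else as the raw string. */
function decodePayload(string $raw): array|string
{
    $decoded = json_decode($raw, true);

    return is_array($decoded) ? $decoded : $raw;
}

$wire = encodePayload(['order_id' => 123]);
echo $wire, PHP_EOL;                          // {"order_id":123}
echo decodePayload($wire)['order_id'], PHP_EOL; // 123
echo decodePayload('plain text'), PHP_EOL;    // plain text
```

Because a handler can receive either shape, it is worth checking is_array($message) before indexing into it, especially when producers outside your control write to the same topic.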
Connection Management
- Lazy connection - Connects on first use
- Auto-reconnect - Handles connection drops
- Graceful shutdown - Kafka flush on disconnect
Message Metadata
Each message handler receives metadata:
- Redis: id, stream, timestamp, type
- Kafka: partition, offset, timestamp, key, topic
- RabbitMQ: routing_key, delivery_tag, exchange, headers, etc.
- InMemory: topic, timestamp, type, index, plus custom metadata
Development Setup
git clone https://github.com/jardiscore/messaging.git
cd messaging
make install
make start
make phpunit
License
PolyForm Noncommercial License 1.0.0 - see LICENSE for details
- Free for noncommercial use (personal projects, education, research, open source)
- Free for nonprofits (charities, educational institutions, government)
- Commercial use requires a separate license - contact jardiscore@headgent.dev
Acknowledgments
Built with ❤️ by Jardis Core Development
Part of the JardisCore ecosystem for building robust, scalable PHP applications.
Support
- Issues: GitHub Issues
- Email: jardisCore@headgent.dev
- Documentation: [Coming Soon]
Ready to simplify your messaging infrastructure? composer require jardiscore/messaging