jardiscore / messaging
A powerful, unified PHP messaging library for Redis, Kafka, and RabbitMQ with automatic serialization, consumer groups, and production-ready features
Installs: 38
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 0
Forks: 0
pkg:composer/jardiscore/messaging
Requires
- php: >=8.2
- ext-amqp: *
- ext-pcntl: *
- ext-rdkafka: *
- ext-redis: *
- jardiscore/dotenv: ^1.0.0
- jardispsr/messaging: ^1.0.0
Requires (Dev)
- phpstan/phpstan: ^2.0.4
- phpunit/phpunit: ^10.5
- squizlabs/php_codesniffer: ^3.11.2
README
A powerful, unified PHP messaging library that makes working with Redis, Kafka, and RabbitMQ effortless.
โจ Why Choose Jardis Async-Messaging?
๐ฏ One API, Three Brokers
Switch between Redis Streams, Apache Kafka, and RabbitMQ with zero code changes. Your business logic stays clean while we handle the complexity.
๐ช Built for Modern PHP
- PHP 8.2+ with full type safety and strict types
- Named arguments for crystal-clear configuration
- Dependency injection ready - perfect for frameworks
๐ก๏ธ Production-Ready Features
- โ Layered architecture - automatic fallback & broadcast
- โ Fluent API - 2-line setup, zero boilerplate
- โ Automatic JSON serialization/deserialization
- โ Connection pooling and graceful reconnection
- โ Consumer groups (Redis & Kafka)
- โ Message acknowledgement (RabbitMQ & Kafka)
- โ Metadata support for tracing and debugging
- โ Lazy connections - connect only when needed
๐จ Developer Experience First
- Intuitive, fluent API - publish in 2 lines
- Comprehensive validation - catch errors before they hit production
- Detailed exceptions with context
- Full PHPStan Level 8 compatible
๐ฌ Battle-Tested
- 320 tests with 84.35% coverage
- Extensive unit and integration test suite
- CI/CD ready with Docker Compose
- Used in production DDD/Event-Driven systems
๐ฆ Installation
composer require jardiscore/messaging
Requirements:
- PHP 8.2 or higher
- Choose your broker extension(s):
ext-redisfor Redis Streams/Pub-Subext-rdkafkafor Apache Kafkaext-amqpfor RabbitMQ
๐ Quick Start
โก NEW: Fluent API (Easiest Way!)
use JardisCore\Messaging\MessagePublisher; use JardisCore\Messaging\MessageConsumer; use JardisCore\Messaging\MessagingService; // Publishing - Just 2 lines! $publisher = (new MessagePublisher())->setRedis('localhost'); $publisher->publish('orders', ['order_id' => 123, 'total' => 99.99]); // Consuming - Just as easy! $consumer = (new MessageConsumer()) ->setRedis('localhost') ->enableGracefulShutdown(); // Auto-stop on SIGTERM/SIGINT $consumer->consume('orders', $handler); // Or use the unified MessagingService wrapper $messaging = new MessagingService($publisher, $consumer); $messaging->publish('orders', ['order_id' => 123]); $messaging->consume('orders', $handler);
That's it! No configuration objects, no connection boilerplate. Just fluent, readable code.
๐ฅ Advanced: Layered Messaging (High Availability)
Stack multiple brokers for automatic fallback:
// Try Redis first, fallback to Kafka if Redis fails $publisher = (new MessagePublisher()) ->setRedis('localhost', priority: 0) // Fastest - try first ->setKafka('kafka:9092', priority: 1); // Fallback $publisher->publish('orders', ['order_id' => 123]); // โ Publishes to Redis, OR Kafka if Redis is down!
Or broadcast to ALL brokers:
$publisher = (new MessagePublisher()) ->setRedis('redis-cache') // Hot cache ->setKafka('kafka:9092') // Event log ->setRabbitMq('rabbitmq'); // Task queue $publisher->publishToAll('critical-event', ['status' => 'down']); // โ Published to Redis, Kafka, AND RabbitMQ simultaneously!
๐ All Brokers with Fluent API
Kafka
// Publish $publisher = (new MessagePublisher())->setKafka('kafka:9092'); $publisher->publish('user-events', [ 'event' => 'user.registered', 'user_id' => 'abc123' ]); // Consume $consumer = (new MessageConsumer()) ->setKafka('kafka:9092', groupId: 'order-processors') ->autoDeserialize(); $consumer->consume('orders', $handler, ['timeout' => 1000]);
RabbitMQ
// Publish $publisher = (new MessagePublisher()) ->setRabbitMq('rabbitmq', username: 'admin', password: 'secret'); $publisher->publish('notifications', [ 'type' => 'email', 'to' => 'user@example.com' ]); // Consume $consumer = (new MessageConsumer()) ->setRabbitMq('rabbitmq', queueName: 'notifications', username: 'admin', password: 'secret'); $handler = new CallbackHandler(function ($msg, $meta): bool { sendEmail($msg); return true; // ACK }); $consumer->consume('email.*', $handler);
With Options
// Redis with Pub/Sub (not Streams) $publisher = (new MessagePublisher()) ->setRedis('localhost', options: ['useStreams' => false]); // Kafka with SASL auth $publisher = (new MessagePublisher()) ->setKafka('kafka:9092', username: 'user', password: 'pass'); // RabbitMQ with custom port $publisher = (new MessagePublisher()) ->setRabbitMq('localhost', port: 5673);
๐จ Advanced Features
๐ก๏ธ Graceful Shutdown (NEW!)
Enable automatic cleanup when your consumer receives termination signals:
$consumer = (new MessageConsumer()) ->setRedis('localhost') ->enableGracefulShutdown(); // Registers SIGTERM & SIGINT handlers $consumer->consume('orders', $handler); // When you press Ctrl+C or send SIGTERM, consumer stops gracefully
Features:
- Automatically stops on
SIGTERM(Docker/Kubernetes stop) - Automatically stops on
SIGINT(Ctrl+C) - Works on Unix/Linux systems with
pcntlextension - No manual signal handling needed
๐ MessagingService Wrapper with Lazy Loading (NEW!)
Unified interface for DDD/Application Services with lazy instantiation:
use JardisCore\Messaging\MessagingService; // Setup with lazy loading - Publisher/Consumer created only when needed! $messaging = new MessagingService( publisherFactory: fn() => (new MessagePublisher())->setRedis('localhost'), consumerFactory: fn() => (new MessageConsumer())->setRedis('localhost')->enableGracefulShutdown() ); // Publisher is NOT created yet - no Redis connection established! // Use in your Domain/Application layer class OrderService { public function __construct(private MessagingService $messaging) {} public function placeOrder(Order $order): void { // ... business logic // Publisher is created HERE on first publish() call $this->messaging->publish('orders.placed', [ 'order_id' => $order->id, 'total' => $order->total ]); } public function startOrderProcessor(): void { // Consumer is created HERE on first consume() call $this->messaging->consume('orders.placed', new OrderHandler()); } } // Perfect for DDD contexts like $domain->getMessage()
Benefits:
- โ Zero overhead if service is injected but never used
- โ No connections established until actually needed
- โ Memory efficient - only instantiate what you use
- โ Perfect for DI containers - inject everywhere, pay only when used
๐๏ธ Layered Architecture
Build resilient, high-availability messaging systems by stacking multiple brokers:
Use Case 1: Automatic Fallback
// Production setup: Primary + Backup $publisher = (new MessagePublisher()) ->setRedis('redis-primary', priority: 0) // Try first ->setRedis('redis-replica', priority: 1) // Fallback #1 ->setKafka('kafka:9092', priority: 2); // Fallback #2 $publisher->publish('orders', $order); // Automatically tries redis-primary โ redis-replica โ kafka
Use Case 2: Broadcast Critical Events
// Send to ALL systems simultaneously $publisher = (new MessagePublisher()) ->setRedis('cache') // For real-time processing ->setKafka('kafka:9092') // For audit log ->setRabbitMq('rabbitmq'); // For async workers $results = $publisher->publishToAll('system.alert', [ 'severity' => 'critical', 'message' => 'Database connection lost' ]); // Returns: ['redis' => true, 'kafka' => true, 'rabbitmq' => true]
Use Case 3: Performance Tiering
// Hot path + Cold storage $publisher = (new MessagePublisher()) ->setRedis('localhost', priority: 0) // Fast cache (ms) ->setKafka('kafka:9092', priority: 1); // Persistent (s) $publisher->publish('analytics', $event); // Writes to Redis for instant queries, // Falls back to Kafka if Redis is saturated
Use Case 4: Consumer Fallback
// Try Redis first, fallback to Kafka $consumer = (new MessageConsumer()) ->setRedis('localhost', priority: 0) ->setKafka('kafka:9092', 'backup-group', priority: 1); $consumer->consume('orders', $handler); // Reads from Redis, switches to Kafka if Redis fails
Priority System:
- Lower priority number = tried first
- Default: Redis (0) โ Kafka (1) โ RabbitMQ (2)
- Customize with
priority:parameter
Custom Message Handlers
use JardisCore\Messaging\contract\MessageHandlerInterface; class OrderHandler implements MessageHandlerInterface { public function __construct( private OrderService $orderService, private Logger $logger ) {} public function handle(string|array $message, array $metadata): bool { $this->logger->info('Processing order', [ 'order_id' => $message['order_id'], 'stream_id' => $metadata['id'] ]); $this->orderService->process($message); return true; } } $consumer->consume('orders', new OrderHandler($orderService, $logger));
Redis Consumer Groups (Horizontal Scaling)
$consumer = (new MessageConsumer())->setRedis('localhost'); // Multiple consumers with same group = work distribution $consumer->consume('orders', $handler, [ 'group' => 'order-workers', 'consumer' => gethostname(), // Unique consumer name 'count' => 10, // Batch size 'block' => 2000 ]);
Kafka with Custom Configuration
$consumer = (new MessageConsumer())->setKafka( brokers: 'kafka:9092', groupId: 'my-group', options: [ 'session.timeout.ms' => 6000, 'max.poll.interval.ms' => 300000, 'enable.auto.offset.store' => 'false' ] );
RabbitMQ Queue Options
$consumer = (new MessageConsumer())->setRabbitMq( host: 'localhost', queueName: 'high-priority-orders', options: [ 'flags' => AMQP_DURABLE, 'arguments' => [ 'x-message-ttl' => 60000, // 60 seconds TTL 'x-max-priority' => 10, // Priority queue 'x-max-length' => 10000 // Max queue size ] ] ); $consumer->consume('orders.*', $handler, [ 'prefetch_count' => 5 // QoS - process 5 at a time ]);
Publisher Options
// Redis with custom fields $publisher->publish('orders', ['order_id' => 123], [ 'fields' => [ 'priority' => 'high', 'region' => 'EU' ] ]); // Kafka with partition key $publisher->publish('orders', $order, [ 'key' => $order['customer_id'], // Same customer = same partition 'partition' => 2 // Or explicit partition ]); // RabbitMQ with message attributes $publisher->publish('notifications', $notification, [ 'attributes' => [ 'priority' => 9, 'delivery_mode' => 2, // Persistent 'expiration' => '60000' // 60s expiration ] ]);
๐ External Connection Support
If your application already has established connections (e.g., in a legacy system), you can wrap them for reuse instead of creating new connections. This is perfect for integrating the messaging library into existing applications without duplicating connection resources.
Why Use External Connections?
- Resource Efficiency: Reuse existing connections instead of creating new ones
- Legacy Integration: Seamlessly integrate with existing infrastructure
- Connection Management: Let your existing system manage connection lifecycle
- Zero Migration Cost: Add messaging capabilities without refactoring existing code
Redis
Wrap an existing Redis connection:
use JardisCore\Messaging\Connection\ExternalRedisConnection; use JardisCore\Messaging\Publisher\RedisPublisher; // Your existing Redis connection $legacyRedis = new Redis(); $legacyRedis->connect('localhost', 6379); // Wrap it for reuse $connection = new ExternalRedisConnection($legacyRedis); $publisher = new RedisPublisher($connection); // Use it normally $publisher->publish('orders', json_encode(['order_id' => 123])); // Connection lifecycle is managed by your legacy system
Lifecycle Management:
// Default: Don't close external connection on disconnect $connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: false); // Or: Allow messaging library to close it $connection = new ExternalRedisConnection($legacyRedis, manageLifecycle: true);
Kafka
Wrap an existing Kafka producer or consumer:
use JardisCore\Messaging\Connection\ExternalKafkaConnection; use JardisCore\Messaging\Publisher\KafkaPublisher; use JardisCore\Messaging\Consumer\KafkaConsumer; // For Publishing - wrap existing producer $legacyProducer = new \RdKafka\Producer(); $legacyProducer->addBrokers('kafka1:9092'); $connection = new ExternalKafkaConnection($legacyProducer); $publisher = new KafkaPublisher($connection); $publisher->publish('user-events', json_encode(['event' => 'user.registered'])); // For Consuming - wrap existing consumer $conf = new \RdKafka\Conf(); $conf->set('group.id', 'my-group'); $conf->set('metadata.broker.list', 'kafka1:9092'); $legacyConsumer = new \RdKafka\KafkaConsumer($conf); $connection = new ExternalKafkaConnection($legacyConsumer); $consumer = new KafkaConsumer($connection, 'my-group'); $consumer->consume('user-events', $handler);
Flush on Disconnect (for Producers):
// Default: Don't flush on disconnect $connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: false); // Or: Flush producer when disconnecting $connection = new ExternalKafkaConnection($legacyProducer, flushOnDisconnect: true);
Important: ExternalKafkaConnection supports both \RdKafka\Producer and \RdKafka\KafkaConsumer instances. Make sure to pass the correct type for your use case.
RabbitMQ
Wrap an existing AMQP connection:
use JardisCore\Messaging\Connection\ExternalRabbitMqConnection; use JardisCore\Messaging\Publisher\RabbitMqPublisher; // Your existing RabbitMQ connection $legacyConnection = new \AMQPConnection([ 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', ]); $legacyConnection->connect(); // Wrap it for reuse $connection = new ExternalRabbitMqConnection($legacyConnection); $publisher = new RabbitMqPublisher($connection); // Use it normally $publisher->publish('notifications', json_encode(['type' => 'email']));
Custom Exchange Configuration:
$connection = new ExternalRabbitMqConnection( connection: $legacyConnection, exchangeName: 'custom-exchange', exchangeType: AMQP_EX_TYPE_TOPIC, manageLifecycle: false );
Adding Pre-Configured Publishers/Consumers
Use addPublisher() and addConsumer() methods to add already instantiated broker instances with external connections:
use JardisCore\Messaging\MessagePublisher; use JardisCore\Messaging\MessageConsumer; use JardisCore\Messaging\Connection\ExternalRedisConnection; use JardisCore\Messaging\Connection\ExternalKafkaConnection; use JardisCore\Messaging\Publisher\RedisPublisher; use JardisCore\Messaging\Consumer\KafkaConsumer; // Legacy app setup $legacyRedis = new Redis(); $legacyRedis->connect('localhost', 6379); // Create external connection and publisher $redisConnection = new ExternalRedisConnection($legacyRedis); $redisPublisher = new RedisPublisher($redisConnection, useStreams: true); // Add to MessagePublisher with custom type and priority $messagePublisher = (new MessagePublisher()) ->addPublisher($redisPublisher, 'redis-external', priority: 0) ->setKafka('kafka:9092', priority: 1); // Mix with new connections $messagePublisher->publish('events', ['type' => 'order.created']); // Same pattern for consumers $legacyKafkaConsumer = new \RdKafka\KafkaConsumer($conf); $kafkaConnection = new ExternalKafkaConnection($legacyKafkaConsumer); $kafkaConsumer = new KafkaConsumer($kafkaConnection, 'my-group'); $messageConsumer = (new MessageConsumer()) ->addConsumer($kafkaConsumer, 'kafka-external', priority: 0); $messageConsumer->consume('orders', $handler);
Use Cases:
- Legacy Integration: Reuse existing connections from legacy systems
- Custom Configuration: Add publishers/consumers with advanced setup
- Mixed Environments: Combine external and new connections with priority control
- Testing: Inject mock publishers/consumers for unit tests
Use with Fluent API (Legacy Pattern)
External connections work seamlessly with publishers created via factories:
// Legacy app setup $redis = new Redis(); $redis->connect('localhost', 6379); // Create publisher with external connection $externalConnection = new ExternalRedisConnection($redis); $publisher = new RedisPublisher($externalConnection, useStreams: true); // Now wrap it in MessagePublisher if needed $messagePublisher = new MessagePublisher($publisher); $messagePublisher->publish('events', ['type' => 'order.created']);
Best Practices
- Lifecycle Management: Use
manageLifecycle: false(default) when external system owns the connection - Health Checks: External connections perform health checks (e.g., Redis ping)
- Error Handling: If external connection dies, appropriate exceptions are thrown
- Thread Safety: Ensure your external connection is thread-safe if used concurrently
๐งช Testing
Run All Tests
make phpunit
Run Only Unit Tests
make phpunit-unit
Run Integration Tests (requires Docker)
make start # Start Redis, Kafka, RabbitMQ (Wiremock removed)
make phpunit-integration
Code Quality
make phpstan # Static analysis make phpcs # Coding standards make phpunit-coverage # Coverage report
๐๏ธ Architecture
Clean, Interface-Driven Design
MessagingService (DDD/Application Layer)
โโโ MessagePublisher (Facade)
โ โโโ Factory/
โ โ โโโ PublisherFactory
โ โโโ PublisherInterface (contract)
โ โโโ RedisPublisher
โ โโโ KafkaPublisher
โ โโโ RabbitMqPublisher
โ
โโโ MessageConsumer (Facade)
โโโ Factory/
โ โโโ ConsumerFactory
โโโ ConsumerInterface (contract)
โโโ RedisConsumer
โโโ KafkaConsumer
โโโ RabbitMqConsumer
Architecture Highlights:
- Factory Pattern: Centralized factory classes for creating publisher/consumer instances (SRP compliance)
- Facade Pattern: MessagePublisher/MessageConsumer provide simple APIs
- Strategy Pattern: Pluggable publisher/consumer implementations
- Layered Fallback: Priority-based automatic failover
Dependency Injection Ready
// In your DI container - MessagingService with Lazy Loading (Recommended) $container->singleton(MessagingService::class, function() { return new MessagingService( publisherFactory: fn() => (new MessagePublisher()) ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT')) ->setKafka(env('KAFKA_BROKERS'), priority: 1), consumerFactory: fn() => (new MessageConsumer()) ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT')) ->enableGracefulShutdown() ); }); // Or register separately $container->singleton(MessagePublisher::class, function() { return (new MessagePublisher()) ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT')); }); $container->singleton(MessageConsumer::class, function() { return (new MessageConsumer()) ->setRedis(env('REDIS_HOST'), (int) env('REDIS_PORT')) ->enableGracefulShutdown(); });
Classic API (For Advanced Users)
If you need explicit control over connections and publishers:
use JardisCore\Messaging\Config\ConnectionConfig; use JardisCore\Messaging\Connection\RedisConnection; use JardisCore\Messaging\Publisher\RedisPublisher; // Create connection configuration $config = new ConnectionConfig( host: 'localhost', port: 6379, password: 'secret' ); // Create connection $connection = new RedisConnection($config); // Create specific publisher $redisPublisher = new RedisPublisher($connection, useStreams: true); // Use with MessagePublisher $publisher = new MessagePublisher($redisPublisher); $publisher->publish('orders', ['order_id' => 123]);
This approach gives you full control but requires more setup. The fluent API is recommended for most use cases.
๐ฏ Use Cases
โ Perfect For:
- Event-Driven Architecture - Decouple your services
- Domain-Driven Design - Domain events and command handling
- Microservices Communication - Async messaging between services
- Job Queues - Background processing
- Real-time Updates - WebSocket backends, live dashboards
- CQRS - Command/Query separation with event sourcing
๐ข Production Scenarios:
- E-commerce order processing
- User notification systems
- Payment processing pipelines
- Log aggregation and monitoring
- IoT data ingestion
- Analytics event tracking
๐ Documentation
Key Concepts
Auto-Serialization
- Arrays โ Automatically encoded to JSON
- Objects โ Must implement
JsonSerializable - Strings โ Passed through as-is
Auto-Deserialization
- Valid JSON โ Decoded to array
- Invalid JSON โ Returned as string
- Can be disabled with
autoDeserialize: false
Connection Management
- Lazy connection - Connects on first use
- Auto-reconnect - Handles connection drops
- Graceful shutdown - Kafka flush on disconnect
Message Metadata
Each message handler receives metadata:
- Redis:
id,stream,timestamp,type - Kafka:
partition,offset,timestamp,key,topic - RabbitMQ:
routing_key,delivery_tag,exchange,headers, etc.
Development Setup
git clone https://github.com/jardiscore/messaging.git
cd messaging
make install
make start
make phpunit
๐ License
PolyForm Noncommercial License 1.0.0 - see LICENSE for details
- โ Free for noncommercial use (personal projects, education, research, open source)
- โ Free for nonprofits (charities, educational institutions, government)
- โ ๏ธ Commercial use requires a separate license - contact jardiscore@headgent.dev
๐ Acknowledgments
Built with โค๏ธ by Jardis Core Development
Part of the JardisCore ecosystem for building robust, scalable PHP applications.
๐ Support
- Issues: GitHub Issues
- Email: jardisCore@headgent.dev
- Documentation: [Coming Soon]
Ready to simplify your messaging infrastructure? composer require jardiscore/messaging ๐