andreagroferreira / redis-stream
A powerful, reliable Redis Streams implementation for Laravel applications
Requires
- php: ^8.1|^8.2|^8.3|^8.4
- illuminate/console: ^9.0|^10.0|^12.0
- illuminate/filesystem: ^9.0|^10.0|^12.0
- illuminate/support: ^9.0|^10.0|^12.0
Requires (Dev)
- mockery/mockery: ^1.5
- orchestra/testbench: ^7.0|^8.0|^9.0
- pestphp/pest: ^1.22|^2.0
- pestphp/pest-plugin-laravel: ^1.4|^2.0
Suggests
- ext-redis: Required for phpredis driver (recommended for production)
- predis/predis: Required for predis driver (^1.1|^2.0)
This package is auto-updated.
Last update: 2025-03-31 19:49:52 UTC
README
A powerful, reliable Redis Streams implementation for Laravel applications. This package makes it easy to work with Redis Streams for event-driven applications, message processing, and real-time data pipelines.
Features
- Proper exception handling with typed exceptions
- Simple producer/consumer API for Redis Streams
- Compatible with both phpredis and predis drivers
- Automatic consumer group management
- Retry handling for failed messages
- Batch publishing support for high-throughput applications
- Graceful shutdown support
- Configurable stream trimming (MAXLEN)
- Built-in Artisan command for consuming streams
Installation
You can install the package via composer:
composer require andreagroferreira/redis-stream
Redis Driver Requirements
This package supports both the phpredis driver (the PHP extension) and the predis driver (a PHP library):
For PHP Redis Extension (recommended for production):
# Install the PHP Redis Extension
pecl install redis
# Add "extension=redis.so" to your php.ini
For Predis Library:
# Install Predis library
composer require predis/predis
The package will automatically detect which driver you're using.
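As a rough illustration of what driver detection can look like, the sketch below checks for the phpredis extension first and then for the Predis client class. The function name `detectRedisDriver` is hypothetical, and the package's own detection logic may work differently (for example, by reading Laravel's Redis client setting).

```php
<?php

declare(strict_types=1);

/**
 * Guess which Redis client is usable in the current PHP process.
 * Hypothetical helper for illustration; not the package's actual logic.
 */
function detectRedisDriver(): string
{
    // The phpredis extension registers itself under the name "redis".
    if (extension_loaded('redis')) {
        return 'phpredis';
    }

    // Predis is a plain Composer library, so look for its client class.
    if (class_exists('Predis\\Client')) {
        return 'predis';
    }

    // Neither driver is available in this process.
    return 'none';
}

echo detectRedisDriver(), PHP_EOL;
```

If this reports `none`, install one of the two drivers listed above before using the package.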
After installation, publish the configuration file:
php artisan vendor:publish --provider="WizardingCode\RedisStream\RedisStreamServiceProvider"
Configuration
Configure your Redis connection for streams in your .env file:
REDIS_STREAM_CONNECTION=streams
REDIS_STREAM_NAME=default_stream
REDIS_STREAM_CONSUMER_GROUP=default_group
REDIS_STREAM_CONSUMER_NAME=default_consumer
REDIS_STREAM_MAX_LENGTH=1000000
REDIS_STREAM_POLL_INTERVAL=1
REDIS_STREAM_RETRY_LIMIT=3
REDIS_STREAM_BATCH_SIZE=10
You can also define custom streams in the redis_stream.php config file.
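The value `REDIS_STREAM_CONNECTION=streams` suggests a Redis connection named `streams` in Laravel's `config/database.php`. The excerpt below is a minimal sketch under that assumption; it reuses Laravel's standard `REDIS_*` variables, and you may prefer a dedicated host or database number for stream traffic.

```php
<?php

// config/database.php (excerpt). Only the "streams" entry is new here.
return [
    'redis' => [
        'client' => env('REDIS_CLIENT', 'phpredis'),

        // Assumption: this connection name matches REDIS_STREAM_CONNECTION=streams.
        'streams' => [
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD'),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_DB', '0'),
        ],
    ],
];
```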
Usage
Exception Handling
This package provides specific exception types for better error handling:
- RedisStreamException: Base exception class for all Redis Stream errors
- ConnectionException: Thrown when the Redis connection fails
- PublishException: Thrown when message publishing fails
- ConsumeException: Thrown when message consumption fails
- MessageProcessingException: Thrown when message processing fails
Example of handling exceptions:
use Illuminate\Support\Facades\Log;
use WizardingCode\RedisStream\Exceptions\ConnectionException;
use WizardingCode\RedisStream\Exceptions\PublishException;

try {
    $producer->publish('user.created', $userData);
} catch (ConnectionException $e) {
    // Handle connection issues
    Log::error("Redis connection error: " . $e->getMessage());
    // Maybe retry or queue for later
} catch (PublishException $e) {
    // Handle publishing issues
    Log::error("Failed to publish message: " . $e->getMessage());
} catch (\Exception $e) {
    // Handle other errors (leading backslash so this also works inside a namespace)
    Log::error("Unexpected error: " . $e->getMessage());
}
Basic Example
<?php

// Publishing messages
$producer = app(WizardingCode\RedisStream\RedisStreamProducer::class);

$messageId = $producer->publish('user.created', [
    'user_id' => 1234,
    'name' => 'John Doe',
    'email' => 'john@example.com'
]);

// Consuming messages (in a console command or job)
$consumer = app(WizardingCode\RedisStream\RedisStreamConsumer::class);

$consumer->consume(function($data, $messageId) {
    $event = $data['event'];
    $payload = $data['payload'];

    // Process the message
    match($event) {
        'user.created' => $this->processUserCreation($payload),
        'user.updated' => $this->processUserUpdate($payload),
        default => $this->processUnknownEvent($event, $payload)
    };

    // Message is auto-acknowledged if no exception is thrown
});
Using the Artisan Command
# Basic usage with default settings
php artisan redis-stream:consume

# Advanced usage with all options
php artisan redis-stream:consume \
    --stream=my_stream \
    --group=my_group \
    --consumer=consumer1 \
    --handler="App\\Handlers\\MyStreamHandler" \
    --interval=5 \
    --batch=50 \
    --retries=5
Creating a Custom Handler
<?php

namespace App\Handlers;

class MyStreamHandler
{
    public function handle(array $data, string $messageId): void
    {
        $event = $data['event'];
        $payload = $data['payload'];

        // Your custom handling logic
        logger()->info("Processing event: {$event}");

        // Process based on event type
        match($event) {
            'order.created' => $this->processOrder($payload),
            'payment.completed' => $this->processPayment($payload),
            default => $this->handleUnknown($event, $payload)
        };
    }

    protected function processOrder(array $data): void
    {
        // Process order logic
    }

    protected function processPayment(array $data): void
    {
        // Process payment logic
    }

    protected function handleUnknown(string $event, array $data): void
    {
        logger()->warning("Unknown event type: {$event}");
    }
}
Batch Publishing
For high-throughput scenarios, you can publish messages in batches:
$producer = app(WizardingCode\RedisStream\RedisStreamProducer::class);

$messages = [
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 1, 'name' => 'User 1'],
    ],
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 2, 'name' => 'User 2'],
    ],
    [
        'event' => 'user.created',
        'payload' => ['user_id' => 3, 'name' => 'User 3'],
    ],
];

$messageIds = $producer->publishBatch($messages);
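When the message list is very large, it can help to split it into fixed-size chunks before handing each chunk to `publishBatch`. The sketch below shows one way to do that. `publishInChunks` is a hypothetical helper, not part of the package, and it assumes `publishBatch` returns an array of message IDs (the example above only shows the return value being stored in `$messageIds`). A stand-in closure replaces the real producer so the snippet runs without Redis.

```php
<?php

declare(strict_types=1);

/**
 * Publish a large list of messages in fixed-size chunks.
 * Hypothetical helper; $publishBatch would typically be
 * fn (array $chunk) => $producer->publishBatch($chunk).
 *
 * @param array<int, array{event: string, payload: array}> $messages
 * @return array All IDs returned by the callable, in publish order.
 */
function publishInChunks(callable $publishBatch, array $messages, int $chunkSize = 100): array
{
    if ($chunkSize < 1) {
        throw new InvalidArgumentException('Chunk size must be at least 1.');
    }

    $ids = [];
    foreach (array_chunk($messages, $chunkSize) as $chunk) {
        // Append this chunk's IDs after those of earlier chunks.
        $ids = array_merge($ids, $publishBatch($chunk));
    }

    return $ids;
}

// Stand-in for $producer->publishBatch(): returns one fake ID per message.
$fakePublish = fn (array $chunk): array => array_map(
    fn (array $message): string => 'id-' . $message['payload']['user_id'],
    $chunk
);

$messages = [];
for ($i = 1; $i <= 5; $i++) {
    $messages[] = ['event' => 'user.created', 'payload' => ['user_id' => $i]];
}

// Five messages with a chunk size of 2 produce three calls to the callable.
print_r(publishInChunks($fakePublish, $messages, 2));
```

In an application, the chunk size would usually be tuned alongside REDIS_STREAM_BATCH_SIZE.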
Stream Trimming
To manage stream size:
$producer = app(WizardingCode\RedisStream\RedisStreamProducer::class);

// Trim to approximately 10,000 items (fast)
$deleted = $producer->trim(10000);

// Trim to exactly 10,000 items (slower)
$deleted = $producer->trim(10000, true);
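The two modes most likely correspond to the two forms of the Redis `XTRIM` command: `MAXLEN ~ N` (approximate) and `MAXLEN N` (exact). The sketch below builds both argument lists to make the difference visible. `buildXtrimCommand` is a hypothetical helper for illustration; how the package actually issues the command is not shown in this README.

```php
<?php

declare(strict_types=1);

/**
 * Build the argument list for a Redis XTRIM call.
 * Hypothetical helper for illustration; not part of the package.
 */
function buildXtrimCommand(string $stream, int $maxLen, bool $exact = false): array
{
    $command = ['XTRIM', $stream, 'MAXLEN'];

    if (!$exact) {
        // "~" lets Redis drop only whole internal nodes, which is cheaper
        // but may leave slightly more than $maxLen entries in the stream.
        $command[] = '~';
    }

    $command[] = (string) $maxLen;

    return $command;
}

echo implode(' ', buildXtrimCommand('default_stream', 10000)), PHP_EOL;
// prints "XTRIM default_stream MAXLEN ~ 10000"

echo implode(' ', buildXtrimCommand('default_stream', 10000, true)), PHP_EOL;
// prints "XTRIM default_stream MAXLEN 10000"
```

Approximate trimming is usually the better default for busy streams, since the exact form forces Redis to do extra work for every trim.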
Creating Custom Stream Producers
You can access named stream producers defined in your config:
// Get a specific producer for a named stream
$ordersProducer = app('redis_stream.producer.stream_orders');

$ordersProducer->publish('order.created', ['order_id' => 12345]);
Generating Custom Producers and Consumers
This package provides Artisan commands to quickly scaffold custom producer and consumer classes:
Creating a Producer
php artisan redis-stream:make-producer OrderProducer --stream=orders
This will generate an OrderProducer class that extends the base functionality of RedisStreamProducer.
Creating a Consumer
php artisan redis-stream:make-consumer OrderConsumer --stream=orders --group=orders_processing --command
The --command flag will also generate a dedicated Artisan command to run this consumer:
php artisan redis-stream:orders
Consumer Implementation Example
After generating a consumer, you can customize the handleMessage method to process specific event types:
protected function handleMessage(array $data, string $messageId): void
{
    $event = $data['event'] ?? 'unknown';
    $payload = $data['payload'] ?? [];

    Log::info("Processing {$event} message {$messageId}");

    try {
        // Handle different event types
        match ($event) {
            'order.created' => $this->processNewOrder($payload),
            'order.updated' => $this->processOrderUpdate($payload),
            'order.cancelled' => $this->processOrderCancellation($payload),
            default => $this->handleUnknownEvent($event, $payload, $messageId),
        };
    } catch (\Exception $e) {
        Log::error("Failed to process {$event} message {$messageId}: " . $e->getMessage());
        throw $e; // Rethrow to let the consumer handle retries
    }
}

private function processNewOrder(array $orderData): void
{
    // Process new order logic
}
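Rethrowing matters because the consumer can only retry a message if it sees the failure. The sketch below illustrates the general idea of a bounded retry loop around a handler. It is not the package's implementation: `processWithRetries` is a hypothetical function, and whether REDIS_STREAM_RETRY_LIMIT counts total attempts or only the retries after the first attempt is an assumption here (total attempts).

```php
<?php

declare(strict_types=1);

/**
 * Run a handler for one message, allowing up to $retryLimit attempts.
 * Returns the number of attempts used; rethrows the last error if all fail.
 * Hypothetical sketch; the package's real retry logic may differ.
 */
function processWithRetries(callable $handler, array $data, string $messageId, int $retryLimit = 3): int
{
    $attempt = 0;

    while (true) {
        $attempt++;

        try {
            $handler($data, $messageId);

            return $attempt; // Success: the message could now be acknowledged.
        } catch (Throwable $e) {
            if ($attempt >= $retryLimit) {
                throw $e; // Out of attempts: surface the failure to the caller.
            }
        }
    }
}

// A handler that fails twice and succeeds on the third call.
$calls = 0;
$flaky = function (array $data, string $messageId) use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException("Temporary failure on {$messageId}");
    }
};

echo processWithRetries($flaky, ['event' => 'order.created', 'payload' => []], '1-0'), PHP_EOL;
// prints "3"
```

A handler that swallows its own exceptions would always look successful to a loop like this, which is why the example above logs and then rethrows.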
Testing
This package uses Pest PHP for testing:
composer test
For code coverage:
composer test-coverage
Note: To run the full test suite including feature tests, you need to have Redis installed and configured locally. The feature tests interact with an actual Redis server to verify the Redis Stream functionality.
The tests include both unit tests for exceptions and feature tests for Redis Stream interactions. In our CI setup, we only verify basic functionality to avoid Redis configuration issues in automated environments.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.