mahavirnahata / stream-bus
Redis-backed cross-language stream bus for Laravel.
Requires
- php: ^8.2
- illuminate/console: ^11.0|^12.0
- illuminate/contracts: ^11.0|^12.0
- illuminate/redis: ^11.0|^12.0
- illuminate/support: ^11.0|^12.0
- ramsey/uuid: ^4.7
Requires (Dev)
- phpunit/phpunit: ^10.5
README
A small Redis-backed stream bus for cross-language workers. Supports Redis Streams and Lists, configurable via config/stream-bus.php.
Requirements
- PHP 8.2+
- Laravel 11/12
- Redis server
Features
- Redis Streams and Lists drivers
- Consumer command with long-running loop
- Cross-language interoperability (Node, Go, Python)
- Optional dedupe for effectively-once processing
- Multiple consumers configured via config
Install (Packagist)
composer require mahavirnahata/stream-bus
Install (local development)
Add a path repository and require the package:
{
    "repositories": [
        { "type": "path", "url": "packages/stream-bus" }
    ],
    "require": {
        "mahavirnahata/stream-bus": "*"
    }
}
Publish config:
php artisan vendor:publish --tag=stream-bus-config
Quick start
use MahavirNahata\StreamBus\Facades\StreamBus;

StreamBus::publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);
Run a consumer from Laravel:
php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel
You can also configure consumers in config/stream-bus.php and run:
php artisan stream-bus:consume
End-to-end examples
Example 1: Laravel dispatches, Node.js listens
Laravel (dispatcher):
use MahavirNahata\StreamBus\Facades\StreamBus;

StreamBus::publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);
Node.js (listener): see examples/node-consumer.js reading from stream-bus:events:outbound.
Example 2: Node.js dispatches, Laravel listens
Node.js (dispatcher): see examples/node-producer.js writing to stream-bus:events:inbound.
Laravel (listener):
<?php

namespace App\Handlers;

use MahavirNahata\StreamBus\Contracts\StreamBusHandler;

class ImageResultHandler implements StreamBusHandler
{
    public function handle(array $message): void
    {
        // Handle inbound data from Node.js
        event(new \App\Events\ExternalResultReceived($message['payload'] ?? []));
    }
}
Run:
php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel
Or, with config:
php artisan stream-bus:consume
Configuration
return [
    'driver' => env('STREAM_BUS_DRIVER', 'streams'), // streams|lists
    'connection' => env('STREAM_BUS_REDIS', 'default'),
    'prefix' => env('STREAM_BUS_PREFIX', 'stream-bus:'),
    'delivery' => env('STREAM_BUS_DELIVERY', 'at-least-once'), // at-least-once|effectively-once
    'dedupe_ttl' => env('STREAM_BUS_DEDUPE_TTL', 86400),
    'consumers' => [
        'events:inbound' => App\Handlers\ImageResultHandler::class,
        'events:other' => [
            'handler' => App\Handlers\OtherHandler::class,
            'driver' => 'streams',
            'group' => 'other-group',
            'block' => 2000,
        ],
    ],
];
Config reference
- driver: streams or lists
- connection: Redis connection name
- prefix: Key prefix for all topics
- delivery: at-least-once or effectively-once
- dedupe_ttl: Dedupe TTL in seconds
- consumers: Map of topic => handler or topic => options
Shared Redis guidance
This package only reads from the configured stream/list key, not the whole Redis instance. If you share Redis with other apps, use a unique prefix or a separate Redis connection/db:
STREAM_BUS_PREFIX=app1:bus:
STREAM_BUS_REDIS=stream-bus
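External workers need the full Redis key, not just the topic name. Judging from the examples, which read stream-bus:events:outbound for the topic events:outbound under the default prefix, the key appears to be the prefix followed directly by the topic. A minimal Python sketch of that mapping; redis_key is a hypothetical helper for illustration, not part of the package:

```python
def redis_key(prefix: str, topic: str) -> str:
    """Join the configured prefix and a topic name into the full Redis key.

    Assumption: plain concatenation, as suggested by the README's example keys.
    """
    return f"{prefix}{topic}"


# Default prefix, as used by the examples in this README.
print(redis_key("stream-bus:", "events:outbound"))  # stream-bus:events:outbound

# Custom prefix from STREAM_BUS_PREFIX=app1:bus:
print(redis_key("app1:bus:", "events:outbound"))    # app1:bus:events:outbound
```

If you change the prefix on the Laravel side, workers in other languages have to read from and write to the correspondingly renamed keys.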
Publish messages
use MahavirNahata\StreamBus\StreamBus;

app(StreamBus::class)->publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);
Consume from Laravel
- Create a handler:
use MahavirNahata\StreamBus\Contracts\StreamBusHandler;

class ImageResultHandler implements StreamBusHandler
{
    public function handle(array $message): void
    {
        // handle response
    }
}
- Run the consumer:
php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel
Or, with config (single or multiple consumers):
php artisan stream-bus:consume
Consumer command options
- topic: Topic name (optional if configured)
- handler: Handler class (optional if configured)
- --driver: streams or lists
- --connection: Redis connection
- --prefix: Key prefix
- --group: Consumer group (streams)
- --consumer: Consumer name (streams, defaults to hostname)
- --count: Messages per read (streams)
- --block: Block time in ms (streams) or seconds (lists)
- --delivery: at-least-once or effectively-once
- --dedupe-ttl: Dedupe TTL in seconds
- --once: Read once and exit
- --sleep: Sleep time in ms when there are no messages
- --no-ack: Disable ACK (streams)
- --stop-on-error: Exit if the handler throws
Delivery semantics
- at-least-once: default
- effectively-once: best-effort dedupe using a Redis key per message ID
Override from CLI:
php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --delivery=effectively-once --dedupe-ttl=3600
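The "Redis key per message ID" idea can be illustrated without a Redis server. The sketch below is not the package's implementation, which this README does not show; it only demonstrates the general technique of claiming an ID for a TTL window, roughly what Redis SET with NX and EX provides. InMemoryDedupe and process are hypothetical names.

```python
import time


class InMemoryDedupe:
    """Stand-in for a Redis `SET key value NX EX ttl` claim (illustration only)."""

    def __init__(self):
        self._expires_at = {}  # message ID -> expiry timestamp in seconds

    def claim(self, message_id, ttl, now=None):
        """Return True if this ID has not been claimed within the TTL window."""
        now = time.time() if now is None else now
        expiry = self._expires_at.get(message_id)
        if expiry is not None and expiry > now:
            return False  # duplicate inside the TTL window
        self._expires_at[message_id] = now + ttl
        return True


def process(messages, handler, dedupe, ttl=86400, now=None):
    """Call handler once per claimed message ID; return how many were handled."""
    handled = 0
    for message_id, payload in messages:
        if dedupe.claim(message_id, ttl, now):
            handler(payload)
            handled += 1
    return handled


# The same ID delivered twice is handled only once.
seen = []
batch = [("1-0", {"id": 123}), ("1-0", {"id": 123}), ("2-0", {"id": 124})]
print(process(batch, seen.append, InMemoryDedupe(), ttl=3600, now=0))  # 2
```

A scheme like this is best-effort: a redelivery that arrives after the TTL has expired is processed again, which is one reason idempotent handlers are still worth having.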
Examples
See examples/ for producers and consumers in Node, Python, Go, and a Laravel handler example.
Round trip flow (outbound -> external worker -> inbound)
- Laravel publishes to events:outbound.
- External worker consumes and processes.
- External worker publishes results to events:inbound.
- Laravel consumes events:inbound and dispatches app jobs.
Consume from other languages
Examples are in examples/.
Go (Streams)
- Use XREADGROUP on stream stream-bus:events:outbound
- Consumer group: workers
- ACK with XACK
Node / Python (Streams)
- Use your Redis client to read from the stream with a consumer group
- Write responses to stream-bus:events:inbound
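A rough Python sketch of such a worker, using redis-py's xreadgroup, xadd, and xack. Several details are assumptions rather than facts from this README: the field layout (a single data field holding JSON), the consumer name worker-1, and the image.done result type are all hypothetical, and the group name workers is borrowed from the Go notes. Check examples/ for the layout the package actually uses.

```python
import json

STREAM_IN = "stream-bus:events:outbound"   # Laravel -> worker
STREAM_OUT = "stream-bus:events:inbound"   # worker -> Laravel
GROUP = "workers"
FIELD = "data"  # ASSUMPTION: one JSON-encoded field; verify against examples/


def ensure_group(client):
    """Create the consumer group (and the stream) if it does not exist yet."""
    try:
        client.xgroup_create(STREAM_IN, GROUP, id="0", mkstream=True)
    except Exception as exc:
        # Redis answers BUSYGROUP when the group already exists.
        if "BUSYGROUP" not in str(exc):
            raise


def run_once(client, consumer, handle, count=10, block_ms=2000):
    """Read one batch, publish each result, ACK it; return the number processed."""
    batches = client.xreadgroup(
        GROUP, consumer, {STREAM_IN: ">"}, count=count, block=block_ms
    )
    processed = 0
    for _stream, entries in batches or []:
        for entry_id, fields in entries:
            message = json.loads(fields[FIELD])
            result = handle(message)
            client.xadd(STREAM_OUT, {FIELD: json.dumps(result)})
            # ACK only after the result has been written (at-least-once).
            client.xack(STREAM_IN, GROUP, entry_id)
            processed += 1
    return processed


def main():
    import redis  # requires the redis-py package and a reachable Redis server

    client = redis.Redis(decode_responses=True)
    ensure_group(client)
    while True:
        run_once(
            client,
            "worker-1",  # hypothetical consumer name
            lambda m: {"type": "image.done", "payload": m.get("payload", {})},
        )
```

Calling main() starts the loop. Because the ACK happens after the result is published, a crash in between leads to redelivery rather than loss, which matches the at-least-once default described in this README.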
Delivery guarantees
- Streams and Lists are at-least-once by default.
- Effectively-once uses best-effort dedupe for a configurable TTL.
Troubleshooting
- Consumer exits immediately: ensure your handler class exists and implements StreamBusHandler, and consumers is not empty when using config.
- No messages received: verify the topic name and prefix, and for streams ensure the correct group/consumer is used.
- Duplicate processing: use delivery=effectively-once and a reasonable dedupe_ttl.
FAQ
Does it scan all Redis keys?
No. It only reads the configured topic key with your prefix.
Can I run multiple consumers?
Yes. Define multiple entries in consumers and run php artisan stream-bus:consume.
Is delivery exactly-once?
Not guaranteed. Use effectively-once with idempotent handlers.
License
MIT