mahavirnahata/stream-bus

Redis-backed cross-language stream bus for Laravel.

Package info

github.com/mahavirnahata/stream-bus

pkg:composer/mahavirnahata/stream-bus

0.1.0 2026-02-10 16:01 UTC

Last update: 2026-04-10 16:26:01 UTC


README

A small Redis-backed stream bus for cross-language workers. Supports Redis Streams and Lists, configurable via config/stream-bus.php.

Requirements

  • PHP 8.2+
  • Laravel 11/12
  • Redis server

Features

  • Redis Streams and Lists drivers
  • Consumer command with long-running loop
  • Cross-language interoperability (Node, Go, Python)
  • Optional dedupe for effectively-once processing
  • Multiple consumers configured via config

Install (Packagist)

composer require mahavirnahata/stream-bus

Install (local development)

Add a path repository and require the package:

{
  "repositories": [
    { "type": "path", "url": "packages/stream-bus" }
  ],
  "require": {
    "mahavirnahata/stream-bus": "*"
  }
}

Publish config:

php artisan vendor:publish --tag=stream-bus-config

Quick start

use MahavirNahata\StreamBus\Facades\StreamBus;

StreamBus::publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);

Run a consumer from Laravel:

php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel

You can also configure consumers in config/stream-bus.php and run:

php artisan stream-bus:consume

End-to-end examples

Example 1: Laravel dispatches, Node.js listens

Laravel (dispatcher):

use MahavirNahata\StreamBus\Facades\StreamBus;

StreamBus::publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);

Node.js (listener): see examples/node-consumer.js, which reads from the key stream-bus:events:outbound (the default prefix stream-bus: followed by the topic events:outbound).

Example 2: Node.js dispatches, Laravel listens

Node.js (dispatcher): see examples/node-producer.js writing to stream-bus:events:inbound.

Laravel (listener):

<?php

namespace App\Handlers;

use MahavirNahata\StreamBus\Contracts\StreamBusHandler;

class ImageResultHandler implements StreamBusHandler
{
    public function handle(array $message): void
    {
        // Handle inbound data from Node.js
        event(new \App\Events\ExternalResultReceived($message['payload'] ?? []));
    }
}

Run:

php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel

Or, with config:

php artisan stream-bus:consume

Configuration

return [
    'driver' => env('STREAM_BUS_DRIVER', 'streams'), // streams|lists
    'connection' => env('STREAM_BUS_REDIS', 'default'),
    'prefix' => env('STREAM_BUS_PREFIX', 'stream-bus:'),
    'delivery' => env('STREAM_BUS_DELIVERY', 'at-least-once'), // at-least-once|effectively-once
    'dedupe_ttl' => env('STREAM_BUS_DEDUPE_TTL', 86400),
    'consumers' => [
        'events:inbound' => App\Handlers\ImageResultHandler::class,
        'events:other' => [
            'handler' => App\Handlers\OtherHandler::class,
            'driver' => 'streams',
            'group' => 'other-group',
            'block' => 2000,
        ],
    ],
];

Config reference

  • driver: streams or lists
  • connection: Redis connection name
  • prefix: Key prefix for all topics
  • delivery: at-least-once or effectively-once
  • dedupe_ttl: Dedupe TTL in seconds
  • consumers: Map of topic => handler or topic => options
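The consumers map accepts two shapes: a bare handler class, or an options array. As an illustration only, the Python sketch below shows one way both shapes could be read uniformly. The package's actual resolution logic is not shown in this README, so the function name normalize_consumers, the defaults argument, and the rule that per-topic options override top-level values are all assumptions.

```python
from typing import Any, Dict, Union

ConsumerEntry = Union[str, Dict[str, Any]]


def normalize_consumers(
    consumers: Dict[str, ConsumerEntry],
    defaults: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]:
    """Expand every consumers entry to a full options dict.

    A bare string is treated as the handler class. A dict is merged over
    the defaults (assumed behaviour, not confirmed by the package).
    """
    resolved: Dict[str, Dict[str, Any]] = {}
    for topic, entry in consumers.items():
        options = {"handler": entry} if isinstance(entry, str) else dict(entry)
        resolved[topic] = {**defaults, **options}
    return resolved


# Mirrors the two entries from the configuration example.
example = normalize_consumers(
    {
        "events:inbound": "App\\Handlers\\ImageResultHandler",
        "events:other": {
            "handler": "App\\Handlers\\OtherHandler",
            "driver": "streams",
            "group": "other-group",
            "block": 2000,
        },
    },
    defaults={"driver": "streams"},  # hypothetical top-level defaults
)
```

After this call, both topics carry a handler and a driver, and only events:other carries group and block.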

Shared Redis guidance

This package only reads from the configured stream/list key, not the whole Redis instance. If you share Redis with other apps, use a unique prefix or a separate Redis connection/db:

STREAM_BUS_PREFIX=app1:bus:
STREAM_BUS_REDIS=stream-bus
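For workers written in other languages, the prefix matters because they must address the same Redis key as Laravel. The minimal Python sketch below assumes the key is the prefix concatenated with the topic, which matches the stream-bus:events:outbound key used in the examples. The helper name topic_key is hypothetical and not part of the package.

```python
DEFAULT_PREFIX = "stream-bus:"  # default value of STREAM_BUS_PREFIX


def topic_key(topic: str, prefix: str = DEFAULT_PREFIX) -> str:
    """Return the Redis key for a topic, assumed to be prefix + topic."""
    return f"{prefix}{topic}"


default_key = topic_key("events:outbound")
isolated_key = topic_key("events:outbound", prefix="app1:bus:")
```

With the default prefix the worker reads stream-bus:events:outbound. With STREAM_BUS_PREFIX=app1:bus: it would need to read app1:bus:events:outbound instead.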

Publish messages

use MahavirNahata\StreamBus\StreamBus;

app(StreamBus::class)->publish('events:outbound', [
    'type' => 'image.process',
    'payload' => ['id' => 123],
]);

Consume from Laravel

  1. Create a handler:

<?php

namespace App\Handlers;

use MahavirNahata\StreamBus\Contracts\StreamBusHandler;

class ImageResultHandler implements StreamBusHandler
{
    public function handle(array $message): void
    {
        // handle response
    }
}

  2. Run the consumer:

php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --group=laravel

Or, with config (single or multiple consumers):

php artisan stream-bus:consume

Consumer command options

  • topic: Topic name (optional if configured)
  • handler: Handler class (optional if configured)
  • --driver: streams or lists
  • --connection: Redis connection
  • --prefix: Key prefix
  • --group: Consumer group (streams)
  • --consumer: Consumer name (streams, defaults to hostname)
  • --count: Messages per read (streams)
  • --block: Block time in ms (streams) or seconds (lists)
  • --delivery: at-least-once or effectively-once
  • --dedupe-ttl: Dedupe TTL seconds
  • --once: Read once and exit
  • --sleep: Sleep ms when no messages
  • --no-ack: Disable ACK (streams)
  • --stop-on-error: Exit if handler throws
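The loop-control options above (--once, --sleep, --stop-on-error) can be pictured with a small Python sketch. This is not the package's implementation: run_consumer, read_batch, and handle are hypothetical names, and how the real command treats a message whose handler fails (retry, skip, or leave pending) is not documented here, so the sketch simply moves on.

```python
import time
from typing import Callable, Iterable, Mapping


def run_consumer(
    read_batch: Callable[[], Iterable[Mapping]],
    handle: Callable[[Mapping], None],
    once: bool = False,
    sleep_ms: int = 0,
    stop_on_error: bool = False,
) -> int:
    """Poll read_batch() and pass each message to handle().

    Returns the number of messages handled without an exception.
    Without once=True the loop runs until an exception escapes.
    """
    handled = 0
    while True:
        batch = list(read_batch())
        for message in batch:
            try:
                handle(message)
                handled += 1
            except Exception:
                if stop_on_error:  # mirrors --stop-on-error
                    raise
                # Assumption: a failed message is skipped and the loop continues.
        if once:  # mirrors --once
            return handled
        if not batch and sleep_ms:  # mirrors --sleep
            time.sleep(sleep_ms / 1000)
```

Passing once=True gives a single read-and-exit run, which is convenient for cron-style invocations or tests.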

Delivery semantics

  • at-least-once: default; a message may be delivered more than once
  • effectively-once: best-effort dedupe using a Redis key per message ID, kept for dedupe_ttl seconds

Override from CLI:

php artisan stream-bus:consume events:inbound "App\Handlers\ImageResultHandler" --delivery=effectively-once --dedupe-ttl=3600
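The effectively-once mode can be sketched in Python with a Redis SET that uses NX and EX, which is one common way to keep a marker key per message ID. The marker key layout (prefix + "dedupe:" + ID) and the name first_delivery are assumptions, since the package's real key names are not documented here. The client only needs a redis-py compatible set method, so a small in-memory stand-in is included to make the sketch runnable without a server.

```python
def first_delivery(client, prefix: str, message_id: str, ttl_seconds: int = 86400) -> bool:
    """Return True only the first time message_id is seen within the TTL."""
    marker = f"{prefix}dedupe:{message_id}"  # hypothetical key layout
    # SET ... NX EX succeeds only if the marker does not exist yet.
    return bool(client.set(marker, "1", nx=True, ex=ttl_seconds))


class FakeRedis:
    """In-memory stand-in for a Redis client (ignores expiry)."""

    def __init__(self):
        self.store = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self.store:
            return None  # redis-py returns None when NX prevents the write
        self.store[name] = value
        return True


client = FakeRedis()
first = first_delivery(client, "stream-bus:", "1700000000000-0")
second = first_delivery(client, "stream-bus:", "1700000000000-0")
```

Here first is True and second is False. The dedupe is best-effort because the marker expires after the TTL, and a worker that crashes after setting the marker but before finishing would skip the message on redelivery, which is why handlers should still be idempotent.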

Examples

See examples/ for producers and consumers in Node, Python, Go, and a Laravel handler example.

Round trip flow (outbound -> external worker -> inbound)

  1. Laravel publishes to events:outbound.
  2. External worker consumes and processes.
  3. External worker publishes results to events:inbound.
  4. Laravel consumes events:inbound and dispatches app jobs.

Consume from other languages

Examples are in examples/.

Go (Streams)

  • Use XREADGROUP on stream stream-bus:events:outbound
  • Consumer group: workers
  • ACK with XACK

Node / Python (Streams)

  • Use your Redis client to read from the stream with a consumer group
  • Write responses to stream-bus:events:inbound
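The consumer-group steps above can be sketched in Python against a redis-py style client. The stream key and group name come from the Go notes. The function names and the handle callback are hypothetical. The reply shape assumes redis-py's default RESP2 protocol. The field layout of each entry is whatever the publisher wrote, which this README does not specify, so handle receives the raw field mapping (see examples/ for the actual format).

```python
STREAM = "stream-bus:events:outbound"
GROUP = "workers"


def ensure_group(client, stream: str = STREAM, group: str = GROUP) -> None:
    """Create the consumer group (and the stream) if it does not exist."""
    try:
        # id="0" also delivers entries that were added before the group existed.
        client.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:
        # Redis replies BUSYGROUP when the group already exists.
        if "BUSYGROUP" not in str(exc):
            raise


def consume_batch(client, consumer, handle, stream=STREAM, group=GROUP,
                  count=10, block_ms=2000) -> int:
    """Read new entries for this consumer, handle them, and ACK each one."""
    reply = client.xreadgroup(group, consumer, {stream: ">"},
                              count=count, block=block_ms)
    handled = 0
    for _stream_name, entries in reply or []:
        for entry_id, fields in entries:
            handle(entry_id, fields)
            client.xack(stream, group, entry_id)  # ACK only after handle() returns
            handled += 1
    return handled


# Usage against a real server (requires the redis package):
#   import redis
#   client = redis.Redis(decode_responses=True)
#   ensure_group(client)
#   consume_batch(client, "worker-1", lambda entry_id, fields: print(entry_id, fields))
```

Acknowledging after the handler returns keeps the at-least-once behaviour: if the worker dies mid-message, the entry stays pending in the group instead of being lost.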

Delivery guarantees

  • Streams and Lists are at-least-once by default.
  • Effectively-once uses best-effort dedupe for a configurable TTL.

Troubleshooting

  • Consumer exits immediately: ensure your handler class exists and implements StreamBusHandler, and consumers is not empty when using config.
  • No messages received: verify the topic name and prefix, and for streams ensure the correct group/consumer is used.
  • Duplicate processing: use delivery=effectively-once and a reasonable dedupe_ttl.

FAQ

Does it scan all Redis keys?
No. It only reads the configured topic key with your prefix.

Can I run multiple consumers?
Yes. Define multiple entries in consumers and run php artisan stream-bus:consume.

Exactly-once?
Not guaranteed. Use effectively-once with idempotent handlers.

License

MIT