alsc / streambus
Redis Streams message bus
Requires
- php: >=8.2
- predis/predis: ^3.1|dev-main
- psr/log: ^1.1|^2|^3
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.75
- monolog/monolog: *
- phpunit/phpunit: ^11.4
- psalm/phar: ^6.12
- symfony/process: ^7.3
This package is auto-updated.
Last update: 2025-07-19 12:52:07 UTC
README
This library enables you to use Redis Streams as a message bus or queue.
At the technical level, Redis Streams Bus is a collection of Redis streams, each designated for a specific subject or message type, along with abstractions for working with them.
Features
- Handle multiple message types and subjects within a single abstraction
- Independent consumption via consumer groups with any required number of consumers
- Ordered consumption within each subject
- Recover consumption after failures
- Delivery attempt counter
- Place unprocessed messages into a DLQ (Dead Letter Queue)
- Delay processing (NACK) for an arbitrary period in case of failure
- Set a maximum time for message processing
- Queue mode operation with deletion of processed items
- Limit the time and number of stored messages
- Tools for monitoring consumption and bus state
Requirements
- PHP >= 8.2
- Redis or Valkey >= 7.2
Installation
composer require alsc/streambus
Usage
The library provides a factory for configuring and creating the necessary classes. The main components and settings are described below. You can find more usage examples in the examples folder and in the tests of the repository.
Settings
Class StreamBus\StreamBusSettings
This class defines the settings for the entire message bus and is used during initialization of StreamBusBuilder
and StreamBus
.
Key | Description | Default |
---|---|---|
minTTLSec | Minimum message TTL in stream | 86400 |
maxSize | Maximum messages stored per subject | 1000000 |
exactLimits | Apply exact limits detail | false |
deleteOnAck | Delete message from bus with ACK operation | false |
maxDelivery | Maximum attempts to deliver a message. 0 by default means no limit |
0 |
ackExplicit | Should client acknowledge each message it reads | true |
ackWaitMs | Maximum time to process message, after which it will be redelivered | 30 * 60 * 1000 |
nackDelayMs | Time to delay next delivery message attempt | 0 |
maxExpiredSubjects | Maximum number of subjects to read expired messages per call. 0 by default means no limit |
0 |
Example
$settings = new StreamBusSettings( minTTLSec: 86_400, maxSize: 10_000_000, exactLimits: false, deleteOnAck: false, maxDelivery: 10, ackWaitMs: 30 * 60 * 1000, //... ); $builder = StreamBusBuilder::create('bus_name') ->withSettings($settings) //...
Produce
Class StreamBus\Producer\StreamBusProducerInterface
You can add messages to the bus either one by one or in batches.
Example
$serializers = [ 'users.new' => new StreamBusJsonSerializer(), 'products.new' => new StreamBusJsonSerializer(), ]; $builder = StreamBusBuilder::create('bus_name') ->withClient($redisClient) ->withSettings(new StreamBusSettings()) ->withSerializers($serializers); $producer = $builder->createProducer(); $producer->add('users.new', ['id' => 1, 'name' => 'David']); $producer->add('users.new', ['id' => 2, 'name' => 'Andrew']); $producer->add('products.new', ['id' => 1, 'product' => 'guitar']); $producer->addMany('products.new', [ ['id' => 1, 'product' => 'guitar'] ['id' => 2, 'product' => 'flute'] ]);
Consume
Class: StreamBus\Consumer\StreamBusConsumerInterface
There are several types of consumers for message consumption:
StreamBusConsumer
- no order guaranties, at least once deliveryStreamBusOrderedConsumer
- ordered per subject, at least once deliveryStreamBusOrderedStrictConsumer
- same as previous with additional consistency checks
You can specify the subjects you are interested in when creating the consumers. Consumers support blocking reads.
Example
$serializers = [ 'users.new' => new StreamBusJsonSerializer(), 'products.new' => new StreamBusJsonSerializer(), 'orders.new' => new StreamBusJsonSerializer(), ]; $builder = StreamBusBuilder::create('bus_name') ->withClient($redisClient) ->withSettings(new StreamBusSettings()) ->withSerializers($serializers); $consumer = $builder->createConsumer('all', 'consumer'); // or $consumer = $builder->createOrderedConsumer('users', ['users.new']); // or $consumer = $builder->createOrderedStrictConsumer('users_and_orders', ['users.new', 'orders.new']); // Read max 5 messages per subject, block read for 10 seconds if no messages available to read while ($messages = $consumer->read(5, 10_000)) { foreach ($messages as $subject => $subjectMessages) { foreach ($subjectMessages as $messageId => $message) { printf('got message from subject: %s, with id: %s' . PHP_EOL, $subject, $messageId); print_r($message); // ack $consumer->ack($subject, $messageId); // or nack $consumer->nack($subject, $messageId); // or nack with 10 seconds redelivery delay $consumer->nack($subject, $messageId, 10_000); } } }
Consume with processor
Class StreamBus\Processor\StreamBusProcessor
You can also process messages using the processor.
Example
class User { public function __construct(public int $id, public string $name) {} public function toArray(): array { return ['id' => $this->id, 'name' => $this->name]; } public static function fromArray(array $data): self { return new self($data['id'], $data['name']); } } $builder = StreamBusBuilder::create('bus_name') ->withClient($redisClient) ->withSettings(new StreamBusSettings()) ->withSerializers([ 'users.new' => new StreamBusJsonSerializer( static fn(User $user) => $user->toArray(), static fn(array $data) => User::fromArray($data), ), ]); class UsersHandler { public function __invoke(string $type, string $id, User $user): true { printf('Welcome, %s!' . PHP_EOL, $user->name); return true; } } $processor = $builder->createProcessor( 'processor', 'consumer', ['users.new' => new UsersHandler()] )->process();
Dead Letter Queue
The bus supports working with a DLQ, placing messages there after maxDelivery
delivery attempts.
An example of usage can be found in the examples folder.
Observe
Class: StreamBus\StreamBus\StreamBusInfoInterface
Using this interface, you can obtain:
- A list of existing subjects
- A list of consumer groups for a specific subject
- The number of messages taken for processing
- The consumer group lag
- Raw information about the state of the underlying Redis streams (details)
Example
$info = $builder->createBusInfo(); foreach ($info->getSubjects() as $subject) { printf('Subject: %s' . PHP_EOL, $subject); printf(' Stream length: %d' . PHP_EOL, $info->getStreamLength($subject)); foreach ($info->getGroups($subject) as $group) { printf(' Group: %s' . PHP_EOL, $group); printf(' Pending: %d' . PHP_EOL, $info->getGroupPending($group, $subject)); printf(' Time lag: %d' . PHP_EOL, $info->getGroupPending($group, $subject)); } }
Benchmark
The project includes benchmarks that you can run. It is recommended to adjust the test file configurations according to your hardware.
My 5-year-old home laptop is capable of processing around 100,000 messages per second in batch mode. On servers, I have achieved results of around 1 million messages per second in batch mode.
composer test:benchmark
Contributing
This project is open source and welcomes contributions from the community. If you have ideas, improvements, or bug fixes, feel free to open an issue or submit a pull request!