krak / symfony-messenger-redis
Symfony Messenger Redis Adapter
Type: symfony-bundle
Requires
- ext-json: *
- ext-redis: *
- symfony/messenger: ^4.4|^5.4|^6.2
Requires (Dev)
- nyholm/symfony-bundle-test: ^1.6
- phpunit/phpunit: ^9.0
- symfony/dependency-injection: ^5.4
- symfony/http-kernel: ^5.4
- symfony/property-access: ^5.4
- symfony/serializer: ^5.4
README
This bundle provides a custom Redis list integration for Symfony Messenger (^4.4, ^5.4, or ^6.2).
The standard Redis implementation requires Redis 5.0 and uses the streams feature; this adapter uses Redis lists to power the queue functionality instead.
Installation
Install with Composer at krak/symfony-messenger-redis.
If Symfony's Composer install doesn't automatically register the bundle, you can do so manually:

    <?php

    return [
        //...
        Krak\SymfonyMessengerRedis\MessengerRedisBundle::class => ['all' => true],
    ];

Usage
You can use this queue with the following config for the framework messenger:

    framework:
      messenger:
        transports:
          acme_redis:
            dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            options: { queue: queue_acme }

Here the MESSENGER_TRANSPORT_DSN env variable looks like: redis://localhost:6379
This registers a transport named acme_redis, which uses the Redis transport from this library.
Unique Messages
If you ever need to ensure that a specific job stays unique while waiting in the queue, you can use the UniqueStamp.

    use Symfony\Component\Messenger\Envelope;
    use Krak\SymfonyMessengerRedis\Stamp\UniqueStamp;

    Envelope::wrap($message)->with(new UniqueStamp('optional-unique-id'));

If you don't pass an identifier to enforce uniqueness, the transport will perform an md5 hash of the message body to create an identifier.
Example usage of this stamp:
- You dispatch a ProductUpdatedMessage every time a product is saved in one system.
- You have message handlers that then do some expensive operation based on that product information.
- If a single product gets saved rapidly without effectively changing, it makes no sense to enqueue the same job for that product 100 times.
- Using the unique stamp ensures that, if a message for that specific product is already in the queue, another one is not added, since the original message hasn't been processed yet.
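The uniqueness rule above can be sketched with a small in-memory model. This is only an illustration, not the library's code: the class UniqueQueueSketch and its methods are hypothetical names, and a plain PHP array stands in for Redis. It assumes the identifier is either the explicit id or, as described above, an md5 hash of the message body.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory stand-in for a queue that enforces uniqueness.
// Not the library's implementation; it only models the behaviour described above.
final class UniqueQueueSketch
{
    /** @var array<string, string> waiting message bodies keyed by unique id */
    private array $pending = [];

    /** Returns true if the message was enqueued, false if a duplicate was skipped. */
    public function enqueue(string $body, ?string $uniqueId = null): bool
    {
        // Fall back to an md5 hash of the body when no explicit id is given.
        $id = $uniqueId ?? md5($body);

        if (isset($this->pending[$id])) {
            return false; // a message with this id is already waiting
        }

        $this->pending[$id] = $body;

        return true;
    }

    /** Removes and returns the oldest waiting body, or null when the queue is empty. */
    public function dequeue(): ?string
    {
        if ($this->pending === []) {
            return null;
        }

        $id = array_key_first($this->pending);
        $body = $this->pending[$id];
        unset($this->pending[$id]);

        return $body;
    }

    /** Number of messages currently waiting. */
    public function count(): int
    {
        return count($this->pending);
    }
}
```

In this model, enqueueing the same ProductUpdatedMessage body 100 times before a worker picks it up leaves a single entry waiting; once it has been dequeued, the same body can be enqueued again.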
Delayed Messages
This library supports the DelayStamp provided by the core Symfony Messenger component.
Debounce Messages
You can use DebounceStamp to debounce message sending to the transport.
DebounceStamp is similar to DelayStamp. If a debounced message is new, it enters the queue exactly like a unique and delayed message would. If a similar message is still waiting to be queued, it is removed and re-added with the new delay. For background on debouncing, see https://redd.one/blog/debounce-vs-throttle

    use Symfony\Component\Messenger\Envelope;
    use Krak\SymfonyMessengerRedis\Stamp\DebounceStamp;

    Envelope::wrap($message)->with(new DebounceStamp('delay-in-ms', 'optional-unique-id'));

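To make the debounce behaviour concrete, here is a rough in-memory sketch. The class DebounceSketch and its methods are hypothetical and do not come from the library; time is passed in explicitly as milliseconds so the example stays deterministic, and the md5 fallback for a missing id is an assumption carried over from UniqueStamp.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory model of debounce semantics; not the library's implementation.
final class DebounceSketch
{
    /** @var array<string, array{body: string, availableAt: int}> waiting entries keyed by unique id */
    private array $delayed = [];

    /** Schedules a message; an entry with the same id that is still waiting gets replaced. */
    public function debounce(string $body, int $delayMs, int $nowMs, ?string $uniqueId = null): void
    {
        // Assumption: fall back to an md5 hash of the body, as UniqueStamp is described to do.
        $id = $uniqueId ?? md5($body);

        // Remove any similar message still waiting, then re-add it with the new delay.
        unset($this->delayed[$id]);
        $this->delayed[$id] = ['body' => $body, 'availableAt' => $nowMs + $delayMs];
    }

    /** @return list<string> bodies whose delay has elapsed; they are removed from the waiting set */
    public function releaseDue(int $nowMs): array
    {
        $due = [];
        foreach ($this->delayed as $id => $entry) {
            if ($entry['availableAt'] <= $nowMs) {
                $due[] = $entry['body'];
                unset($this->delayed[$id]);
            }
        }

        return $due;
    }
}
```

With a 1000 ms delay, a message debounced at t=0 and again at t=500 is released once, at t=1500, rather than twice.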
Available Options
Here are the available options that can be provided to the transport options array or as query parameters:
- queue:
  - required: yes
  - default: N/A
  - description: The internal list name to use for storing the messages in Redis. The system will also create a processing queue named {queue}_processing for storing messages while they are being processed.
- db:
  - required: no
  - default: N/A
  - description: The Redis database to select when performing Redis operations. This value can also be set from the DSN, either via a query parameter or by setting the path in the DSN (redis://redis:6379?db=1 or redis://redis:6379/1).
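The two DSN forms for the db option can be illustrated with a short parsing sketch. The function dbFromDsnSketch is a hypothetical helper, not the library's parser, and checking the query parameter before the path is an arbitrary choice made for this example.

```php
<?php

declare(strict_types=1);

// Hypothetical helper, not part of the library: extracts the database index from a DSN.
function dbFromDsnSketch(string $dsn): ?int
{
    $parts = parse_url($dsn);
    if ($parts === false) {
        throw new InvalidArgumentException("Invalid DSN: {$dsn}");
    }

    // Query parameter form: redis://redis:6379?db=1
    parse_str($parts['query'] ?? '', $query);
    if (isset($query['db']) && is_string($query['db']) && ctype_digit($query['db'])) {
        return (int) $query['db'];
    }

    // Path form: redis://redis:6379/1
    $path = trim($parts['path'] ?? '', '/');
    if ($path !== '' && ctype_digit($path)) {
        return (int) $path;
    }

    return null; // no database specified in the DSN
}
```

Both redis://redis:6379?db=1 and redis://redis:6379/1 resolve to database 1 here, while redis://localhost:6379 yields null.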
TLS Support
To enable TLS support, use the DSN scheme rediss:// instead of redis:// to indicate that the Redis client should connect with TLS.
Processed Queue
This library uses the rpoplpush reliable queue pattern. It's important to note, however, that this library does not make any attempt to clean up leftover items in the processing queue.
We've made great strides to make sure that, when everything is working right, the processing queue is always cleaned up, even when using the autoscaling receiver.
However, if for some reason a worker is killed via SIGKILL (aka kill -9), there is a chance the worker process leaves an item in the processing queue because it didn't finish its work. Over time, this might cause the _processing list to accrue messages that don't need to be there.
It won't hurt anything other than storage to have those _processing lists take up space. It also shouldn't hurt anything to periodically wipe out those lists using the DEL command. But it would need to be verified that doing so doesn't hurt any running workers (which I don't think it will).
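The way items get stranded in the processing list can be seen in a small in-memory model of the pattern. ReliableQueueSketch is a hypothetical class, not the library's code: PHP arrays stand in for the two Redis lists, and the mapping of its methods to LPUSH, RPOPLPUSH, LREM, and DEL is an assumption about how such a queue is typically built.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory model of the RPOPLPUSH reliable queue pattern.
// Arrays stand in for the Redis lists; index 0 is the head (left) of a list.
final class ReliableQueueSketch
{
    /** @var list<string> main list, e.g. queue_acme */
    public array $queue = [];

    /** @var list<string> processing list, e.g. queue_acme_processing */
    public array $processing = [];

    /** Like LPUSH: add a message at the head of the main list. */
    public function push(string $message): void
    {
        array_unshift($this->queue, $message);
    }

    /** Like RPOPLPUSH: move the tail of the main list to the head of the processing list. */
    public function reserve(): ?string
    {
        $message = array_pop($this->queue);
        if ($message === null) {
            return null;
        }

        array_unshift($this->processing, $message);

        return $message;
    }

    /** Like LREM with a count of 1: a worker that finishes removes its item from the processing list. */
    public function ack(string $message): void
    {
        $index = array_search($message, $this->processing, true);
        if ($index !== false) {
            array_splice($this->processing, $index, 1);
        }
    }

    /** Like DEL on the processing list: the periodic cleanup discussed above. */
    public function clearProcessing(): void
    {
        $this->processing = [];
    }
}
```

A worker that calls reserve() and is then killed before ack() leaves its message in $processing indefinitely; clearProcessing() models the periodic DEL. Note that in this model the cleanup would also drop items belonging to workers that are still running, which is the open question the paragraph above raises.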
Using Symfony's Redis Transport at the same time
Both Symfony's Redis transport and the krak Redis transport register the DSN prefix redis://. If you want to support both transports, you'll need to use the use_krak_redis option to disable this library's Redis transport.

    framework:
      messenger:
        transports:
          krak_redis:
            dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            options: { queue: queue_acme }
          sf_redis:
            dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            options: { use_krak_redis: false } # this allows symfony's redis transport factory to take precedence

Testing
You can run the test suite with: composer test
The Feature test suite needs a running Redis instance, which you can start locally in Docker via docker-compose.
Keep in mind that you will also need the redis extension (ext-redis) installed for your local PHP CLI.