antidot-fw / message-queue
Message bus and Pub-Sub integration queues for Antidot Framework.
Fund package maintenance!
kpicaza
Requires
- php: ^7.4.3|^8.0
- beberlei/assert: ^3.2
- enqueue/enqueue: ^0.10.1
- psr/container: ^1.0.0
- psr/event-dispatcher: ^1.0
Requires (Dev)
- enqueue/dbal: ^0.10.1
- enqueue/fs: ^0.10.1
- enqueue/null: ^0.10.1
- enqueue/pheanstalk: ^0.10.2
- enqueue/redis: ^0.10.1
- enqueue/sqs: ^0.10.1
- infection/infection: ^0.24.0
- phpro/grumphp: ~0.17 || ~1.0
- phpstan/phpstan: ^0.11.5 || ^0.12.0
- phpunit/phpunit: ^8.0 || ^9.0
- squizlabs/php_codesniffer: ^3.4
- symfony/var-dumper: ^4.2 || ^5.0
- vimeo/psalm: ^4.9
Suggests
- enqueue/dbal: To use Database queue with dbal connection.
- enqueue/fs: To use Filesystem queue.
- enqueue/null: Null queue for testing purposes.
- enqueue/pheanstalk: To use queue with Beanstalk connection.
- enqueue/redis: To use queue with Redis connection.
- enqueue/sqs: To use queue with AWS SQS service.
This package is auto-updated.
Last update: 2024-11-29 06:10:42 UTC
README
Message queue implementation using enqueue/enqueue for Antidot Framework.
composer require antidot-fw/message-queue
Message Queue
A message queue is an asynchronous communication method. It allows storing messages in the queue system until they are consumed and destroyed. Each message is processed only once by a unique consumer.
Different Queue Systems
- Null Queue
- Filesystem Queue
- DBAL Queue
- Redis Queue
- Beanstalk
- Amazon SQS
Each implementation will have different configuration details, see concrete documentation section. Furthermore, you can use any of systems implemented in the PHP-enqueue package, making the needed factories.
Usage
You can define as many contexts as you need. You can bind each context to different queues. Once you have created a Context class, you can start sending jobs to the queue. The job should contain the queue name, the message type, and the message itself.
<?php declare(strict_types=1); /** @var \Psr\Container\ContainerInterface $container */ $producer = $container->get(\Antidot\Queue\Producer::class); $producer->enqueue(Job::create('some_queue', 'some_message', 'Hola Mundo!!'));
Start listening a queue
bin/console queue:start default # "default is the queue name"
Now you can configure actions for the message types received by the queue. The action is a callable class that receives a JobPayload as the first parameter.
Jobs and Producer
A Job is a class responsible for transport given data to the queue. It is composed of two parameters: the Queue name as a single string, and the JobPayload with the data to process in the queue. JsonPayload is a JSON serializable object composed of two other parameters: the message type and the message data as string or array.
Once you have a job class, you need to pass it to the producer to send the message to the queue. See the example below.
<?php declare(strict_types=1); use Psr\Container\ContainerInterface; use Antidot\Queue\Producer; /** @var ContainerInterface $container */ /** @var Producer $producer */ $producer = $container->get(Producer::class); // Send String Job of type "some_message_type" to "default" queue. $job1 = Job::create('default', 'some_message_type', 'Hello world!!'); $producer->enqueue($job1); // Send Array Job of type "other_message_type" to "other_queue" queue. $job2 = Job::create('other_queue', 'other_message_type', ['greet' => 'Hello world!!']); $producer->enqueue($job2);
Actions
The actions are invokable classes that will execute when the queue processes the given message. This class has a unique parameter, the JobPayload.
<?php declare(strict_types=1); use Antidot\Queue\JobPayload; class SomeMessageTypeAction { public function __invoke(JobPayload $payload): void { // do some stuff with the job payload here. } }
Config
Bind an action to a message type.
services: some_action_service: class: Some\Action\Class parameters: queues: contexts: default: message_types: # message_type: action_service some_message: some_action_service
This is the default config.
parameters: queues: default_context: default contexts: default: message_types: [] context_type: fs # redis|dbal|sqs|beanstalk|null context_service: queue.context.default container: queue.container.default extensions: - Enqueue\Consumption\Extension\LoggerExtension - Enqueue\Consumption\Extension\SignalExtension - Enqueue\Consumption\Extension\LogExtension
Transport specific config
Null Queue
So util for testing purposes, it discards any received job. The only configuration required by this transport type is to set it as context.
composer require enqueue/null
Filesystem Queue
The Filesystem queue stores produced jobs inside a file in memory. It requires the absolute file path to store the jobs.
composer require enqueue/fs
parameters: queues: default_context: default contexts: default: message_types: [] context_type: fs context_params: path: file:///absoute/path/to/writable/dir
DBAL Queue
The Doctrine DBAL queue stores produced jobs inside a database. It requires the name of the DBAL connection service.
composer require enqueue/dbal
parameters: queues: default_context: default contexts: default: message_types: [] context_type: dbal context_params: connection: Doctrine\DBAL\Connection
Redis Queue
The redis queue stores produced jobs inside a redis database. It requires the redis connection params. You can use it with Predis or with the PHP Redis extension.
With Predis:
composer require enqueue/redis predis/predis:^1
parameters: queues: default_context: default contexts: default: message_types: [] context_type: redis context_params: host: localhost port: 6379 scheme_extensions: ['predis']
With PHP extension:
Ensure that you have PHP Redis extension installed and enabled
composer require enqueue/redis
parameters: queues: default_context: default contexts: default: message_types: [] context_type: redis host: localhost port: 6379 scheme_extensions: ['phpredis']
Beanstalk Queue
The Beanstalk queue requires the beanstalk host and beanstalk port to work. It uses Pheanstalk PHP library.
composer require enqueue/pheanstalk
parameters: queues: default_context: default contexts: default: message_types: [] context_type: beanstalk context_params: host: localhost port: 11300
Amazon SQS Queue
The Amazon SQS queue stores produced jobs at aws. It requires the AWS console credentials. You can use both standard and FIFO queues depending on your requirements. The queue must exist in AWS before sending a Job to work.
composer require enqueue/sqs
parameters: queues: default_context: default contexts: default: message_types: [] context_type: sqs context_params: key: AWS-KEY secret: AWS-SECRET region: eu-west-3
Consumer
The worker is the CLI command responsible for listening to the given queue to get messages and process each message one by one. In this early version, the only argument that it uses is the queue name to start listening.
bin/console queue:start queue_name
Events
The Antidot Framework Message Queue uses the PSR-14 Event Dispatcher to allow listening different instant occurred in the queue execution:
- QueueConsumeWasStarted: Dispatches on queue start.
- MessageReceived: Dispatches for any received job before process.
- MessageProcessed: Dispatches after the job processed, it will have the result of the process.
Extensions
See more about extensions on php-enqueue official docs
LogExtension
You can enable or disable debug mode logger in the framework default config. it uses PSR-3 Logger Interface internally.
Running in Production
In the production environment, you usually need a daemon to keep the consumer process alive. You can use Supervisor or any other system daemon alternative.
Supervisor
You need to install supervidor in your system. Then you need to configure the consumer as a supervisor job.
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /absolute/path/to/app/bin/console queue:start QUEUE_NAME
autostart=true
autorestart=true
user=ubuntu
numprocs=2 # Be cautious, it will block your computer depending on the available simultaneous execution thread it has.
redirect_stderr=true
stdout_logfile=/absolute/path/to/app/var/log/worker.log
stopwaitsecs=3600