pmg / queue-tactician
Run your asynchronous jobs with Tactician
Installs: 48 778
Dependents: 1
Suggesters: 0
Security: 0
Stars: 1
Watchers: 16
Forks: 1
Open Issues: 0
Requires
- php: ^8.2
- league/tactician: ^1.0
- pmg/queue: ^6.0
Requires (Dev)
- phpunit/phpunit: ^9.5
- pmg/queue: ^6.0@beta
- symfony/phpunit-bridge: ^5.4
This package is auto-updated.
Last update: 2024-12-10 00:59:08 UTC
README
This is a middleware for Tactician to integrate it with pmg/queue.
Installation and Usage
Install with composer.
composer require pmg/queue-tactician
To use it, add the middleware to your middleware chain sometime before the default command handler middleware.
use League\Tactician\CommandBus; use League\Tactician\Handler\CommandHandlerMiddleware; use PMG\Queue\Producer; use PMG\Queue\Tactician\QueueingMiddleware; /** @var Producer */ $producer = createAQueueProducerSomehow(); $bus = new CommandBus([ new QueueingMiddleware($producer), new CommandHandlerMiddleware(/*...*/), ]);
Enqueueing Commands
Any command that implements PMG\Queue\Message
will be put into the queue via
the producer and no further middlewares will be called.
use PMG\Queue\Message; final class DoLongRunningStuff implements Message { /** * {@inheritdoc} */ public function getName() { return 'LongRunningStuff'; } } // goes right into the queue $bus->handle(new DoLongRunningStuff());
Dequeueing (Consuming) Commands
To use tactician to process the messages via the consumer, use
PMG\Queue\Handler\TacticianHandler
.
use PMG\Queue\DefaultConsumer; use PMG\Queue\Handler\TacticianHandler; /** @var League\Tactician\CommandBus $bus */ $handler = new TacticianHandler($bus); /** @var PMG\Queue\Driver $driver */ $consumer = new DefaultConsumer($driver, $handler); $consumer->run();
The above assumes that the CommandBus
instance still has the
QueueingMiddleware
installed. If not, you'll need to use your own handler that
invokes the command bus, perhaps via CallableHandler
.
use League\Tactician\CommandBus; use League\Tactician\Handler\CommandHandlerMiddleware; use PMG\Queue\DefaultConsumer; use PMG\Queue\Message; use PMG\Queue\Handler\CallableHandler; // no QueueingMiddleware! $differentBus = new CommandBus([ new CommandHandlerMiddleware(/*...*/), ]); $handler = new CallableHandler([$bus, 'handle']); /** @var PMG\Queue\Driver $driver */ $consumer = new DefaultConsumer($driver, $handler); $consumer->run();
Beware of Wrapping This Handler with PcntlForkingHandler
The shared instance of the command bus means that it's very likely that things like open database connections will cause issues if/when a child press is forked to handle messages.
Instead a better bet is to create a new command bus for each message.
CreatingTacticianHandler
can do that for you.
use League\Tactician\CommandBus; use League\Tactician\Handler\CommandHandlerMiddleware; use PMG\Queue\Message; use PMG\Queue\Handler\CallableHandler; use PMG\Queue\Tactician\QueuedCommand; use PMG\Queue\Tactician\QueueingMiddleware; use PMG\Queue\Handler\CreatingTacticianHandler; $handler = new CreatingTacticianHandler(function () { // this is invoked for every message return new CommandBus([ new QueueingMiddleware(createAProduerSomehow()), new CommandHandlerMiddlware(/* ... */) ]); }); /** @var PMG\Queue\Driver $driver */ $consumer = new DefaultConsumer($driver, $handler); $consumer->run();