brandembassy / queue-management
Installs: 131 444
Dependents: 0
Suggesters: 0
Security: 0
Stars: 2
Watchers: 12
Forks: 0
Open Issues: 1
Requires
- php: >=8.1
- ext-json: *
- aws/aws-sdk-php: ^3.209
- brandembassy/datetime: ^3.0
- doctrine/collections: ^1.8.0
- nette/utils: ^3.0
- php-amqplib/php-amqplib: ^2.12 || ^3.5.2
- predis/predis: ^1.1 || ^2.1.2
- psr/log: ^1.1
- ramsey/uuid: ^4.2
- tracy/tracy: ^2.9
Requires (Dev)
- brandembassy/coding-standard: ^11.1
- brandembassy/mockery-tools: ^4.1.1
- dealerdirect/phpcodesniffer-composer-installer: ^0.7.2
- mockery/mockery: ^1.5.1
- phpunit/phpunit: ^10.0
- roave/security-advisories: dev-latest
- dev-master
- 5.x-dev
- 5.7.4
- 5.7.3
- 5.7.2
- 5.7.1
- 5.7
- 5.6.1
- 5.6
- 5.5
- 5.4
- 5.3
- 5.2.x-dev
- 5.2.2
- 5.2.1
- 5.2
- 5.1
- 5.0
- 4.7
- 4.6
- 4.5
- 4.4.1
- 4.4
- 4.3.1
- 4.3
- 4.2
- 4.1
- 4.0
- 3.1.2
- 3.1.1
- 3.1
- 3.0
- 2.x-dev
- 2.2
- 2.1.1
- 2.1
- 2.0
- 2.0-beta
- 1.8
- 1.7.3
- 1.7.2
- 1.7.1
- 1.7
- 1.6.1
- 1.6
- 1.5
- 1.4.2
- 1.4.1
- 1.4
- 1.3.3
- 1.3.2
- 1.3.1
- 1.3
- 1.2
- 1.1
- 1.0
- 0.2.1
- 0.2
- 0.1
- dev-move-timer-up-to-cover-deduplication-time
- dev-DE-44637-job-get-parameters
- dev-DE-47403-update-predis
- dev-DE-40274-allow-php-8-1
- dev-run-ci-php-8
- dev-db-reconnect
- dev-deduplication
- dev-sqs2-update-aws-sdk
- dev-sqs
- dev-sqs-support-poc
- dev-reconnect-when-publish-fail
This package is auto-updated.
Last update: 2024-03-27 13:59:33 UTC
README
Usage
1. Create a job class
<?php declare(strict_types = 1);

namespace YourApp\Jobs;

use BE\QueueManagement\Jobs\SimpleJob;

/**
 * Example job that carries a single "foo" parameter.
 *
 * The class lives in the YourApp\Jobs namespace so that its fully-qualified
 * name is YourApp\Jobs\ExampleJob, which is the name the job processor, the
 * job definition and the job pusher refer to.
 */
class ExampleJob extends SimpleJob
{
    /** Name under which this job is registered in the job definitions. */
    public const JOB_NAME = 'exampleJob';

    /** Key of the "foo" job parameter. */
    public const PARAMETER_FOO = 'foo';

    /**
     * Returns the value stored under the PARAMETER_FOO job parameter.
     */
    public function getFoo(): string
    {
        return $this->getParameter(self::PARAMETER_FOO);
    }
}
2. Create job processor
<?php declare(strict_types = 1);

namespace YourApp\JobProcessors;

use BE\QueueManagement\Jobs\Execution\JobProcessorInterface;
use BE\QueueManagement\Jobs\JobInterface;
use YourApp\Jobs\ExampleJob;

/**
 * Processor for ExampleJob instances.
 *
 * Declared in the YourApp\JobProcessors namespace so that the class matches
 * the YourApp\JobProcessors\ExampleJobProcessor name under which the
 * processor service is registered.
 */
class ExampleJobProcessor implements JobProcessorInterface
{
    /**
     * Prints the "foo" parameter of the given job.
     *
     * @param JobInterface $job job to process; expected to be an ExampleJob
     */
    public function process(JobInterface $job): void
    {
        // Narrows the type for static analysis; assert() can be disabled by
        // the zend.assertions ini setting, so it is not a runtime guard.
        assert($job instanceof ExampleJob);

        echo $job->getFoo();
    }
}
3. Create job definition
For example, using Nette DI with a NEON configuration file:
# Job definitions are stored under parameters.queue.jobs.jobDefinitions and
# handed to the JobDefinitionsContainer service declared below.
parameters:
    queue:
        jobs:
            defaultJobLoader: BE\QueueManagement\Jobs\Loading\SimpleJobLoader()
            jobDefinitions:
                exampleJob:
                    class: YourApp\Jobs\ExampleJob
                    queueName: example_queue
                    maxAttempts: 20 # null means no limit
                    jobLoader: YourApp\JobLoaders\ExampleJobLoader() # if not set default job loader is used
                    jobDelayRule: BE\QueueManagement\Jobs\FailResolving\DelayRules\ConstantDelayRule()
                    jobProcessorService: queue.processors.exampleJobProcessor

services:
    queue.processors.exampleJobProcessor: YourApp\JobProcessors\ExampleJobProcessor

    # JobDefinitionsContainer
    queue.jobDefinitionsContainer: BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer(%queue.jobs.jobDefinitions%)
4. Push job into queue
<?php declare(strict_types = 1);

use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer;
use BE\QueueManagement\Jobs\JobInterface;
use BE\QueueManagement\Queue\QueueManagerInterface;
use Doctrine\Common\Collections\ArrayCollection;
use YourApp\Jobs\ExampleJob;

/**
 * Builds ExampleJob instances and pushes them into the queue.
 */
class JobPusher
{
    /**
     * @var QueueManagerInterface
     */
    protected $queueManager;

    /**
     * @var JobDefinitionsContainer
     */
    private $jobDefinitionsContainer;

    /**
     * @param QueueManagerInterface   $queueManager            queue manager the jobs are pushed to
     * @param JobDefinitionsContainer $jobDefinitionsContainer container the job definition is looked up in
     */
    public function __construct(
        QueueManagerInterface $queueManager,
        JobDefinitionsContainer $jobDefinitionsContainer
    ) {
        $this->queueManager = $queueManager;
        $this->jobDefinitionsContainer = $jobDefinitionsContainer;
    }

    /**
     * Creates an ExampleJob with the "foo" parameter set to "bar" and pushes it.
     *
     * @param string $jobUuid UUID assigned to the newly created job
     */
    protected function push(string $jobUuid): void
    {
        $jobDefinition = $this->jobDefinitionsContainer->get(ExampleJob::JOB_NAME);

        $exampleJob = new ExampleJob(
            $jobUuid,
            new DateTimeImmutable(),
            JobInterface::INIT_ATTEMPTS,
            $jobDefinition,
            // ArrayCollection must be imported explicitly; without the `use`
            // above the name resolves to a non-existent global class.
            new ArrayCollection([ExampleJob::PARAMETER_FOO => 'bar'])
        );

        $this->queueManager->push($exampleJob);
    }
}
5. Run worker
<?php declare(strict_types = 1);

namespace BE\AdapterSdk\Console\Commands\Queue;

use BE\QueueManagement\Queue\RabbitMQ\RabbitMQQueueManager;
use BE\QueueManagement\Queue\WorkerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Console command `queue:worker:start`: starts the injected worker on the
 * `example_queue` queue.
 */
class WorkerStartCommand extends Command
{
    /**
     * @var WorkerInterface
     */
    private $worker;

    /**
     * @param WorkerInterface $worker worker started by this command
     */
    public function __construct(WorkerInterface $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    /**
     * Registers the command name and its description.
     */
    protected function configure(): void
    {
        $this
            ->setName('queue:worker:start')
            ->setDescription('Start queue worker');
    }

    /**
     * Starts the worker with the RabbitMQ options below, then writes a
     * confirmation line to the output.
     *
     * @return int always 0
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $workerOptions = [
            RabbitMQQueueManager::PREFETCH_COUNT => 1,
            RabbitMQQueueManager::NO_ACK => true,
        ];

        $this->worker->start('example_queue', $workerOptions);

        $output->writeln('Worker started');

        return 0;
    }
}