brandembassy / queue-management
This package's canonical repository appears to be gone and the package has been frozen as a result. Email us for help if needed.
Installs: 187 173
Dependents: 0
Suggesters: 0
Security: 0
Stars: 2
Watchers: 11
Forks: 0
Open Issues: 0
pkg:composer/brandembassy/queue-management
Requires
- php: >=8.1
 - ext-json: *
 - aws/aws-sdk-php: ^3.209
 - brandembassy/datetime: ^3.0
 - doctrine/collections: ^1.8.0 || ^2.0
 - nette/utils: ^3.0
 - predis/predis: ^1.1 || ^2.1.2
 - psr/log: ^1.1
 - ramsey/uuid: ^4.2
 - symfony/event-dispatcher-contracts: ^3.5
 - tracy/tracy: ^2.9
 
Requires (Dev)
- brandembassy/coding-standard: ^14.0
 - brandembassy/mockery-tools: ^4.1.1
 - dealerdirect/phpcodesniffer-composer-installer: ^0.7.2
 - mockery/mockery: ^1.5.1
 - phpunit/phpunit: ^10.5
 - roave/security-advisories: dev-latest
 
- dev-master
 - 8.1
 - 8.0
 - 7.5
 - 7.4
 - 7.3
 - 7.2
 - 7.1
 - 7.0
 - 6.8
 - 6.7
 - 6.6
 - 6.5
 - 6.4
 - 6.3
 - 6.2
 - 6.1
 - 6.0
 - 5.x-dev
 - 5.7.5
 - 5.7.4
 - 5.7.3
 - 5.7.2
 - 5.7.1
 - 5.7
 - 5.6.1
 - 5.6
 - 5.5
 - 5.4
 - 5.3
 - 5.2.x-dev
 - 5.2.2
 - 5.2.1
 - 5.2
 - 5.1
 - 5.0
 - 4.7
 - 4.6
 - 4.5
 - 4.4.1
 - 4.4
 - 4.3.1
 - 4.3
 - 4.2
 - 4.1
 - 4.0
 - 3.1.2
 - 3.1.1
 - 3.1
 - 3.0
 - 2.x-dev
 - 2.2
 - 2.1.1
 - 2.1
 - 2.0
 - 2.0-beta
 - 1.8
 - 1.7.3
 - 1.7.2
 - 1.7.1
 - 1.7
 - 1.6.1
 - 1.6
 - 1.5
 - 1.4.2
 - 1.4.1
 - 1.4
 - 1.3.3
 - 1.3.2
 - 1.3.1
 - 1.3
 - 1.2
 - 1.1
 - 1.0
 - 0.2.1
 - 0.2
 - 0.1
 - dev-DE-129671-add-sqs-scheduler
 - dev-allow-doctrine-collections-2.0
 - dev-DE-44637-job-get-parameters
 - dev-DE-47403-update-predis
 - dev-DE-40274-allow-php-8-1
 - dev-db-reconnect
 - dev-deduplication
 - dev-sqs2-update-aws-sdk
 - dev-sqs
 - dev-sqs-support-poc
 - dev-reconnect-when-publish-fail
 
This package is auto-updated.
Last update: 2025-07-11 08:54:01 UTC
README
Usage
1. Create Job class
<?php declare(strict_types = 1); use BE\QueueManagement\Jobs\SimpleJob; class ExampleJob extends SimpleJob { public const JOB_NAME = 'exampleJob'; public const PARAMETER_FOO = 'foo'; public function getFoo(): string { return $this->getParameter(self::PARAMETER_FOO); } }
2. Create job processor
<?php declare(strict_types = 1); use BE\QueueManagement\Jobs\Execution\JobProcessorInterface; use BE\QueueManagement\Jobs\JobInterface; use YourApp\Jobs\ExampleJob; class ExampleJobProcessor implements JobProcessorInterface { public function process(JobInterface $job): void { assert($job instanceof ExampleJob); echo $job->getFoo(); } }
3. Create job definition
For example using neon DI:
parameters: queue: jobs: defaultJobLoader: BE\QueueManagement\Jobs\Loading\SimpleJobLoader() jobDefinitions: exampleJob: class: YourApp\Jobs\ExampleJob queueName: example_queue maxAttempts: 20 # null means no limit jobLoader: YourApp\JobLoaders\ExampleJobLoader() # if not set default job loader is used jobDelayRule: BE\QueueManagement\Jobs\FailResolving\DelayRules\ConstantDelayRule() jobProcessorService: queue.processors.exampleJobProcessor services: queue.processors.exampleJobProcessor: YourApp\JobProcessors\ExampleJobProcessor # JobDefinitionsContainer queue.jobDefinitionsContainer: BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer(%queue.jobs.jobDefinitions%)
4. Push job into queue
<?php declare(strict_types = 1); use BE\QueueManagement\Queue\QueueManagerInterface; use YourApp\Jobs\ExampleJob; use BE\QueueManagement\Jobs\JobInterface; use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer; class JobPusher { /** * @var QueueManagerInterface */ protected $queueManager; /** * @var JobDefinitionsContainer */ private $jobDefinitionsContainer; public function __construct(QueueManagerInterface $queueManager, JobDefinitionsContainer $jobDefinitionsContainer) { $this->queueManager = $queueManager; $this->jobDefinitionsContainer = $jobDefinitionsContainer; } protected function push(string $jobUuid): void { $jobDefinition = $this->jobDefinitionsContainer->get(ExampleJob::JOB_NAME); $exampleJob = new ExampleJob( $jobUuid, new DateTimeImmutable(), JobInterface::INIT_ATTEMPTS, $jobDefinition, new ArrayCollection([ExampleJob::PARAMETER_FOO => 'bar']) ); $this->queueManager->push($exampleJob); } }
5. Run worker
<?php declare(strict_types = 1); namespace BE\AdapterSdk\Console\Commands\Queue; use BE\QueueManagement\Queue\WorkerInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Command\Command; class WorkerStartCommand extends Command { /** * @var WorkerInterface */ private $worker; public function __construct(WorkerInterface $worker) { parent::__construct(); $this->worker = $worker; } protected function configure(): void { $this->setName('queue:worker:start'); $this->setDescription('Start queue worker'); } protected function execute(InputInterface $input, OutputInterface $output): int { $queueName = $input->getArgument(self::QUEUE_NAME); assert(is_string($queueName)); $this->sqsWorker->start($queueName); $output->writeln('Worker started'); return 0; } }