tobento / service-queue
A queue system for processing jobs in background.
Requires
- php: >=8.0
- psr/clock: ^1.0
- psr/container: ^2.0
- psr/simple-cache: 2 - 3
- tobento/service-autowire: ^1.0.9
Requires (Dev)
- mockery/mockery: ^1.6
- phpunit/phpunit: ^9.5
- tobento/service-cache: ^1.0
- tobento/service-clock: ^1.0
- tobento/service-collection: ^1.0.5
- tobento/service-console: ^1.0.3
- tobento/service-container: ^1.0.6
- tobento/service-database: ^1.1.2
- tobento/service-encryption: ^1.0
- tobento/service-event: ^1.0
- tobento/service-storage: ^1.2.5
- vimeo/psalm: ^4.0
Suggests
- tobento/service-cache: May be used for unique jobs
- tobento/service-clock: To support storage queue
- tobento/service-console: To run queue worker via console commands
- tobento/service-database: To support storage queue factory
- tobento/service-encryption: To support job encryption
- tobento/service-event: Used for console commands to write events
- tobento/service-storage: To support storage queue
Table of Contents
- Getting started
- Documentation
- Credits
Getting started
Add the latest version of the queue service to your project by running this command:
composer require tobento/service-queue
Requirements
- PHP 8.0 or greater
Highlights
- Framework-agnostic, will work with any project
- Decoupled design
Documentation
Creating Jobs
Job
You may use the Job::class to create jobs.
Using A Named Job
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = new Job(
    name: 'sample',
    payload: ['key' => 'value'],
);

var_dump($job instanceof JobInterface);
// bool(true)
Next, you will need to add a Job Handler which handles the job:
Using A Job Handler
First, create the job handler:
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

final class Mail implements JobHandlerInterface
{
    public function __construct(
        private MailerInterface $mailer,
        private MessageFactoryInterface $messageFactory,
    ) {}

    public function handleJob(JobInterface $job): void
    {
        $message = $this->messageFactory->createFromArray($job->getPayload());

        $this->mailer->send($message);
    }

    public static function toPayload(MessageInterface $message): array
    {
        return $message->jsonSerialize();
    }
}
Finally, create the job:
use Tobento\Service\Queue\Job;

$job = new Job(
    name: Mail::class,
    payload: Mail::toPayload($message),
);
Callable Job
You may use the CallableJob::class to create jobs.
Constructor parameters that cannot be resolved by the container must be optional (null|type)!
use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;

final class MailJob extends CallableJob
{
    public function __construct(
        private null|MessageInterface $message = null,
    ) {}

    public function handleJob(
        JobInterface $job,
        MailerInterface $mailer,
        MessageFactoryInterface $messageFactory,
    ): void {
        $message = $messageFactory->createFromArray($job->getPayload());

        $mailer->send($message);
    }

    public function getPayload(): array
    {
        if (is_null($this->message)) {
            return []; // or throw exception
        }

        return $this->message->jsonSerialize();
    }

    public function renderTemplate(): static
    {
        // render template logic ...
        return $this;
    }
}
Creating the job:
$job = (new MailJob($message))
    ->renderTemplate();
Job Parameters
You may use the available parameters, which provide basic features for jobs, or create custom parameters to add new features or customize existing ones to suit your needs.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    ->parameter(new Parameter\Retry(max: 2));
Parameter helper methods
The Job and Callable Job support the following helper methods:
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = (new Job(name: 'sample'))
    ->queue(name: 'secondary')
    ->data(['key' => 'value'])
    ->duration(seconds: 10)
    ->retry(max: 2)
    ->delay(seconds: 5)
    ->unique()
    ->priority(100)
    ->pushing(function() {})
    ->encrypt();
If you are using a Callable Job, you may specify default parameters in the __construct method:
use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\Parameter;

final class SampleJob extends CallableJob
{
    public function __construct()
    {
        $this->duration(seconds: 10);
        $this->retry(max: 2);

        // or using its classes:
        $this->parameter(new Parameter\Priority(100));
    }

    //...
}
Delay Parameter
Use the delay parameter to set the seconds the job needs to be delayed.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Delay(seconds: 60))
    // or using helper method:
    ->delay(seconds: 60);
Queues supporting delays:
Data Parameter
Use the data parameter to add additional job data.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Data(['key' => 'value']))
    // or using helper method:
    ->data(['key' => 'value']);
Duration Parameter
Use the duration parameter to set the approximate duration the job needs to process.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    // or using helper method:
    ->duration(seconds: 10);
If the job could not be run, the Failed Job Handler will requeue it to prevent the worker from timing out.
Encrypt Parameter
The encrypt parameter uses the Service Encryption to encrypt the job data.
It will encrypt the following data:
- job payload
- Data Parameter values
First, install the service:
composer require tobento/service-encryption
Next, bind the encrypter to your container used by the Job Processor:
Example using the Service Container as container:
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Tobento\Service\Encryption\EncrypterInterface;

$container = new Container();
$container->set(EncrypterInterface::class, function() {
    // create encrypter:
    return $encrypter;
});

$jobProcessor = new JobProcessor($container);
Check out the Crypto Implementation section to learn more.
Finally, add the parameter to your job:
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Encrypt())
    // or using helper method:
    ->encrypt();
You may create a custom encrypt parameter to use another encrypter or to customize the encryption.
Monitor Parameter
The monitor parameter is added by the Worker and may be used to log data about jobs such as the runtime in seconds and the memory usage. For instance, the parameter is used by the Work Command to write its data to the console.
use Tobento\Service\Queue\Parameter\Monitor;

if ($job->parameters()->has(Monitor::class)) {
    $monitor = $job->parameters()->get(Monitor::class);
    $runtimeInSeconds = $monitor->runtimeInSeconds();
    $memoryUsage = $monitor->memoryUsage();
}
Priority Parameter
Use the priority parameter to specify the priority of the job. Higher prioritized jobs will be processed first.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Priority(100))
    // or using helper method:
    ->priority(100);
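To illustrate the ordering rule only, here is a minimal plain-PHP sketch using SplPriorityQueue from the standard library. It does not use the package's queues and is not how the package is implemented; the job names are hypothetical. It merely shows that entries with a higher priority value are taken first.

```php
// Hypothetical job names with their priorities (not part of the package).
$pending = new SplPriorityQueue();
$pending->insert('newsletter', 10);
$pending->insert('password-reset', 100);
$pending->insert('report', 50);

// Extract in processing order: the highest priority value comes out first.
$order = [];
while (!$pending->isEmpty()) {
    $order[] = $pending->extract();
}

// $order is now ['password-reset', 'report', 'newsletter']
```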
Pushing Parameter
Use the pushing parameter to specify a handler executed before the job gets pushed to the queue.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Pushing(
        handler: function(JobInterface $job, AnyResolvableClass $foo): JobInterface {
            return $job;
        },

        // you may set a priority. Higher gets executed first:
        priority: 100, // 0 is default
    ))
    // or using helper method:
    ->pushing(handler: function() {}, priority: 100);
Queue Parameter
Use the queue parameter to specify the queue to push the job to.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Queue(name: 'secondary'))
    // or using helper method:
    ->queue(name: 'secondary');
The parameter will automatically be added by the Job Processor when the job is pushed to the queue.
Retry Parameter
Use the retry parameter to specify the max number of retries.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Retry(max: 2))
    // or using helper method:
    ->retry(max: 2);
The Failed Job Handler uses the parameter to handle the retries.
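The retry idea can be sketched in plain PHP without the package. The helper below is hypothetical and makes two assumptions that the text above does not confirm: max is treated as the number of retries after the first attempt, and the job is retried in a loop, whereas the package requeues failed jobs instead.

```php
/**
 * Hypothetical helper (not part of the package): runs $job and retries it
 * up to $maxRetries times after the first failed attempt. Once the retries
 * are used up, $finallyFailed receives the last error.
 */
function runWithRetries(callable $job, int $maxRetries, callable $finallyFailed): bool
{
    $attempt = 0;

    while (true) {
        $attempt++;

        try {
            $job($attempt);
            return true;
        } catch (Throwable $e) {
            if ($attempt > $maxRetries) {
                $finallyFailed($e, $attempt);
                return false;
            }
        }
    }
}

$log = [];

$succeeded = runWithRetries(
    job: function (int $attempt) use (&$log): void {
        $log[] = 'attempt ' . $attempt;
        throw new RuntimeException('mail server unavailable');
    },
    maxRetries: 2,
    finallyFailed: function (Throwable $e, int $attempts) use (&$log): void {
        $log[] = 'gave up after ' . $attempts . ' attempts: ' . $e->getMessage();
    },
);
```

With maxRetries set to 2, the job in this sketch is attempted three times before the final failure callback runs.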
Unique Parameter
The unique parameter will prevent any new, duplicate jobs from entering the queue while another instance of that job is queued or processing.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Unique(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->unique(id: null);
The parameter requires a CacheInterface::class to be bound to the container passed to the JobProcessor:
Example using the Cache Service and Container Service:
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);
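As a rough illustration of why a cache is needed, the following plain-PHP sketch keeps one lock entry per unique id. The UniqueJobGuard class is hypothetical and uses an in-memory array instead of a real cache; how the package stores and releases its locks is not described here, so treat this only as the general idea.

```php
/**
 * Hypothetical guard (not part of the package): remembers which unique ids
 * are currently queued or processing.
 */
final class UniqueJobGuard
{
    /** @var array<string, true> */
    private array $locks = [];

    /** Returns true if the id was free and is now locked. */
    public function acquire(string $id): bool
    {
        if (isset($this->locks[$id])) {
            return false;
        }

        $this->locks[$id] = true;
        return true;
    }

    /** Frees the id, e.g. after the job has been processed. */
    public function release(string $id): void
    {
        unset($this->locks[$id]);
    }
}

$guard = new UniqueJobGuard();
$queued = [];

// The second 'import-42' is rejected because the first one is still queued.
foreach (['import-42', 'import-42', 'import-43'] as $id) {
    if ($guard->acquire($id)) {
        $queued[] = $id;
    }
}

// After processing, the id may enter the queue again.
$guard->release('import-42');
$acceptedAgain = $guard->acquire('import-42');
```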
Without Overlapping Parameter
If you add the without overlapping parameter, the job will only be processed once at a time to prevent overlapping.
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\WithoutOverlapping(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->withoutOverlapping(id: null);
The parameter requires a CacheInterface::class to be bound to the container passed to the JobProcessor:
Example using the Cache Service and Container Service:
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);
Dispatching Jobs
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueueInterface;

class SomeService
{
    public function createJob(QueueInterface $queue): void
    {
        $job = new Job(
            name: 'sample',
            payload: ['key' => 'value'],
        );

        $queue->push($job);
    }
}
You may consider binding one of the Queues to the container as the default QueueInterface implementation. Otherwise, you will need to use the queues to dispatch a job to a certain queue:
use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueException;

class SomeService
{
    public function createJob(QueuesInterface $queues): void
    {
        $job = new Job(name: 'sample');

        $queues->queue(name: 'secondary')->push($job);
        // throws QueueException if the queue does not exist.

        // or
        $queues->get(name: 'secondary')?->push($job);

        // or you may check if the queue exists before:
        if ($queues->has(name: 'secondary')) {
            $queues->queue(name: 'secondary')->push($job);
        }
    }
}
Queue
In Memory Queue
The InMemoryQueue::class stores jobs in memory.
use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$queue = new InMemoryQueue(
    name: 'inmemory',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)
Null Queue
The NullQueue::class does not queue any job and therefore jobs will not be processed at all.
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\QueueInterface;

$queue = new NullQueue(
    name: 'null',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)
Storage Queue
The StorageQueue::class uses the Storage Service to store the jobs.
First, you will need to install the storage service:
composer require tobento/service-storage
Next, you may install the clock service or use another implementation:
composer require tobento/service-clock
Finally, create the queue:
use Tobento\Service\Queue\Storage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Storage\StorageInterface;
use Psr\Clock\ClockInterface;

$queue = new Storage\Queue(
    name: 'storage',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    storage: $storage, // StorageInterface
    clock: $clock, // ClockInterface
    table: 'jobs',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)
The storage needs to have the following table columns:
Sync Queue
The SyncQueue::class dispatches jobs immediately without queuing.
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$queue = new SyncQueue(
    name: 'sync',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    eventDispatcher: null, // null|EventDispatcherInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)
Queues
Default Queues
use Tobento\Service\Queue\Queues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;

$queues = new Queues(
    $queue, // QueueInterface
    $anotherQueue, // QueueInterface
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)
Lazy Queues
The LazyQueues::class creates the queues only on demand.
use Tobento\Service\Queue\LazyQueues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\NullQueue;
use Psr\Container\ContainerInterface;

$queues = new LazyQueues(
    container: $container, // ContainerInterface
    queues: [
        // using a factory:
        'primary' => [
            // factory must implement QueueFactoryInterface
            'factory' => QueueFactory::class,
            'config' => [
                'queue' => SyncQueue::class,
                'priority' => 100,
            ],
        ],

        // using a closure:
        'secondary' => static function (string $name, ContainerInterface $c): QueueInterface {
            // create queue ...
            return $queue;
        },

        // or you may sometimes just create the queue (not lazy):
        'null' => new NullQueue(name: 'null'),
    ],
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)
You may check out the Queue Factories to learn more about it.
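The "only on demand" behaviour can be pictured with a small plain-PHP sketch. The LazyRegistry class below is hypothetical and unrelated to the package's internals; it only shows the general pattern of storing closures and invoking each one the first time its entry is requested.

```php
/**
 * Hypothetical registry (not part of the package): stores factories by name
 * and invokes each one only the first time its entry is requested.
 */
final class LazyRegistry
{
    /** @var array<string, object> */
    private array $created = [];

    /** @param array<string, Closure> $factories */
    public function __construct(
        private array $factories,
    ) {}

    public function has(string $name): bool
    {
        return isset($this->factories[$name]);
    }

    public function get(string $name): ?object
    {
        if (!$this->has($name)) {
            return null;
        }

        // Create on first access, reuse the same instance afterwards.
        return $this->created[$name] ??= ($this->factories[$name])($name);
    }
}

$factoryCalls = [];

$registry = new LazyRegistry([
    'primary' => function (string $name) use (&$factoryCalls): object {
        $factoryCalls[] = $name;
        return new ArrayObject(['name' => $name]);
    },
    'secondary' => function (string $name) use (&$factoryCalls): object {
        $factoryCalls[] = $name;
        return new ArrayObject(['name' => $name]);
    },
]);

$first = $registry->get('primary');
$second = $registry->get('primary');
// Only the 'primary' factory has run, and it ran once.
```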
Queue Factories
Queue Factory
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor // JobProcessorInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)
Check out the Job Processor to learn more about it.
Create queue
use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the queue you want to create:
    'queue' => InMemoryQueue::class,
    //'queue' => NullQueue::class,
    //'queue' => SyncQueue::class,

    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
Storage Queue Factory
use Tobento\Service\Queue\Storage\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Database\DatabasesInterface;
use Psr\Clock\ClockInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor, // JobProcessorInterface
    clock: $clock, // ClockInterface
    databases: null, // null|DatabasesInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)
Check out the Job Processor to learn more about it.
Create JsonFileStorage::class queue
use Tobento\Service\Storage\JsonFileStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',

    // specify the storage:
    'storage' => JsonFileStorage::class,
    'dir' => 'home/private/storage/',

    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
Create InMemoryStorage::class queue
use Tobento\Service\Storage\InMemoryStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',

    // specify the storage:
    'storage' => InMemoryStorage::class,

    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
Create PdoMySqlStorage::class or PdoMariaDbStorage::class queue
use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',

    // specify the storage:
    'storage' => PdoMySqlStorage::class,
    //'storage' => PdoMariaDbStorage::class,

    // specify the name of the database used:
    'database' => 'name',

    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.
Job Processor
The JobProcessor::class is responsible for processing jobs.
use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Psr\Container\ContainerInterface;

$jobProcessor = new JobProcessor(
    container: $container // ContainerInterface
);

var_dump($jobProcessor instanceof JobProcessorInterface);
// bool(true)
Adding Job Handlers
You may add job handlers for Named Jobs:
use Tobento\Service\Queue\JobHandlerInterface;

$jobProcessor->addJobHandler(
    name: 'sample',
    handler: SampleHandler::class, // string|JobHandlerInterface
);
Example of handler:
use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

final class SampleHandler implements JobHandlerInterface
{
    public function handleJob(JobInterface $job): void
    {
        // handle job
    }
}
Failed Job Handler
The FailedJobHandler::class is responsible for handling failed jobs.
use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Tobento\Service\Queue\QueuesInterface;

$handler = new FailedJobHandler(
    queues: $queues, // QueuesInterface
);

var_dump($handler instanceof FailedJobHandlerInterface);
// bool(true)
After a failed job has exceeded the number of attempts defined with the Retry Parameter, the job will be lost.
You may extend FailedJobHandler::class and handle the finally failed jobs in the finallyFailed method, for example to store them in a database or simply log them:
use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobInterface;
use Psr\Log\LoggerInterface;

class LogFailedJobHandler extends FailedJobHandler
{
    public function __construct(
        protected null|QueuesInterface $queues = null,
        protected null|LoggerInterface $logger = null,
    ) {}

    protected function finallyFailed(JobInterface $job, \Throwable $e): void
    {
        if (is_null($this->logger)) {
            return;
        }

        $this->logger->error(
            sprintf('Job %s with the id %s failed: %s', $job->getName(), $job->getId(), $e->getMessage()),
            [
                'name' => $job->getName(),
                'id' => $job->getId(),
                'payload' => $job->getPayload(),
                'parameters' => $job->parameters()->jsonSerialize(),
                'exception' => $e,
            ]
        );
    }

    /**
     * Handle exception thrown by the worker e.g.
     */
    public function handleException(\Throwable $e): void
    {
        if (is_null($this->logger)) {
            return;
        }

        $this->logger->error(
            sprintf('Queue exception: %s', $e->getMessage()),
            [
                'exception' => $e,
            ]
        );
    }
}
Worker
The Worker::class processes the queued jobs.
use Tobento\Service\Queue\Worker;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$worker = new Worker(
    queues: $queues, // QueuesInterface
    jobProcessor: $jobProcessor, // JobProcessorInterface
    failedJobHandler: $failedJobHandler, // null|FailedJobHandlerInterface
    eventDispatcher: $eventDispatcher, // null|EventDispatcherInterface
);
Running Worker
use Tobento\Service\Queue\WorkerOptions;

$status = $worker->run(
    // specify the name of the queue you wish to use.
    // If null, it uses all queues by its priority, highest first.
    queue: 'name', // null|string

    // specify the options:
    options: new WorkerOptions(
        // The maximum amount of RAM the worker may consume:
        memory: 128,
        // The maximum number of seconds a worker may run:
        timeout: 60,
        // The number of seconds to wait in between polling the queue:
        sleep: 3,
        // The maximum number of jobs to run, 0 (unlimited):
        maxJobs: 0,
        // Indicates if the worker should stop when the queue is empty:
        stopWhenEmpty: false,
    ),
);

// you may exit:
exit($status);
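To make the interplay of the options more concrete, here is a simplified plain-PHP polling loop. The runWorker function is hypothetical and not the package's Worker: it takes a plain array of jobs, ignores the memory limit, and returns the number of processed jobs rather than an exit status. It only sketches how timeout, sleep, maxJobs and stopWhenEmpty could bound such a loop.

```php
/**
 * Hypothetical worker loop (not the package's Worker): processes jobs from a
 * plain array until one of the limits is reached and returns how many jobs
 * were handled.
 */
function runWorker(
    array $jobs,
    callable $handler,
    int $timeout = 60,
    int $sleep = 3,
    int $maxJobs = 0,
    bool $stopWhenEmpty = false,
): int {
    $startedAt = time();
    $processed = 0;

    while (true) {
        // Stop once the maximum runtime in seconds is reached.
        if (time() - $startedAt >= $timeout) {
            return $processed;
        }

        // Stop once the job limit is reached (0 means unlimited).
        if ($maxJobs > 0 && $processed >= $maxJobs) {
            return $processed;
        }

        $job = array_shift($jobs);

        if ($job === null) {
            if ($stopWhenEmpty) {
                return $processed;
            }

            // Nothing to do: wait before polling again.
            sleep($sleep);
            continue;
        }

        $handler($job);
        $processed++;
    }
}

$handled = [];
$collect = function (string $job) use (&$handled): void {
    $handled[] = $job;
};

// Stops after two jobs because of maxJobs.
$limited = runWorker(['a', 'b', 'c'], $collect, maxJobs: 2);

// Stops as soon as the list is drained because of stopWhenEmpty.
$drained = runWorker(['d', 'e'], $collect, stopWhenEmpty: true);
```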
Running Worker Using Commands
Check out the Console and Work Command section if you want to run the worker using commands.
Console
You may use the following commands with the Console Service.
To get quickly started consider using the following two app bundles:
Otherwise, you need to install the Console Service and set up your console by yourself.
Work Command
Running jobs from all queues
php app queue:work
Running jobs from specific queue only
php app queue:work --queue=primary
Available Options
Clear Command
Delete all of the jobs from the queues
php app queue:clear
Delete jobs from specific queues only
php app queue:clear --queue=primary --queue=secondary
Events
Available Events
use Tobento\Service\Queue\Event;
Just make sure you pass an event dispatcher to your worker!
Learn More
Creating Custom Job Parameters
You may create a custom parameter by extending the Parameter::class:
use Tobento\Service\Queue\Parameter\Parameter;

class SampleParameter extends Parameter
{
    //
}
Storable parameter
By implementing the JsonSerializable interface your parameter will be stored and available when handling the job.
use Tobento\Service\Queue\Parameter\Parameter;
use JsonSerializable;

class SampleParameter extends Parameter implements JsonSerializable
{
    public function __construct(
        private string $value,
    ) {}

    /**
     * Serializes the object to a value that can be serialized natively by json_encode().
     * Will be used to create the parameter by the parameters factory.
     * So it must match its __construct method.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        return ['value' => $this->value];
    }
}
Failable interface
By implementing the Failable interface you can handle failed jobs.
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Failable;
use Tobento\Service\Queue\JobInterface;
use Throwable;

class SampleParameter extends Parameter implements Failable
{
    /**
     * Returns the failed job handler.
     *
     * @return callable
     */
    public function getFailedJobHandler(): callable
    {
        return [$this, 'processFailedJob'];
    }

    /**
     * Process failed job.
     *
     * @param JobInterface $job
     * @param Throwable $e
     * @param ... any parameters resolvable by your container.
     * @return void
     */
    public function processFailedJob(JobInterface $job, Throwable $e): void
    {
        //
    }
}
Check out the Tobento\Service\Queue\Parameter\Delay::class to see its implementation.
Poppable interface
By implementing the Poppable interface you can handle jobs after they are popped from the queue.
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Poppable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;
use JsonSerializable;

class SampleParameter extends Parameter implements Poppable, JsonSerializable
{
    /**
     * Returns the popping job handler.
     *
     * @return callable
     */
    public function getPoppingJobHandler(): callable
    {
        return [$this, 'poppingJob'];
    }

    /**
     * Popping job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return null|JobInterface
     */
    public function poppingJob(JobInterface $job, QueueInterface $queue): null|JobInterface
    {
        // called after the job is popped from the queue.
        // If null is returned, the job is not processed.
        return $job;
    }

    /**
     * Implemented so that the parameter gets stored. Otherwise the popping job handler is not executed.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}
Check out the Tobento\Service\Queue\Parameter\Encrypt::class to see its implementation.
Processable interface
By implementing the Processable interface you can run handlers before and after a job is processed.
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Processable;
use Tobento\Service\Queue\JobInterface;
use JsonSerializable;

class SampleParameter extends Parameter implements Processable, JsonSerializable
{
    /**
     * Returns the before process job handler.
     *
     * @return null|callable
     */
    public function getBeforeProcessJobHandler(): null|callable
    {
        return [$this, 'beforeProcessJob'];
        // or return null if not required
    }

    /**
     * Returns the after process job handler.
     *
     * @return null|callable
     */
    public function getAfterProcessJobHandler(): null|callable
    {
        return [$this, 'afterProcessJob'];
        // or return null if not required
    }

    /**
     * Before process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function beforeProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * After process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function afterProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }

    /**
     * Implemented so that the parameter gets stored. Otherwise the handlers are not executed.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}
Check out the Tobento\Service\Queue\Parameter\Duration::class to see its implementation.
Pushable interface
By implementing the Pushable interface you can handle jobs before being pushed to the queue.
use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Pushable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;

class SampleParameter extends Parameter implements Pushable
{
    /**
     * Returns the pushing job handler.
     *
     * @return callable
     */
    public function getPushingJobHandler(): callable
    {
        return [$this, 'pushingJob'];
    }

    /**
     * Pushing job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return JobInterface
     */
    public function pushingJob(JobInterface $job, QueueInterface $queue): JobInterface
    {
        return $job;
    }
}
Check out the Tobento\Service\Queue\Parameter\PushingJob::class to see its implementation.
Chunkable Job Example
This example shows a possible way to create a chunkable job using the data parameter to store its process data.
use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;

final class ChunkableJob extends CallableJob
{
    public function handleJob(
        JobInterface $job,
        QueuesInterface $queues,
        // Repository $repository,
    ): void {
        if (! $job->parameters()->has(Parameter\Data::class)) {
            // first time running job:
            $data = new Parameter\Data([
                //'total' => $repository->count(),
                'total' => 500,
                'number' => 100,
                'offset' => 0,
            ]);

            $job->parameters()->add($data);
        } else {
            $data = $job->parameters()->get(Parameter\Data::class);
        }

        $total = $data->get(key: 'total', default: 0);
        $number = $data->get(key: 'number', default: 100);
        $offset = $data->get(key: 'offset', default: 0);

        // Handle Job:
        //$items = $repository->findAll(limit: [$number, $offset]);
        // For demo we use range to produce the items of the current chunk:
        $items = range($offset, min($offset + $number, $total) - 1);

        foreach($items as $item) {
            // do sth
        }

        // Update offset:
        $nextOffset = $offset + $number;
        $data->set(key: 'offset', value: $nextOffset);

        // Repush to queue if not finished (compare the updated offset,
        // otherwise the job would run once more with nothing left to do):
        if ($nextOffset < $total) {
            $queues->queue(
                name: $job->parameters()->get(Parameter\Queue::class)->name()
            )->push($job);
        }
    }

    public function getPayload(): array
    {
        return [];
    }
}
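The chunk arithmetic can be traced without the package. The sketch below is plain PHP with a hypothetical runChunk function standing in for one job run; it assumes the job is pushed again only while the updated offset is still below the total, and uses the same demo values of 500 items in chunks of 100.

```php
/**
 * Hypothetical stand-in for one job run (not part of the package): handles
 * one chunk and returns the updated data plus whether to push the job again.
 */
function runChunk(array $data): array
{
    $first = $data['offset'];
    $last = min($data['offset'] + $data['number'], $data['total']) - 1;

    // Move the offset forward for the next run.
    $data['offset'] += $data['number'];

    return [
        'data' => $data,
        'handled' => [$first, $last],
        'repush' => $data['offset'] < $data['total'],
    ];
}

$data = ['total' => 500, 'number' => 100, 'offset' => 0];
$chunks = [];

// Each loop iteration represents the worker picking up the re-pushed job.
do {
    $result = runChunk($data);
    $data = $result['data'];
    $chunks[] = $result['handled'];
} while ($result['repush']);

// $chunks is [[0, 99], [100, 199], [200, 299], [300, 399], [400, 499]]
```

Five runs cover all 500 items, and the job is not pushed a sixth time because the offset has reached the total.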