2tvenom / php-queue
PHP Queue for task execution
Requires
- php: >=5.4.0
This package is auto-updated.
Last update: 2025-01-08 22:42:58 UTC
README
The PhpQueue library provides queue for execution php scripts. This code has been developed and maintained by Ven from August 2013 to October 2013.
You can send comments, patches, questions here on github or to 2tvenom@gmail.com
- Queue
- Task
- Task and sub tasks
- Error in task and sub tasks
- Callbacks and error callback
- Queue drivers
- Autoloader
- Web interface
##Queue ###Simple queue
###Adding task to queue
PhpQueue have components: Queue, QueueDriver, TaskPerformer and Task.
#####Example
<? use PhpQueue\Queue; use PhpQueue\Drivers\SqlPdoDriver; use PhpQueue\TaskPerformer; use PhpQueue\Task; $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver); $task = new Task("Job"); $queue->add_task($task); ?>
###Get task from queue and execute
Steps of getting task from the queue and execute. Queue return false
If queue not have task.
- Connect to queue
- Get task from queue
- Queue set task status In Process
- Execute task
- Modify task status in queue (Warning! This step is required)
#####Example
<? $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver); $task_performer = new TaskPerformer(); $task = $queue->get_task(); //in queue this task have status "In process" $task = $task_performer->execute_task($task); $queue->modify_task($task); ?>
Task
Create task
Task name this is name of class implemented in PhpQueue\Interfaces\IJob
#####Example
<? class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return 2*2; } } $task = new PhpQueue\Task("Job"); ?>
Request data
Task can receive input data. This is one object, array or scalar value.
#####Example
<? class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return $task->get_request_data() * 2; } } ?>
You can set data in constructor $task = new PhpQueue\Task("Job", 100);
or by method $task->set_request_data(100);
###Select task from queue
###Priority Tasks in queue sorted by priority. All tasks by default have zero priority. If the tasks have the same priority, then to order by id desc.
Set priority: set_priority($priority)
. Priority must be integer.
Get priority: get_priority()
####Example
<? $task = new PhpQueue\Task("Job"); $queue->add_task($task); $super_task = new PhpQueue\Task("SuperJob"); $super_task->set_priority(10); $queue->add_task($super_task);
####Result
First performed task will SuperJob
, next Job
###Task group id
It is possible divide the queue to several queues by task_group_id
.
By default all tasks have group id is 0
.
Set task group id: set_task_group_id($group_id)
. Group id must be integer.
Get priority: get_task_group_id()
####Example Adding tasks
<? $task1 = new PhpQueue\Task("Job1"); $task1->set_task_group_id(1); $queue->add_task($task1); $task2 = new PhpQueue\Task("Job2"); $task2->set_task_group_id(2); $queue->add_task($task2);
Get task
<? $task1 = $queue->get_task(array( TaskConst::TASK_GROUP_ID => 1 )); $task2 = $queue->get_task(array( TaskConst::TASK_GROUP_ID => 2 )); $task3 = $queue->get_task();
####Result
task1 is Job1
task2 is Job2
task3 is false
Execution date
Task will execute after
this date.
Set execution date: set_execution_date($date)
. Date format is Y-m-d H:i:s
Get date: get_execution_date()
####Example
<? $task = new Task("JobByDate"); $task->set_execution_date(date('Y-m-d H:i:s', strtotime('now') + 10)); $queue->add_task($task); $task1 = $queue->get_task(); sleep(15); $task2 = $queue->get_task();
####Result
task1 is false
task2 is JobByDate
Unique id
All tasks have unique id. Generated when the an object is created. Unique id is string of 32 characters (md5).
Select from queue by unique id not set task status to In process
Get unique id of task: get_uniqid()
.
####Example Add to queue
<? $task = new Task("Job"); $queue->add_task($task); $uniqid = $task->get_uniqid();
####Result
String: a8a042ffabf5230dfdfa0a2cf9d47110
Select by unique id
<? $task = $queue->get_task(array( TaskConst::UNIQID => "a8a042ffabf5230dfdfa0a2cf9d47110" ));
####Result
$task is Task Job
with unique id a8a042ffabf5230dfdfa0a2cf9d47110
.
Task still have status New
in queue
##Task and sub tasks
###Sub tasks
PhpQueue have two type of tasks:
- Simple task
- Task with sub tasks
Task can have sub tasks with one nested level.
####Example Adding sub tasks
<? $task = new Task("JobWithSubTasks"); $task ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
Performer will execute only sub tasks.
####Example Execute sub tasks
for($i=0; $i<3; $i++){ $task = $queue->get_task(); $task = $task_performer->execute_task($task); $queue->modify_task($task); }
####Result Execute steps:
- Execute
Job1
. Status done - Execute
Job2
. Status done - Execute
Job3
. Status done JobWithSubTasks
status done. Parent task not perform.
Exclusive tasks
By default task have exclusive status false
.
Non exclusive
Subtasks of task can execute all Task Performers.
#####Example Add task
<? $task = new Task("NonExclusiveTask"); $task ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
On three servers (server1, server2, server3) started TaskPerformers
<? $performer_name = php_uname('n'); $task_performer = new TaskPerformer($performer_name); $task = $queue->get_task(); $task = $task_performer->execute_task($task); $queue->modify_task($task);
####Result
NonExclusiveTask task status is DONE
Job1
performed byserver1
. StatusDONE
Job2
performed byserver2
. StatusDONE
Job3
performed byserver3
. StatusDONE
Get performer name: get_performer()
. By default performer name is Default_Performer
Exclusive
Subtasks of task can execute only one Task Performer.
Set exclusive: set_exclusive(true)
#####Example Add task
<? $task = new Task("ExclusiveTask"); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
On three servers (server1, server2, server3) started TaskPerformers
<? $performer_name = php_uname('n'); $task_performer = new TaskPerformer($performer_name); $task = $queue->get_task( array(TaskConst::PERFORMER => $performer_name) ); $task = $task_performer->execute_task($task); $queue->modify_task($task);
####Result
ExclusiveTask task status is DONE
Job1
performed byserver1
. StatusDONE
Job2
performed byserver1
. StatusDONE
Job3
performed byserver1
. StatusDONE
server2
receivefalse
from queueserver3
receivefalse
from queue
####Warning
####All TaskPerformers must have unique name when you use exclusive tasks.
If you not made it you can break logic of queue. Queue and performer not receive second sub task after first performed if not add in queue request get_task()
TaskConst::PERFORMER unique name.
Task response, log, parent
Response
Task response is return data of run
method
####Example
<? $task = new Task("ParentJob"); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $sub_task_request_data = $task->get_request_data(); return $sub_task_request_data + 5; } }
####Result
Get response: $task->response()->get_response()
Job1
response: 10Job2
response: 15Job3
response: 20
Log
Logging in task ####Example
<? class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $arg1 = 1; $task->response()->set_log('Step 1'); $arg2 = 2; $task->response()->set_log('Step 2'); return $arg1+$arg2; } }
####Result
<? $logs = $task->response()->get_log();
Return: array array('Step 1', 'Step2');
Access to parent task
Method $task->parent_task()
####Example
<? $task = new Task("ParentTask", 5); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $sub_task_request_data = $task->get_request_data(); $parent_request_data = $task->parent_task()->get_request_data(); return $sub_task_request_data * $parent_request_data; } } class Job2 extends Job1 {} class Job3 extends Job1 {}
####Result Response
Job1
response: 25Job2
response: 50Job3
response: 75
Error in task and sub tasks
Error in simple task
After first exception task got status: ERROR
. You can set maximum error trials for task.
Set maximum error trials $task->settings()->set_error_max_trial($trials)
. Where $trials
is maximum trials count (int).
Get current trial number $task->settings()->get_trial()
. Return: int, number of current trial.
####Example
<? $task = new Task("ExceptionTask"); $task->settings()->set_error_max_trial(5); class ExceptionTask implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { if($task->settings()->get_trial() < 3) { throw new Exception("Test exception"); } return 100; } }
####Result Response
- Error. Status "New"
- Error. Status "New"
- Done.
Error in task with sub tasks
By default after last error trial parent task got status ERROR
.
If need continue execute sub tasks need set error break
to false
Set error break flag $task->settings()->set_error_break($flag)
. Where $flag
is bool.
####Example
<? $task = new Task("TaskWithOneExceptionSubTask"); $task-> subtasks() ->add(new \PhpQueue\Task("Job1")) ->add(new \PhpQueue\Task("ExceptionTask")) ->add(new \PhpQueue\Task("Job2")); class ExceptionTask implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return true; } } class Job2 extends Job1 {}
####Result with error break flag = false
Job1
statusDone
ExceptionTask
statusError
TaskWithOneExceptionSubTask
statusError
####Result with error break flag = true
Job1
statusDone
ExceptionTask
statusError
Job2
statusDone
TaskWithOneExceptionSubTask
statusDone with Error
Nested settings
Settitng in parent task cover settings in sub tasks. Settings in sub task override settings of parent task.
####Example
<? $task = new Task("ParentJob"); $task->settings()->set_error_max_trial(5); $job1 = new \PhpQueue\Task("Job1"); $job2 = new \PhpQueue\Task("Job2"); $job2->settings()->set_error_max_trial(2); $job3 = new \PhpQueue\Task("Job3"); $task->subtasks()->add(array($job1, $job2, $job3,));
####Result
Job1
max trials 5Job1
max trials 2Job1
max trials 5
Error log
Get exception log from task: $task->response()->get_error()
Return: array
Callbacks and error callback
Callback
Callback execute after task.
PhpQueue have two types of callbacks:
- Callback
PhpQueue\Interfaces\ICallback
- Error callback
PhpQueue\Interfaces\IErrorCallback
Example
<? $task = new Task("TaskWithCallback"); $task->set_callback("SimpleCallback"); class SimpleCallback implements PhpQueue\Interfaces\ICallback { public static function callback_run(Task $task) { echo "Callback"; } }
Result
TaskWithCallback executed and "Callback" displayed on the screen
Error callback
Error callback executed after error in task
<? $task = new Task("TaskWithError"); $task ->set_callback("SimpleCallback") ->set_error_callback("ErrorCallback"); class TaskWithError implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback { public static function callback_error_run(Task $task) { echo "ERROR!"; } }
Result
TaskWithCallback executed and "ERROR!" displayed on the screen
Callbacks of Parent task
Sub tasks and parent tasks can have callbacks. First executed subtasks callbacks, after parent callback.
Example
<? $task = new Task("TaskWithSubTasks"); $task->set_callback("ParentCallback") $job1 =new \PhpQueue\Task("Job"); $job1->set_callback("Callback"); $job2 =new \PhpQueue\Task("Job"); $job3 =new \PhpQueue\Task("Job"); $job4 =new \PhpQueue\Task("Job"); $job4->set_callback("Callback") $task->subtasks()->add(array($job1, $job2, $job3, $job4)); class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { echo "Job"; } } class Callback implements PhpQueue\Interfaces\ICallback { public static function callback_run(Task $task) { echo "Callback"; } }
Result
Displayed on screen:
Job
Callback
ParentCallback
Job
ParentCallback
Job
ParentCallback
Job
Callback
ParentCallback
Global callback
Task performer cah Callback and Error callback
Example
<? $task = new Task("TaskWithException"); class TaskWithException implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback { public static function callback_error_run(Task $task) { echo "GLOBAL ERROR!"; } } $task_performer = new TaskPerformer(); $task_performer->set_global_error_callback("ErrorCallback"); $task_performer->execute_task($task);
Result
Displayed on screen "GLOBAL ERROR!"
Errors in callback
After arror in callback task got status: CALLBACK ERROR
without execution other callbacks.
Callback execution priority
Callback:
- Sub task callback/error callback
- Parent callback/error callback
- Global callback/error callback
Queue drivers
PhpQueue support:
- MySql
- PostgreSQL
- SQLite
- File
In future: Mongo, Redis
SqlPdoDriver
Requirement: PDO
Example
Create MySQL based queue
<? $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver);
FileDriver
Requirement: SimpleXMLElement
Example
Create file based queue
<? $driver = new SqlPdoDriver('queue_folder'); $queue = new Queue($driver);
Autoloader
Example
<? require_once __DIR__ . "/../src/PhpQueue/AutoLoader.php"; use PhpQueue\AutoLoader; AutoLoader::RegisterDirectory(array('Callbacks', 'Tasks/Example')); AutoLoader::RegisterNamespaces(array('PhpQueue' => '../src/PhpQueue')); AutoLoader::RegisterAutoLoader();
Web interface
Installation
Just copy queue files from folder web
to web server folder.
Driver connection
In file web/index.php
find this line:
<? $web_driver = include(DRIVERS_PATH . "SqlWebDriver.php");
and change path to file with driver (if need it).
###Driver connection property
Connection property in driver file in folder drivers
.
####Example Connection property of SqlWebDriver
<? define("QUEUE_HOST", "localhost"); define("QUEUE_DATABASE", "queue"); define("QUEUE_USER", "root"); define("QUEUE_PASSWORD", ""); define("QUEUE_TABLE", "queue_tasks");
Customisation
You can customise web interface
PhpQueue have 4 types of web interface:
- List
- Details
- Log
- Error log
For customisation of task need create a folder with the name of the task.
####Example
<? //Task class class JobLog implements Interfaces\IJob { public static function run(Task $task) { for($i=0; $i<6; $i++) { $task->response()->set_log(ceil(rand(0, 100) / 10)); } return $task->get_request_data()*2; } } // /web/templates/task_render/JobLog/log.php foreach($list as $_id => $log_string){ $performed = (int)$log_string; $not_performed = 10 - $performed; echo "Fork {$_id} <span style='color:#009944'>" . str_repeat("#", $performed) . "</span><span style='color:#ff0000'>" . str_repeat("#",$not_performed) . "</span><br>"; } // /web/templates/task_render/JobLog/list.php /** * @var array $task */ use PhpQueue\Task; use PhpQueue\TaskConst; ?> <div class="row"> <div class="col-md-12"> <span class="pull-right label label-<?= TaskModel::$class_by_status[$task[TaskConst::STATUS]] ?>"> <?= TaskModel::$status_text[$task[TaskConst::STATUS]] ?> <? if ($task[TaskConst::STATUS] == Task::STATUS_IN_PROCESS && $task[TaskConst::SUBTASKS_QUANTITY] > 0) { ?> <?= round(100 - (int)$task[TaskConst::SUBTASKS_QUANTITY_NOT_PERFORMED] / ((int)$task[TaskConst::SUBTASKS_QUANTITY] / 100)); ?>% <? } ?> </span> <h1 class="list-group-item-heading" style="text-align: center; color: #3a87ad"> <?= $task[TaskConst::TASK_NAME] ?> </h1> </div> </div>
Result
Custom list
Custom log