backplane / leptir
Background task processor
Requires
- php: >=5.4.0
- zendframework/zend-config: 2.*
- zendframework/zend-http: 2.*
- zendframework/zendframework: 2.*
Requires (Dev)
- amazonwebservices/aws-sdk-for-php: dev-master
- aws/aws-sdk-php: *
- phpunit/phpunit: 3.7.*
- predis/predis: 1.1.*@dev
This package is not auto-updated.
Last update: 2016-02-14 07:47:10 UTC
README
Leptir is an asynchronous, highly scalable task processor that can use multiple brokers (queues) in parallel. Different priorities can be defined for different brokers.
Leptir configuration
return array(
    'leptir' => array(
        'brokers' => array(
            array(
                'type' => 'mongodb',
                'connection' => array(
                    'collection' => 'tasksp1',
                    'host' => 'localhost',
                    'port' => 27017,
                    'database' => 'leptir',
                    'options' => array(
                        'secondaryPreferred' => true
                    )
                ),
                'configuration' => array(
                    'priority' => 1,
                    'task_count_caching_time' => 0.6
                )
            ),
            array(
                'type' => 'mongodb',
                'connection' => array(
                    'collection' => 'tasksp2'
                    // default values will be used
                ),
                'configuration' => array(
                    'priority' => 2
                )
            )
        ),
        'loggers' => array(
            // loggers configuration explained later
        ),
        'daemon' => array(
            'configuration' => array(
                'task_execution_time' => 600,
                'number_of_workers' => 4,
                'empty_queue_sleep_time' => 0.5,
                'workers_active_sleep_time' => 0.2
            )
        ),
        'meta_storage' => array(
            array(
                // storage configuration
            ),
            array(
                // storage configuration
            )
        ),
        'error_reporting' => array(
            array(
                'class' => 'Application\ErrorReporting\ErrorReporting'
            )
        )
    )
);
Explanation of Leptir configuration parameters:
- task_execution_time - Maximum task execution time. If a task doesn't finish in that time, it is terminated. The maximum execution time can be overridden for each task individually when pushing it to the broker queue.
- number_of_workers - Maximum number of active workers per Leptir instance.
- empty_queue_sleep_time - Time to sleep when the queue is empty, in seconds (floating point supported; 0 means no sleep). Reduces the number of queries to the broker.
- workers_active_sleep_time - Time to sleep when all the workers are busy, in seconds (floating point supported; 0 means no sleep). Reduces the number of queries to the broker.
The empty_queue_sleep_time and workers_active_sleep_time parameters reduce the number of queries sent to the broker. This is most useful for the SQS broker, where the number of queries affects the price.
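The following is a minimal sketch of how a polling loop might apply these two settings. The function name and the decision logic are hypothetical and are not taken from Leptir's daemon code; only the configuration keys come from this README.

```php
<?php
// Hypothetical helper, not part of Leptir: decides how long a polling loop
// should sleep before it queries the broker again.
function leptirSketchSleepSeconds(array $config, $activeWorkers, $queuedTasks)
{
    if ($activeWorkers >= $config['number_of_workers']) {
        // All workers are busy: no point in asking the broker for more work yet.
        return (float) $config['workers_active_sleep_time'];
    }
    if ($queuedTasks === 0) {
        // Nothing to do: back off so an empty queue is not polled in a tight loop.
        return (float) $config['empty_queue_sleep_time'];
    }
    return 0.0; // A worker is free and a task is waiting: do not sleep.
}

$config = array(
    'number_of_workers' => 4,
    'empty_queue_sleep_time' => 0.5,
    'workers_active_sleep_time' => 0.2,
);

$seconds = leptirSketchSleepSeconds($config, 4, 10);
// usleep() takes microseconds, which is how fractional seconds can be honoured.
usleep((int) round($seconds * 1000000));
```

With both values set to 0 the loop would never sleep, which matches the "0 means no sleep" note above.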
The configuration can define multiple brokers, which are all used at the same time. Brokers can also have different priorities: a broker with a lower priority index has a higher chance that its task will be chosen next.
The configuration can also define multiple task meta information storages.
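The README does not say how "higher chance" is computed. As an illustration only, here is one way to pick a broker at random with weights derived from the priority index. The weighting formula 1 / (priority + 1), the function name, and the assumption that priorities are non-negative are all made up for this sketch and may differ from what Leptir does.

```php
<?php
// Hypothetical weighting, NOT Leptir's actual algorithm:
// a broker with priority p gets weight 1 / (p + 1), so lower p wins more often.
function leptirSketchPickBroker(array $priorities, $roll)
{
    // $priorities maps broker name => priority index (assumed >= 0).
    // $roll is a float in [0, 1), normally drawn from a random generator.
    $weights = array();
    foreach ($priorities as $name => $priority) {
        $weights[$name] = 1.0 / ($priority + 1);
    }

    $threshold = $roll * array_sum($weights);
    $running = 0.0;
    foreach ($weights as $name => $weight) {
        $running += $weight;
        if ($threshold < $running) {
            return $name;
        }
    }

    // Floating point rounding can leave $threshold just above the last sum.
    end($weights);
    return key($weights);
}

$priorities = array('tasksp1' => 1, 'tasksp2' => 2);
$roll = mt_rand() / (mt_getrandmax() + 1);
echo leptirSketchPickBroker($priorities, $roll), "\n";
```

Passing the random value in as a parameter keeps the selection deterministic in tests.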
Brokers
Brokers are essentially queues that hold unprocessed tasks. Every broker supports scheduling and has a priority assigned to it (the default priority is 0). Currently supported brokers:
- MongoDB
- Amazon SQS
- Redis
Tasks can be prioritized by using multiple brokers with different priority levels.
Example of broker usage:
$broker = new Broker($brokersConfiguration);
$task = new TestTask(
    array(
        'param1' => 'First parameter',
        'paramFloat' => 3.89,
        'randomIntParam' => 10
    )
);
$broker->pushTask($task, null, 1);
Broker methods:
public function pushTask(BaseTask $task, \DateTime $timeOfExecution = null, $priority = 0, $timeLimit = 0)
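This method pushes a new task into the queue. Judging by the parameter names, $timeOfExecution schedules the task for later execution and $timeLimit overrides the maximum execution time for that task.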
Every broker has a couple of configuration options:
- priority - Broker priority level (a lower number means a higher priority).
- task_count_caching_time - Caching time for the current number of tasks in the queue. Used to reduce the number of requests to the database/SQS.
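To illustrate what a setting like task_count_caching_time does, here is a small time-based cache around a task count. The class name, the constructor arguments, and the injected clock are hypothetical; Leptir's own brokers may implement the caching differently.

```php
<?php
// Hypothetical class, not part of Leptir: remembers the last task count for a
// limited time so the broker backend is queried less often.
class LeptirSketchCachedCount
{
    private $fetchCount;   // callable that performs the real (expensive) count
    private $cachingTime;  // seconds, floating point allowed
    private $cachedValue = null;
    private $cachedAt = null;

    public function __construct($fetchCount, $cachingTime)
    {
        $this->fetchCount = $fetchCount;
        $this->cachingTime = $cachingTime;
    }

    public function get($now = null)
    {
        // $now can be injected for testing; otherwise the wall clock is used.
        $now = ($now === null) ? microtime(true) : $now;
        $expired = $this->cachedAt === null
            || ($now - $this->cachedAt) >= $this->cachingTime;
        if ($expired) {
            $this->cachedValue = call_user_func($this->fetchCount);
            $this->cachedAt = $now;
        }
        return $this->cachedValue;
    }
}

$queries = 0;
$counter = new LeptirSketchCachedCount(function () use (&$queries) {
    $queries++;        // stands in for a MongoDB/SQS/Redis round trip
    return 42;
}, 0.6);

$counter->get(100.0);  // first call queries the backend
$counter->get(100.3);  // within 0.6 s: served from the cache
$counter->get(100.7);  // cache expired: queries the backend again
echo $queries, "\n";
```

In this sketch a caching time of 0 disables the cache, because every call is treated as expired.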
MongoDB broker
Configuration example:
array(
    'type' => 'mongodb',
    'connection' => array(
        'host' => 'localhost',
        'port' => 27017,
        'database' => 'leptir',
        'collection' => 'tasks',
        'options' => array(
            'connect' => true,
            'secondaryPreferred' => true
        )
    ),
    'configuration' => array(
        'priority' => 0
    )
)
This is also the default configuration. The default configuration is merged with the user-defined one before the broker is created, so only values that differ from the defaults need to be specified.
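The effect of merging defaults with a user-defined configuration can be sketched with PHP's built-in array_replace_recursive(). Whether Leptir uses this exact function internally is an assumption; the variable names are illustrative.

```php
<?php
$defaults = array(
    'type' => 'mongodb',
    'connection' => array(
        'host' => 'localhost',
        'port' => 27017,
        'database' => 'leptir',
        'collection' => 'tasks',
        'options' => array('connect' => true, 'secondaryPreferred' => true),
    ),
    'configuration' => array('priority' => 0),
);

// Only the values that differ from the defaults are supplied by the user.
$userDefined = array(
    'type' => 'mongodb',
    'connection' => array('collection' => 'tasksp2'),
    'configuration' => array('priority' => 2),
);

// array_replace_recursive() keeps every default key and overrides only the
// ones the user supplied. Using it here is an assumption about the merge.
$merged = array_replace_recursive($defaults, $userDefined);

echo $merged['connection']['collection'], ' on ', $merged['connection']['host'], "\n";
```

A plain array_merge() would not work for this purpose, because it replaces the whole nested 'connection' array instead of merging its keys.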
Amazon SQS broker
Configuration example:
array(
    'type' => 'sqs',
    'connection' => array(
        'sqs_key' => 'SQS KEY',
        'sqs_secret' => 'SQS SECRET',
        'sqs_queue' => 'QUEUE URI'
    ),
    'configuration' => array(
        'priority' => 0
    )
)
Redis broker
Configuration example:
array(
    'type' => 'redis',
    'connection' => array(
        'scheme' => 'tcp',
        'host' => 'localhost',
        'port' => 6379,
        'database' => 0,
        'key' => 'leptir:ztasks'
    ),
    'configuration' => array(
        'priority' => 0
    )
)
This is also the default configuration.
Task Meta Storage
Used to store information about executed tasks. Multiple meta info storages can be defined.
Currently supported storages:
- MongoDB
- Redis
Task information format:
{
    "status" : 3,
    "retC" : 1,
    "exTime" : 0.0005939006805419922,
    "respM" : "Test task done.",
    "type" : "Leptir\\Task\\Test\\TestTask",
    "exStart" : ISODate("2013-12-04T02:43:35Z"),
    "_id" : "10577529e96d5eb0225.50827478"
}
- status - Current status of the task (1 - pending, 2 - in progress, 3 - completed)
- retC - Status code returned by the task (1 - success, 2 - warning, 3 - error, 4 - unknown, 5 - time limit exceeded)
- exTime - Execution time
- respM - Task's response message
- type - Task class name
- exStart - Task execution start time
- _id - Unique task id (generated by the broker)
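As a reading aid, the numeric codes listed above can be turned into a human-readable summary. The helper below is a hypothetical sketch, not part of Leptir; only the field names and code meanings are taken from the list above.

```php
<?php
// Hypothetical helper: summarises one task information record.
function leptirSketchDescribe(array $info)
{
    $statuses = array(1 => 'pending', 2 => 'in progress', 3 => 'completed');
    $returnCodes = array(
        1 => 'success', 2 => 'warning', 3 => 'error',
        4 => 'unknown', 5 => 'time limit exceeded',
    );

    // Fall back to a neutral label for codes that are not documented above.
    $status = isset($statuses[$info['status']]) ? $statuses[$info['status']] : 'unrecognised';
    $code = isset($returnCodes[$info['retC']]) ? $returnCodes[$info['retC']] : 'unrecognised';

    return sprintf('%s: %s (%s) in %.4f s', $info['type'], $status, $code, $info['exTime']);
}

$info = array(
    'status' => 3,
    'retC' => 1,
    'exTime' => 0.0005939006805419922,
    'respM' => 'Test task done.',
    'type' => 'Leptir\\Task\\Test\\TestTask',
);

echo leptirSketchDescribe($info), "\n";
```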
Configuration (MongoDB meta storage):
array(
    'connection' => array(
        'host' => 'localhost',
        'port' => 27017,
        'database' => 'leptir',
        'collection' => 'info',
        'options' => array(
            // mongo connection options
        )
    ),
    'configuration' => array(
    )
)
Redis meta storage
Redis meta storage stores one key-value pair per task: the key is the task id and the value is the JSON-encoded task information array.
Configuration:
array(
    'connection' => array(
        'scheme' => 'tcp',
        'host' => 'localhost',
        'port' => 6379,
        'database' => 0
    ),
    'configuration' => array(
        'expire_after_seconds' => 86400
    )
)
Loggers
Loggers for the Leptir daemon can be defined in the configuration file. Available loggers:
- File logger
- STDOUT logger
- STDERR logger
Leptir uses the Zend\Log library to generate logs. Configuration example:
'loggers' => array(
    'logfile' => array(
        'type' => 'file',
        'options' => array(
            'path' => '/var/log/leptir.log'
        )
    ),
    'stdoutlog' => array(
        'type' => 'stdout'
    )
)
logfile and stdoutlog are names (used only for readability). The STDOUT and STDERR loggers don't require additional options. The path option is mandatory for the file logger.
Error reporting
Leptir catches all exceptions and errors and includes this information in the task meta information. Sometimes every error also needs to be reported to a third-party service for easier tracking. Multiple error reporting implementations can be defined, and Leptir can be configured to forward every error and exception through them. Every error reporting implementation must implement Leptir\ErrorReport\ErrorReportInterface.
Configuration example:
'error_reporting' => array(
    array(
        'class' => 'Application\ErrorReporting\ErrorReporting'
    )
)
The error reporting interface contains the following methods:
reportException(\Exception $ex)
reportMessage($message)
addErrorData($errorData)
ErrorData (additional error information) is not automatically cleared. Error reporting implementation needs to clear data for each report manually if needed.
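A minimal sketch of an implementation that clears its error data after every report is shown below. To keep the example runnable on its own it declares a stand-in interface with the three methods listed above; in a real project the class would implement Leptir's interface instead. The class name, the in-memory storage, and the assumption that error data is an array are all hypothetical.

```php
<?php
// Stand-in for the interface described above so the sketch runs on its own.
interface SketchErrorReportInterface
{
    public function reportException(\Exception $ex);
    public function reportMessage($message);
    public function addErrorData($errorData);
}

// Hypothetical implementation: collects reports in memory instead of sending
// them to a third-party service.
class InMemoryErrorReporting implements SketchErrorReportInterface
{
    public $reports = array();
    private $errorData = array();

    public function addErrorData($errorData)
    {
        // Assumes error data is an array (or something castable to one).
        $this->errorData = array_merge($this->errorData, (array) $errorData);
    }

    public function reportException(\Exception $ex)
    {
        $this->send(get_class($ex) . ': ' . $ex->getMessage());
    }

    public function reportMessage($message)
    {
        $this->send($message);
    }

    private function send($text)
    {
        $this->reports[] = array('text' => $text, 'data' => $this->errorData);
        // Error data is not cleared automatically, so clear it after each report.
        $this->errorData = array();
    }
}

$reporter = new InMemoryErrorReporting();
$reporter->addErrorData(array('taskId' => 'abc'));
$reporter->reportException(new \RuntimeException('Broker unreachable'));
$reporter->reportMessage('Second report');

echo count($reporter->reports[1]['data']), "\n";
```

Clearing inside the private send() method means the second report does not carry the task id attached to the first one.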
Tasks
Every task has to extend the abstract class Leptir\Task\BaseTask. The only mandatory method to implement is the protected doJob method.
Simple task example:
<?php

namespace Leptir\Task\Test;

use Leptir\Task\BaseTask;

/**
 * Simple task used for testing purposes. This task will not do anything smart, it will just sleep for
 * a random number of seconds (between 6 and 19).
 *
 * Class SlowTask
 * @package Leptir\Task\Test
 */
class SlowTask extends BaseTask
{
    protected function doJob()
    {
        $sleepTime = rand(6, 19);
        $this->logInfo('Sleeping for '. $sleepTime);
        sleep($sleepTime);
        $this->addResponseLine('Task had a great nap');
        return self::EXIT_SUCCESS;
    }
}
Additional methods can be overridden for extended control over task execution:
- protected function beforeStart() - this method will be executed before the doJob method
- protected function afterFinish() - this method will be executed after the doJob method
- public function getAdditionalMetaInfo() - this method should return an associative array of additional fields to save as task information (see the Task Meta Storage section above)
There are also several methods implemented in the BaseTask class that can be used while implementing custom tasks:
- Methods for fetching parameters passed to task:
protected function getInt($paramName, $defaultValue = null)
protected function getString($paramName, $defaultValue = null)
protected function getFloat($paramName, $defaultValue = null)
protected function getArray($paramName, $defaultValue = null)
Methods will throw Leptir\Exception\LeptirInputException when parameter type doesn't match requested one.
- Current execution time - sometimes a task implementation needs to know how long it has been running (for example, to take action if the running time is too long)
protected function getExecutionTime()
The method returns a float representing the execution time.
- Logging methods
protected function logInfo($message)
protected function logError($message)
protected function logWarning($message)
protected function logDebug($message)
- Execution flow methods - the task execution flow can be tracked, which helps when writing unit tests for user-written tasks
changeState($state) - change the current state of the execution
getLastState() - get the last state the task was in
getExecutionFlow() - returns an array with the list of states the task was in while running
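The sketch below shows how state tracking of this kind can support a unit test. It uses a stand-in class that mimics only the three state methods named above; it is not Leptir's BaseTask, and the run() method and the state names are invented for the example.

```php
<?php
// Stand-in that mimics only changeState(), getLastState() and
// getExecutionFlow(); it is NOT Leptir's BaseTask.
class SketchFlowTask
{
    private $flow = array();

    public function changeState($state)
    {
        $this->flow[] = $state;
    }

    public function getLastState()
    {
        return $this->flow ? end($this->flow) : null;
    }

    public function getExecutionFlow()
    {
        return $this->flow;
    }

    // Hypothetical job body: records which branch was taken.
    public function run(array $items)
    {
        $this->changeState('started');
        if (count($items) === 0) {
            $this->changeState('nothing_to_do');
        } else {
            $this->changeState('processing');
        }
        $this->changeState('finished');
    }
}

$task = new SketchFlowTask();
$task->run(array());

// A unit test can assert on the exact path the task took.
echo implode(' > ', $task->getExecutionFlow()), "\n";
```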
Command line actions
Starting leptir
php index.php leptir start [--daemon] [--config=] [--pid=]
Leptir can be started as a daemon. Only file loggers will be active when Leptir is running as a daemon.
Stopping leptir
php index.php leptir stop [--pid=]
This command stops the Leptir process.
Install leptir as a service
php index.php leptir install [--config=] [--pid=] [--php_path=]
This command installs Leptir as a service (it creates the /etc/init.d/leptir file). File content:
#!/bin/bash
PID_PATH='{{PID_PATH}}'

case "$1" in
    start)
        # check if PID file exists
        if [ -f $PID_PATH ]; then
            pid="`cat $PID_PATH`"
            if ps $pid > /dev/null
            then
                echo -e "\e[31mLeptir is already flying on this box.\e[0m"
                exit 1
            else
                echo -e "\e[33m.pid file is there, but process is not running. Cleaning .pid file and starting the process.\e[0m"
                rm -f "$PID_PATH"
            fi
        fi
        echo -e "\e[32mStarting a little butterfly. Fly buddy, fly!\e[0m"
        {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir start --config={{CONFIG_PATH}} --daemon --pid $PID_PATH
        ;;
    stop)
        echo -ne "Stopping a little butterfly. You'll have to wait for all the tasks to finish though.\n"
        {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir stop --pid $PID_PATH
        while [ -f $PID_PATH ]; do
            sleep 1
            echo -ne "."
        done
        echo
        ;;
    *)
        echo "Usage: $0 (start|stop)"
        exit 1
esac
exit 0