ody / task
Manage Swoole tasks in ODY framework
Last update: 2025-03-27 20:46:27 UTC
Table of Contents
- Task Class
- TaskInterface
- TaskMiddlewareInterface
- TaskGroup Class
- TaskBatch Class
- TaskManager Class
- TaskHandler Class
Task Class
The primary entry point for creating and managing tasks in the system.
Constants
Constant | Type | Value | Description |
---|---|---|---|
PRIORITY_HIGH | int | 20 | High priority tasks are processed before normal and low priority tasks |
PRIORITY_NORMAL | int | 10 | Default priority level |
PRIORITY_LOW | int | 5 | Low priority tasks are processed after high and normal priority tasks |
STATUS_PENDING | string | 'pending' | Task is waiting to be processed |
STATUS_PROCESSING | string | 'processing' | Task is currently being processed |
STATUS_COMPLETED | string | 'completed' | Task has been completed successfully |
STATUS_FAILED | string | 'failed' | Task has failed |
STATUS_CANCELLED | string | 'cancelled' | Task has been cancelled |
STATUS_RETRYING | string | 'retrying' | Task failed but will be retried |
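
The effect of the priority values can be illustrated with PHP's built-in `SplPriorityQueue`, which always extracts the entry with the largest priority first. This is only a sketch of the ordering rule described in the table; whether the package uses `SplPriorityQueue` internally is not stated in this README, and the task names are placeholders.

```php
<?php
declare(strict_types=1);

// Priority values copied from the constants table.
const PRIORITY_HIGH = 20;
const PRIORITY_NORMAL = 10;
const PRIORITY_LOW = 5;

// SplPriorityQueue extracts the element with the largest priority first.
$queue = new SplPriorityQueue();
$queue->insert('cleanup-temp-files', PRIORITY_LOW);
$queue->insert('send-receipt', PRIORITY_NORMAL);
$queue->insert('reset-password', PRIORITY_HIGH);

$order = [];
while (!$queue->isEmpty()) {
    $order[] = $queue->extract();
}

print_r($order); // reset-password, send-receipt, cleanup-temp-files
```

Note that `SplPriorityQueue` does not guarantee first-in, first-out order among entries that share the same priority.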
Methods
execute(string $taskClass, array $params = [], int $priority = self::PRIORITY_NORMAL): int
Execute a task immediately.
Parameters:
- `$taskClass`: The full class name of the task to execute (must implement TaskInterface)
- `$params`: Parameters to pass to the task's handle method
- `$priority`: Priority level for the task
Returns: Task ID
Example:

    $taskId = Task::execute(\App\Tasks\SendEmailTask::class, [
        'to' => 'user@example.com',
        'subject' => 'Welcome',
        'body' => 'Welcome to our service!'
    ]);
later(string $taskClass, array $params = [], int $delayMs = 1000, int $priority = self::PRIORITY_NORMAL): int
Schedule a task to run after a delay.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$delayMs`: Delay in milliseconds before executing the task
- `$priority`: Priority level for the task
Returns: Task ID
Example:

    $taskId = Task::later(
        \App\Tasks\SendReminderTask::class,
        ['user_id' => 123],
        60000 // 60 seconds
    );
withRetry(string $taskClass, array $params = [], array $retryOptions = ['attempts' => 3, 'delay' => 1000, 'multiplier' => 2], int $priority = self::PRIORITY_NORMAL): int
Execute a task with automatic retry on failure.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$retryOptions`: Retry configuration
  - `attempts`: Maximum number of attempts (including the first attempt)
  - `delay`: Initial delay in milliseconds before retrying
  - `multiplier`: Multiplier for exponential backoff (each retry multiplies the delay by this factor)
- `$priority`: Priority level for the task
Returns: Task ID
Example:

    $taskId = Task::withRetry(
        \App\Tasks\APIRequestTask::class,
        ['endpoint' => '/api/data', 'method' => 'GET'],
        ['attempts' => 5, 'delay' => 1000, 'multiplier' => 2]
    );
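
To make the retry options concrete, the sketch below lists the wait before each retry for a given configuration. `retryDelays()` is a hypothetical helper, not part of the package, and it assumes that retry n waits `delay * multiplier^(n-1)` milliseconds; the README implies this schedule but does not spell it out.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: lists the wait (in ms) before each retry.
 * Assumes retry n waits delay * multiplier^(n-1).
 */
function retryDelays(array $retryOptions): array
{
    $attempts   = $retryOptions['attempts'] ?? 3;
    $delay      = $retryOptions['delay'] ?? 1000;
    $multiplier = $retryOptions['multiplier'] ?? 2;

    $delays = [];
    // "attempts" includes the first run, so there are attempts - 1 retries.
    for ($retry = 1; $retry < $attempts; $retry++) {
        $delays[] = (int) ($delay * $multiplier ** ($retry - 1));
    }

    return $delays;
}

$delays = retryDelays(['attempts' => 5, 'delay' => 1000, 'multiplier' => 2]);
print_r($delays); // 1000, 2000, 4000, 8000
```

Under that assumption, five attempts with the options above would spend up to 15 seconds waiting between attempts before the task is finally marked as failed.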
group(?string $name = null): TaskGroup
Create a new task group for organizing related tasks.
Parameters:
- `$name`: Optional name for the group (auto-generated if not provided)
Returns: A new TaskGroup instance
Example:

    $group = Task::group('user-onboarding')
        ->add(\App\Tasks\CreateUserTask::class, ['email' => 'user@example.com'])
        ->add(\App\Tasks\SendWelcomeEmailTask::class, ['email' => 'user@example.com']);
batch(array $tasks): TaskBatch
Create a batch for processing multiple similar tasks efficiently.
Parameters:
- `$tasks`: Array of task definitions (strings or arrays with 'class' and 'params' keys)
Returns: A new TaskBatch instance
Example:

    $tasks = [];
    foreach ($users as $user) {
        $tasks[] = [
            'class' => \App\Tasks\NotifyUserTask::class,
            'params' => ['user_id' => $user->id, 'message' => 'System maintenance']
        ];
    }

    $batch = Task::batch($tasks);
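
Because a batch accepts both bare class-name strings and arrays with 'class' and 'params' keys, it can help to see the two forms side by side. The sketch below normalizes both into one shape. `normalizeTaskDefinitions()` and the task class names are hypothetical, and treating a bare string as a task with empty parameters is an assumption, not documented behavior.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical normalizer: turns each definition into
 * ['class' => string, 'params' => array].
 */
function normalizeTaskDefinitions(array $tasks): array
{
    $normalized = [];
    foreach ($tasks as $task) {
        if (is_string($task)) {
            // Assumption: a bare class name means "no parameters".
            $normalized[] = ['class' => $task, 'params' => []];
        } elseif (is_array($task) && isset($task['class'])) {
            $normalized[] = ['class' => $task['class'], 'params' => $task['params'] ?? []];
        } else {
            throw new InvalidArgumentException('Task definition needs a class name');
        }
    }

    return $normalized;
}

$definitions = normalizeTaskDefinitions([
    'App\Tasks\PruneCacheTask',
    ['class' => 'App\Tasks\NotifyUserTask', 'params' => ['user_id' => 7]],
]);
print_r($definitions);
```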
cancel(int $taskId): bool
Cancel a pending task.
Parameters:
- `$taskId`: The ID of the task to cancel
Returns: True if the task was cancelled, false otherwise
Example:

    $cancelled = Task::cancel($taskId);
status(int $taskId): array
Get the current status of a task.
Parameters:
- `$taskId`: The ID of the task
Returns: Array with task status information
Example:

    $status = Task::status($taskId);
    // Returns: [
    //     'id' => 123,
    //     'class' => '\App\Tasks\SendEmailTask',
    //     'status' => 'completed',
    //     'attempts' => 1,
    //     'created_at' => 1615480245.7843,
    //     'started_at' => 1615480245.8421,
    //     'completed_at' => 1615480246.0124,
    //     'execution_time' => 0.1703,
    //     'result' => [...],
    //     'error' => null
    // ]
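
A caller polling for completion usually only needs to know whether the task has reached a final state. The sketch below shows one way to check that from a status array. `isFinished()` is a hypothetical helper, and treating exactly 'completed', 'failed', and 'cancelled' as final states is an assumption based on the STATUS_* descriptions; the sample arrays are made up.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true once a task can no longer change state.
 * Status strings are the values of the STATUS_* constants.
 */
function isFinished(array $status): bool
{
    return in_array($status['status'] ?? null, ['completed', 'failed', 'cancelled'], true);
}

// Sample arrays shaped like the status output in the example.
$done     = ['id' => 123, 'status' => 'completed', 'error' => null];
$retrying = ['id' => 124, 'status' => 'retrying', 'error' => 'timeout'];

var_dump(isFinished($done));     // bool(true)
var_dump(isFinished($retrying)); // bool(false)
```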
TaskInterface
Interface that all task classes must implement.
Methods
handle(array $params = [])
Process the task.
Parameters:
- `$params`: Parameters passed to the task
Returns: Any value, which will be available in the task result
Example:

    class SendEmailTask implements TaskInterface
    {
        public function handle(array $params = [])
        {
            // Send email logic here
            $result = mail($params['to'], $params['subject'], $params['body']);

            return [
                'success' => $result,
                'to' => $params['to'],
                'sent_at' => date('Y-m-d H:i:s')
            ];
        }
    }
TaskMiddlewareInterface
Interface for creating task middleware.
Methods
process(array $task): array
Process the task before it's enqueued.
Parameters:
- `$task`: Array containing task information
  - `params`: The task parameters
Returns: Modified task array
Example:

    class LoggingMiddleware implements TaskMiddlewareInterface
    {
        public function process(array $task): array
        {
            // Add logging metadata to the task
            $task['params']['__meta'] = [
                'logged_at' => time(),
                'request_id' => $_SERVER['REQUEST_ID'] ?? uniqid()
            ];

            return $task;
        }
    }
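
Since middleware can be given as a class name or a callable, a task array may pass through several of them before it is enqueued. The sketch below shows one plausible way such a chain could be applied. The interface is redeclared locally so the snippet runs on its own; `TimestampMiddleware` and `applyMiddleware()` are hypothetical, and the package's real dispatch logic may differ.

```php
<?php
declare(strict_types=1);

// Local stand-in so the sketch runs without the package installed.
interface TaskMiddlewareInterface
{
    public function process(array $task): array;
}

final class TimestampMiddleware implements TaskMiddlewareInterface
{
    public function process(array $task): array
    {
        $task['params']['__meta']['queued_at'] = time();
        return $task;
    }
}

/**
 * Hypothetical runner: passes the task through each middleware in order.
 * Accepts a middleware class name or a callable.
 */
function applyMiddleware(array $task, array $middlewares): array
{
    foreach ($middlewares as $middleware) {
        if (is_string($middleware) && is_subclass_of($middleware, TaskMiddlewareInterface::class)) {
            $task = (new $middleware())->process($task);
        } elseif (is_callable($middleware)) {
            $task = $middleware($task);
        } else {
            throw new InvalidArgumentException('Unsupported middleware');
        }
    }

    return $task;
}

$task = applyMiddleware(['params' => ['to' => 'user@example.com']], [
    TimestampMiddleware::class,
    static function (array $task): array {
        $task['params']['__meta']['source'] = 'example';
        return $task;
    },
]);
print_r($task);
```

Each middleware receives the output of the previous one, so later entries can see metadata added earlier in the chain.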
TaskGroup Class
Manages a group of related tasks.
Methods
__construct(?string $name = null)
Create a new task group.
Parameters:
- `$name`: Optional name for the group (auto-generated if not provided)
add(string $taskClass, array $params = [], int $priority = Task::PRIORITY_NORMAL): self
Add a task to the group.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$priority`: Priority level for the task
Returns: $this (for method chaining)
concurrency(?int $limit): self
Set a concurrency limit for the group.
Parameters:
- `$limit`: Maximum number of tasks in the group that can run simultaneously (null for no limit)
Returns: $this (for method chaining)
allowFailures(bool $allow = true): self
Configure whether failures in one task should stop other tasks in the group.
Parameters:
- `$allow`: If true, other tasks will continue even if some tasks fail
Returns: $this (for method chaining)
middleware($middleware): self
Add middleware to all tasks in this group.
Parameters:
- `$middleware`: Middleware class name or callable
Returns: $this (for method chaining)
dispatch(): array
Execute all tasks in the group.
Returns: Array of task IDs
wait(int $timeout = 30000): array
Wait for all tasks in the group to complete.
Parameters:
- `$timeout`: Maximum time to wait in milliseconds
Returns: Array of task results, indexed by task ID
cancel(): bool
Cancel all pending tasks in the group.
Returns: True if all tasks were cancelled, false otherwise
TaskBatch Class
Efficiently processes multiple similar tasks.
Methods
__construct(array $tasks)
Create a new task batch.
Parameters:
- `$tasks`: Array of task definitions (strings or arrays with 'class' and 'params' keys)
dispatch(int $priority = Task::PRIORITY_NORMAL): array
Execute all tasks in the batch.
Parameters:
- `$priority`: Priority level for all tasks in the batch
Returns: Array of task IDs
wait(int $timeout = 30000): array
Wait for all tasks in the batch to complete.
Parameters:
- `$timeout`: Maximum time to wait in milliseconds
Returns: Array of task results, indexed by task ID
cancel(): bool
Cancel all pending tasks in the batch.
Returns: True if all tasks were cancelled, false otherwise
TaskManager Class
Core class managing task queues and processing.
Methods
getInstance(): TaskManager
Get the singleton instance of the TaskManager.
Returns: TaskManager instance
setServer(Server $server): void
Set the Swoole server instance.
Parameters:
- `$server`: Swoole server instance
registerMiddleware($middleware): self
Register global middleware that applies to all tasks.
Parameters:
- `$middleware`: Middleware class name or callable
Returns: $this (for method chaining)
enqueue(string $taskClass, array $params = [], int $priority = Task::PRIORITY_NORMAL): int
Add a task to the queue for immediate execution.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$priority`: Priority level for the task
Returns: Task ID
enqueueDelayed(string $taskClass, array $params = [], int $delayMs = 1000, int $priority = Task::PRIORITY_NORMAL): int
Add a task to be executed after a delay.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$delayMs`: Delay in milliseconds before executing the task
- `$priority`: Priority level for the task
Returns: Task ID
enqueueWithRetry(string $taskClass, array $params = [], array $retryOptions = ['attempts' => 3, 'delay' => 1000, 'multiplier' => 2], int $priority = Task::PRIORITY_NORMAL): int
Add a task with retry options.
Parameters:
- `$taskClass`: The full class name of the task to execute
- `$params`: Parameters to pass to the task's handle method
- `$retryOptions`: Retry configuration
  - `attempts`: Maximum number of attempts (including the first attempt)
  - `delay`: Initial delay in milliseconds before retrying
  - `multiplier`: Multiplier for exponential backoff
- `$priority`: Priority level for the task
Returns: Task ID
getNextTask(): ?array
Get the next task from the highest priority queue.
Returns: Task data array or null if queues are empty
addTaskToGroup(int $taskId, string $groupName): void
Add a task to a group.
Parameters:
- `$taskId`: The ID of the task
- `$groupName`: The name of the group
addTaskToBatch(int $taskId, string $batchId): void
Add a task to a batch.
Parameters:
- `$taskId`: The ID of the task
- `$batchId`: The ID of the batch
waitForGroup(string $groupName, int $timeout = 30000): array
Wait for all tasks in a group to complete.
Parameters:
- `$groupName`: The name of the group
- `$timeout`: Maximum time to wait in milliseconds
Returns: Array of task results, indexed by task ID
waitForBatch(string $batchId, int $timeout = 30000): array
Wait for all tasks in a batch to complete.
Parameters:
- `$batchId`: The ID of the batch
- `$timeout`: Maximum time to wait in milliseconds
Returns: Array of task results, indexed by task ID
cancelTask(int $taskId): bool
Cancel a specific task.
Parameters:
- `$taskId`: The ID of the task to cancel
Returns: True if the task was cancelled, false otherwise
cancelGroup(string $groupName): bool
Cancel all tasks in a group.
Parameters:
- `$groupName`: The name of the group
Returns: True if all tasks were cancelled, false otherwise
cancelBatch(string $batchId): bool
Cancel all tasks in a batch.
Parameters:
- `$batchId`: The ID of the batch
Returns: True if all tasks were cancelled, false otherwise
getTaskStatus(int $taskId): array
Get the status of a task.
Parameters:
- `$taskId`: The ID of the task
Returns: Array with task status information
getMetrics(): array
Get metrics for monitoring.
Returns: Array with metrics:
- `total_tasks`: Total number of tasks created
- `completed_tasks`: Number of successfully completed tasks
- `failed_tasks`: Number of failed tasks
- `retried_tasks`: Number of retried tasks
- `cancelled_tasks`: Number of cancelled tasks
- `average_execution_time`: Average task execution time in seconds
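
As an illustration of how these metrics might feed a health check, the sketch below derives a success rate from the listed keys. `successRate()` is a hypothetical helper, the numbers are invented, and counting only completed and failed tasks as "finished" is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: share of finished tasks that completed successfully.
 * Returns 0.0 when nothing has finished yet.
 */
function successRate(array $metrics): float
{
    $completed = $metrics['completed_tasks'] ?? 0;
    $failed    = $metrics['failed_tasks'] ?? 0;
    $finished  = $completed + $failed;

    return $finished === 0 ? 0.0 : $completed / $finished;
}

// Made-up numbers using the metric keys from the list.
$metrics = [
    'total_tasks' => 120,
    'completed_tasks' => 90,
    'failed_tasks' => 10,
    'retried_tasks' => 15,
    'cancelled_tasks' => 5,
    'average_execution_time' => 0.17,
];

printf("%.1f%%\n", successRate($metrics) * 100); // 90.0%
```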
TaskHandler Class
Integrates the TaskManager with Swoole.
Methods
init(Server $server): void
Initialize the task handler with the server.
Parameters:
- `$server`: Swoole server instance
Example:

    $server = new Swoole\Server('0.0.0.0', 9501);
    $server->set([
        'worker_num' => 4,
        'task_worker_num' => 8,
        'task_enable_coroutine' => true,
    ]);

    SwooleTaskHandler::init($server);
handleTask(array $data)
Handle a task execution.
Parameters:
- `$data`: Task data array
Returns: The result from the task's handle method, or an error array