salesrender/plugin-component-queue

SalesRender plugin queue abstract component

Installs: 1 014

Dependents: 3

Suggesters: 0

Security: 0

Stars: 0

Watchers: 2

Forks: 0

Open Issues: 0

pkg:composer/salesrender/plugin-component-queue

0.3.5 2023-11-09 13:59 UTC

This package is auto-updated.

Last update: 2026-02-13 20:58:44 UTC


README

Abstract queue component for the SalesRender plugin ecosystem. Provides a task queue with retry logic, built on top of Symfony Console and Symfony Process. Tasks are persisted in the database via plugin-component-db and executed as background child processes through CLI commands.

Installation

composer require salesrender/plugin-component-queue

Requirements

Requirement Version
PHP >= 7.4.0
ext-json *
symfony/console ^5.3
symfony/process ^5.3
salesrender/plugin-component-db ^0.3.8
xakepehok/path ^0.2.1
khill/php-duration ^1.1

Architecture overview

The queue operates as a two-command pattern:

  1. QueueCommand -- a long-running daemon that polls the database for pending tasks and spawns background child processes (one per task) via symfony/process.
  2. QueueHandleCommand -- invoked by the child process to load a single task by ID and execute the actual business logic.

Tasks are persisted as database models (extending Task) with built-in retry tracking (TaskAttempt). The queue command runs in a loop until memory usage exceeds the configured limit.

Key classes

TaskAttempt

Tracks retry state for a task.

Namespace: SalesRender\Plugin\Components\Queue\Models\Task

Method Signature Description
__construct __construct(int $limit, int $interval) Create attempt tracker with max retries and interval (seconds) between attempts
getLastTime getLastTime(): ?int Unix timestamp of last attempt, or null if never attempted
getNumber getNumber(): int Current attempt number (starts at 0)
getLimit getLimit(): int Maximum number of attempts allowed
setLimit setLimit(int $limit): void Override the attempt limit
getInterval getInterval(): int Interval in seconds between retries
setInterval setInterval(int $interval): void Override the retry interval
getLog getLog(): string Log message from the last attempt
attempt attempt(string $log): void Record an attempt: increments number, sets lastTime to time(), saves log
isSpent isSpent(): bool Returns true when all attempts have been exhausted (number >= limit)

Task (abstract)

Abstract database model representing a queued task. Extends Model from plugin-component-db.

Namespace: SalesRender\Plugin\Components\Queue\Models\Task

Method Signature Description
__construct __construct(TaskAttempt $attempt) Generates UUID, captures current PluginReference from Connector, sets createdAt
getPluginReference getPluginReference(): ?PluginReference Returns the plugin reference (companyId, alias, pluginId) or null
schema static schema(): array Database schema with fields: companyId, pluginAlias, pluginId, createdAt, attemptLastTime, attemptNumber, attemptLimit, attemptInterval, attemptLog

Inherited from Model: save(), delete(), findById(), findByCondition(), freeUpMemory(), tableName().

QueueCommand (abstract)

Long-running daemon command that polls for tasks and spawns child processes.

Namespace: SalesRender\Plugin\Components\Queue\Commands

Method Signature Description
__construct __construct(string $name, int $limit, int $maxMemoryInMb = 25) Sets command name to {name}:queue, concurrency limit, and max memory
findModels abstract findModels(): ModelInterface[] Must be implemented. Return array of task models ready to be processed
handleQueue handleQueue(ModelInterface $model): bool Spawns a child process: php console.php {name}:handle {id}
startedLog startedLog(ModelInterface $model, OutputInterface $output): void Logs "Process started" message; override for custom logging
execute execute(InputInterface $input, OutputInterface $output): int Main loop: mutex lock, poll for models, spawn processes, until memory limit

CLI option: --disable-mutex (-dm) -- disables file-based mutex that prevents duplicate instances.

Environment variable: LV_PLUGIN_PHP_BINARY -- path to the PHP binary used to spawn child processes.

QueueHandleCommand (abstract)

Command invoked by child processes to handle a single task.

Namespace: SalesRender\Plugin\Components\Queue\Commands

Method Signature Description
__construct __construct(string $name) Sets command name to {name}:handle

The execute method must be implemented in subclasses. Receives id as a required argument ($input->getArgument('id')).

Usage

1. Define a Task model

Create a concrete task class that extends Task. Pass a TaskAttempt with the desired retry limit and interval.

From plugin-logistic-eushipments -- a simple task with 100 retries and 600-second interval:

use SalesRender\Plugin\Components\Queue\Models\Task\Task;
use SalesRender\Plugin\Components\Queue\Models\Task\TaskAttempt;

final class BindingSyncTask extends Task
{
    public function __construct()
    {
        parent::__construct(new TaskAttempt(100, 600));
    }

    public function getAttempt(): TaskAttempt
    {
        return $this->attempt;
    }
}

From plugin-core-chat -- a task carrying additional data (Chat object), with custom serialization:

use SalesRender\Plugin\Components\Queue\Models\Task\Task;
use SalesRender\Plugin\Components\Queue\Models\Task\TaskAttempt;

class ChatSendTask extends Task
{
    protected Chat $chat;

    public function __construct(Chat $chat)
    {
        parent::__construct(new TaskAttempt(100, 10));
        $this->chat = $chat;
    }

    public function getChat(): Chat
    {
        return $this->chat;
    }

    public function getAttempt(): TaskAttempt
    {
        return $this->attempt;
    }

    protected static function beforeWrite(array $data): array
    {
        $data = parent::beforeWrite($data);
        $data['chat'] = json_encode($data['chat']);
        return $data;
    }

    protected static function afterRead(array $data): array
    {
        $data = parent::afterRead($data);
        $data['chat'] = Chat::parseFromArray(json_decode($data['chat'], true));
        return $data;
    }

    public static function schema(): array
    {
        return array_merge(parent::schema(), [
            'chat' => ['TEXT', 'NOT NULL'],
        ]);
    }
}

2. Implement QueueCommand

Override findModels() to query pending tasks from the database. Use Medoo conditions to filter by attemptLastTime and attemptInterval so that tasks are retried only after the interval has elapsed.

From plugin-core-chat:

use SalesRender\Plugin\Components\Queue\Commands\QueueCommand;
use Medoo\Medoo;

class ChatSendQueueCommand extends QueueCommand
{
    public function __construct()
    {
        parent::__construct(
            'chatSendQueue',
            $_ENV['LV_PLUGIN_CHAT_SEND_QUEUE_LIMIT'] ?? 100,
            25
        );
    }

    protected function findModels(): array
    {
        ChatSendTask::freeUpMemory();
        $condition = [
            'OR' => [
                'attemptLastTime' => null,
                'attemptLastTime[<=]' => Medoo::raw('(:time - <attemptInterval>)', [':time' => time()]),
            ],
            "ORDER" => ["createdAt" => "ASC"],
            'LIMIT' => $this->limit
        ];

        $processes = array_keys($this->processes);
        if (!empty($processes)) {
            $condition['id[!]'] = $processes;
        }

        return ChatSendTask::findByCondition($condition);
    }
}

3. Implement QueueHandleCommand

Override execute() to load the task by ID, perform the business logic, and manage retry/deletion.

From plugin-core-chat:

use SalesRender\Plugin\Components\Db\Components\Connector;
use SalesRender\Plugin\Components\Queue\Commands\QueueHandleCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ChatSendQueueHandleCommand extends QueueHandleCommand
{
    public function __construct()
    {
        parent::__construct('chatSendQueue');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        /** @var ChatSendTask $task */
        $task = ChatSendTask::findById($input->getArgument('id'));
        if (is_null($task)) {
            $output->writeln("<error>Task with passed id was not found</error>");
            return Command::INVALID;
        }

        if ($task->getPluginReference()) {
            Connector::setReference($task->getPluginReference());
        }

        try {
            // Execute business logic...
            $task->delete();
            return Command::SUCCESS;
        } catch (Throwable $throwable) {
            $output->writeln("<error>{$throwable->getMessage()}</error>");
            $task->getAttempt()->attempt($throwable->getMessage());
        }

        if ($task->getAttempt()->isSpent()) {
            $task->delete();
        } else {
            $task->save();
        }

        return Command::FAILURE;
    }
}

4. Register commands

Register both commands in your ConsoleAppFactory and add a cron task for the queue command.

From plugin-core-chat:

use Symfony\Component\Console\Application;
use SalesRender\Plugin\Core\Commands\CronCommand;

class ConsoleAppFactory extends \SalesRender\Plugin\Core\Factories\ConsoleAppFactory
{
    public function build(): Application
    {
        $this->app->add(new ChatSendQueueCommand());
        $this->app->add(new ChatSendQueueHandleCommand());

        CronCommand::addTask(
            '* * * * * ' . PHP_BINARY . ' ' . Path::root()->down('console.php') . ' chatSendQueue:queue'
        );

        return parent::build();
    }
}

5. Enqueue a task

Create a task instance and save it:

$task = new ChatSendTask($chat);
$task->save();

Configuration

Environment variable Description
LV_PLUGIN_PHP_BINARY Path to the PHP binary. Used by QueueCommand to spawn child processes via symfony/process.

The queue concurrency limit and memory limit are set in the QueueCommand constructor. Common pattern is to read the limit from an environment variable with a fallback default.

Retry logic

The retry mechanism is built into the TaskAttempt class:

  1. When a handle command catches an error, it calls $task->getAttempt()->attempt($errorMessage) -- this increments the attempt counter and records the timestamp.
  2. On next poll cycle, QueueCommand::findModels() checks attemptLastTime[<=] (now - attemptInterval) -- so the task is not retried until the interval has elapsed.
  3. After processing, $task->getAttempt()->isSpent() returns true when the number of attempts reaches the limit. If spent, the task is deleted; otherwise it is saved for a future retry.

API reference

Task database schema

The base Task::schema() defines the following columns (in addition to id from Model):

Column Type Notes
companyId VARCHAR(255) Nullable; set from Connector::getReference()
pluginAlias VARCHAR(255) Nullable; set from Connector::getReference()
pluginId VARCHAR(255) Nullable; set from Connector::getReference()
createdAt INT NOT NULL Unix timestamp of task creation
attemptLastTime INT Nullable; unix timestamp of last attempt
attemptNumber INT NOT NULL Current attempt number
attemptLimit INT NOT NULL Maximum attempts allowed
attemptInterval INT NOT NULL Seconds between retries
attemptLog VARCHAR(500) Log message from last attempt

Subclasses should override schema() with array_merge(parent::schema(), [...]) to add custom fields.

Command naming convention

For a queue named "foo":

  • Queue daemon command: foo:queue
  • Handle command: foo:handle

Dependencies

Package Purpose
salesrender/plugin-component-db Database ORM: Model, Connector, PluginReference, ReflectionHelper, UuidHelper
symfony/console CLI command framework
symfony/process Spawning child processes for task handling
xakepehok/path Path helper for building file paths (Path::root())
khill/php-duration Human-readable duration formatting for uptime display

See also