kelemen/parallel

Library for parallel running tasks

2.2.0 2023-03-22 09:34 UTC

This package is auto-updated.

Last update: 2024-04-16 10:39:30 UTC


README

Library for parallel (concurrent) task processing. Implemented with symfony/process.

Basic table output

output

Configuration

Basic

command.php

#!/usr/bin/env php
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// First setup Parallel class
// If you setup log dir (optional) parallel will automatically create sub folder /stats and log running statistics in json format here.
$parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1);

// Add some tasks (only examples, not part of parallel)
// Task is defined by its name and by dependencies (optional)
// Dependencies ($runAfter = []) are tasks which have to be done before task can start
$parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']);
$parallel->addTask(new \Parallel\UsersTask('task:user'));
$parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']);
$parallel->addTask(new \Parallel\CategoriesTask('task:categories'));

// Some tasks can be too much resource expensive, so we can define how many tasks can run along this task.
// If we setup 0, this task will be run alone although we setup 5 as global max concurrent
$parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), 0);

// Run symfony application under hood
$parallel->runConsoleApp();

Now simply run

php command.php parallel:run

Run only subnet of registered tasks

Sometimes you want to run only subnet of registered task (eg. in development). For this purpose use --subnet option in parallel:run command. --subnet option is validated as regexp and accept multiple values. Also all dependencies that doesn't match any of subnet regexp is removed from matched tasks.

# This command run only task:user and task:categories tasks
php command.php parallel:run --subnet task:user$ --subnet task:categories$

Logging

PSR logger is implemented. So we can use monolog/monolog.

<?php
// Setup PSR logger
// ...
$parallel->setLogger($psrLogger);

Analyze command

Parallel can visualize tasks dependencies graph. All you have to do is set analyze dir. Output file in HTML format will be generated to setup directory.

<?php
$parallel->setAnalyzeDir(__DIR__ . '/../log');
# Now you can run
php command.php analyze:graph

Tasks

Every task has to return some kind of result (SuccessResult, SkipResult, ErrorResult) as the result of task/item processing.

Task types

SimpleTask

Suitable for tasks with static input data processing.

<?php
class ImplementedSimpleTask extends SimpleTask
{
    protected function processTask(InputInterface $input, OutputInterface $output): TaskResult
    {
        // Do some magic
        return new SuccessResult();
    }
}

ProgressTask

Suitable if you need to process each item from a medium dataset separately. All source items are provided at once. In some cases it can be too expensive for memory (see BatchProgressTask).

<?php
class ImplementedProgressTask extends ProgressTask
{
    protected function items(): iterable
    {
        return DB::table('users');
    }

    protected function itemsCount(): int
    {
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        // It can be anything what is provided by items() method (array, object ...)
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        file_put_contents('active_users', $item['id'] . "/n", FILE_APPEND | LOCK_EX);
        return SuccessResult();
    }
}

BatchProgressTask

Most advanced task. Suitable if you need to process each item from a large dataset separately. Items can be provided and processed in batches.

<?php
class ImplementedBatchProgressTask extends BatchProgressTask
{
    protected function startup(): void
    {
        // Here you can prepare data
    }

    protected function shutdown(): void
    {
        // Here you can run cleanup
    }

    protected function items(int $processed): iterable
    {
        // Fetch data (eg. from database) by 500 and use offset
        return DB::table('users')->limit(500)->offset($processed);
    }

    protected function itemsCount(): int
    {
        // Count ALL data that will be processed
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        return new SuccessResult([
            'id' => $item['id'],
            'name' => $item['name']
        ]);
    }

    protected function batch(array $items): void
    {
        DB::table('active_users')->insert($items);
    }
}

Messages

Anywhere in ProgressTask and BatchProgressTask can be send message to output with sendMessage($message) method.