mostafax/background-processing-engine

Enterprise-grade, queue-first background file processing engine for Laravel

Package info

github.com/mostafax2/background-processing-engine

pkg:composer/mostafax/background-processing-engine

dev-main 2026-06-02 12:24 UTC

This package is auto-updated.

Last update: 2026-06-02 12:28:40 UTC


README

Enterprise-grade, queue-first background file processing for Laravel.

Requirements

| Requirement | Version |
|-------------|---------|
| PHP | 8.2+ |
| Laravel | 10 / 11 / 12 / 13 |
| Database | MySQL 8+ / PostgreSQL 14+ |
| Queue | Redis (recommended) / Database / SQS |

Optional:

| Tool | Used for |
|------|----------|
| FFmpeg | Video / Audio processing |
| LibreOffice | Word / Excel / PowerPoint conversion |
| Imagick | Advanced image processing |
| GhostScript | PDF compression |
| pdftotext | PDF text extraction |

Installation

1. Install via Composer

composer require mostafax/background-processing-engine

2. Run the installer

php artisan bpe:install

This single command will:

  • Publish the config file to config/bpe.php
  • Publish and run the 5 database migrations
  • Display usage examples

3. (Optional) Publish individually

# Config only
php artisan vendor:publish --tag=bpe-config

# Migrations only
php artisan vendor:publish --tag=bpe-migrations
php artisan migrate

4. Configure the queue

Define supervisors for the BPE queues in config/horizon.php (recommended), or start a worker manually:

# Basic worker (development)
php artisan queue:work --queue=bpe-high,bpe-default,bpe-low,bpe-batch

# With Horizon (production)
php artisan horizon

Configuration

After publishing, edit config/bpe.php:

return [

    'queue' => [
        'prefix'   => env('BPE_QUEUE_PREFIX', 'bpe'),
        'topology' => [
            'high'    => env('BPE_QUEUE_HIGH',    'bpe-high'),
            'default' => env('BPE_QUEUE_DEFAULT', 'bpe-default'),
            'low'     => env('BPE_QUEUE_LOW',     'bpe-low'),
            'batch'   => env('BPE_QUEUE_BATCH',   'bpe-batch'),
        ],
    ],

    'storage' => [
        'default' => env('BPE_STORAGE_DISK', 'local'), // s3, minio, local, ftp
        'temp'    => env('BPE_TEMP_DIR', sys_get_temp_dir()),
    ],

    'drivers' => [
        'ffmpeg' => [
            'binaries' => [
                'ffmpeg'  => env('BPE_FFMPEG_PATH',  'ffmpeg'),
                'ffprobe' => env('BPE_FFPROBE_PATH', 'ffprobe'),
            ],
        ],
        'libreoffice' => [
            'binary' => env('BPE_LIBREOFFICE_PATH', 'libreoffice'),
        ],
    ],

    'cache' => [
        'enabled' => true,
        'store'   => env('BPE_CACHE_STORE', 'redis'), // redis, file, array
        'ttl'     => 300, // seconds
    ],

    'retry' => [
        'default_attempts' => 3,
        'default_backoff'  => [30, 60, 120], // seconds
    ],

    'pruning' => [
        'enabled'   => true,
        'keep_days' => 30,
    ],
];

Environment Variables

Add these to your .env:

# Storage
BPE_STORAGE_DISK=local          # local | s3 | minio

# Queue
BPE_QUEUE_PREFIX=bpe

# Cache
BPE_CACHE_STORE=file            # redis | file | array

# Drivers (optional — only needed for specific file types)
BPE_FFMPEG_PATH=/usr/bin/ffmpeg
BPE_LIBREOFFICE_PATH=/usr/bin/libreoffice

# Security
BPE_MAX_FILE_SIZE=524288000     # 500 MB in bytes
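
BPE_MAX_FILE_SIZE is expressed in bytes. As a small sketch of where 524288000 comes from, the helper below converts binary megabytes (1 MB taken as 1024 × 1024 bytes) to bytes. The function name `megabytesToBytes` is hypothetical and not part of the package.

```php
<?php

// Hypothetical helper, not part of the package:
// converts binary megabytes (1 MB = 1024 * 1024 bytes) to bytes.
function megabytesToBytes(int $megabytes): int
{
    return $megabytes * 1024 * 1024;
}

echo megabytesToBytes(500), PHP_EOL; // 524288000
```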

Quick Start

The facade is registered automatically through Laravel package auto-discovery. Import it under the alias used in the examples below:

use Mostafax\BPE\Support\Facades\BPE as FileProcessor;

Or use the helper function:

bpe($file)->dispatch();
bpe()->image($file)->resize(800, 600)->dispatch();

Minimal example

$result = FileProcessor::image($request->file('photo'))
    ->resize(1920, 1080)
    ->compress(85)
    ->dispatch();

// $result->taskId   — UUID to track the job
// $result->status   — 'dispatched'
// $result->queueName

File Type Processors

Image Processor

Supported MIME types: image/jpeg, image/png, image/webp, image/gif, image/bmp, image/tiff, image/avif

Requires: GD (built-in) or Imagick extension

FileProcessor::image($file)
    ->resize(width: 1920, height: 1080, mode: 'fit')   // mode: fit | fill | force
    ->compress(quality: 85)                             // 1-100
    ->convert('webp')                                   // jpg | png | webp | gif
    ->watermark('/path/to/logo.png', position: 'bottom-right', opacity: 0.5)
    ->crop(width: 800, height: 600, x: 0, y: 0)
    ->optimize()                                        // strip metadata + compress
    ->queue('bpe-high')
    ->disk('s3')
    ->dispatch();
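
The README does not spell out what each resize mode does. Assuming `fit` has its usual meaning (scale the image to fit inside the target box while keeping the aspect ratio), the resulting dimensions can be sketched as follows. `fitWithin` is a hypothetical helper for illustration, not the package's implementation.

```php
<?php

/**
 * Hypothetical helper: largest size that fits inside the target box
 * while keeping the source aspect ratio.
 *
 * @return array{0: int, 1: int} [width, height]
 */
function fitWithin(int $srcW, int $srcH, int $maxW, int $maxH): array
{
    // The smaller of the two ratios is the one that keeps both sides in bounds.
    $scale = min($maxW / $srcW, $maxH / $srcH);

    return [(int) round($srcW * $scale), (int) round($srcH * $scale)];
}

[$w, $h] = fitWithin(4000, 3000, 1920, 1080);
echo "{$w}x{$h}", PHP_EOL; // 1440x1080
```

Under this assumption, a 4000×3000 source resized to 1920×1080 ends up 1440 pixels wide, because the height is the limiting side.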

Video Processor

Supported MIME types: video/mp4, video/avi, video/quicktime, video/webm, video/3gpp

Requires: FFmpeg

FileProcessor::video($file)
    ->generateThumbnail(at: '00:00:05', width: 640)    // extract frame as image
    ->compress(crf: 23, preset: 'medium')              // H.264, CRF 0-51
    ->convert('mp4', codec: 'h264')                    // mp4 | webm | avi
    ->extractAudio(format: 'mp3')                      // save audio track as a separate file
    ->trim(start: '00:00:10', end: '00:02:00')         // cut video segment
    ->watermark('/path/to/logo.png')
    ->queue('bpe-default')
    ->dispatch();
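
The `trim` call above takes `HH:MM:SS` timestamps. Assuming it keeps the segment between `start` and `end`, the length of the resulting clip can be checked with a small sketch. `timestampToSeconds` is a hypothetical helper, not a package function.

```php
<?php

// Hypothetical helper: converts an HH:MM:SS timestamp to seconds.
function timestampToSeconds(string $timestamp): int
{
    [$hours, $minutes, $seconds] = array_map('intval', explode(':', $timestamp));

    return $hours * 3600 + $minutes * 60 + $seconds;
}

// Segment length for trim(start: '00:00:10', end: '00:02:00')
$duration = timestampToSeconds('00:02:00') - timestampToSeconds('00:00:10');
echo $duration, PHP_EOL; // 110
```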

PDF Processor

Supported MIME types: application/pdf

Requires: GhostScript (compress), pdftotext (extract text), pdftk (split/merge), Imagick (toImage)

FileProcessor::pdf($file)
    ->compress()                                        // reduce file size via GhostScript
    ->extractText()                                     // → .txt file
    ->split(pages: [1, 3, 5])                          // extract specific pages
    ->merge([$pdf2, $pdf3])                             // combine PDFs
    ->watermark(text: 'CONFIDENTIAL', opacity: 0.3)
    ->toImage(format: 'jpg', density: 150)              // first page → image
    ->queue('bpe-default')
    ->dispatch();

Word Processor

Supported MIME types: application/msword, application/vnd.openxmlformats-officedocument.wordprocessingml.document

Requires: LibreOffice

FileProcessor::word($file)
    ->convertToPDF()        // .docx → .pdf
    ->convertToHTML()       // .docx → .html
    ->extractText()         // .docx → .txt
    ->queue('bpe-default')
    ->dispatch();

Excel Processor

Supported MIME types: application/vnd.ms-excel, application/vnd.openxmlformats-officedocument.spreadsheetml.sheet

Requires: LibreOffice

FileProcessor::excel($file)
    ->convertToCSV()        // .xlsx → .csv
    ->convertToPDF()        // .xlsx → .pdf
    ->parseData()           // .xlsx → .json (structured rows)
    ->queue('bpe-batch')
    ->dispatch();

CSV Processor

Supported MIME types: text/csv, text/plain, application/csv

FileProcessor::csv($file)
    ->validate([
        'columns'  => ['name', 'email', 'age'],   // required columns
        'max_rows' => 100000,
    ])
    ->transform([
        'map' => ['name' => 'full_name', 'email' => 'email_address'],
    ])
    ->import([
        'table'      => 'users',        // import directly to DB table
        'chunk_size' => 500,
    ])
    ->queue('bpe-batch')
    ->dispatch();
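
To make the `map` option concrete, here is a minimal sketch of a column rename applied to a single row. It assumes the map reads as source column => target column and that unmapped columns keep their name; both points are assumptions, and `renameColumns` is a hypothetical helper rather than package code.

```php
<?php

/**
 * Hypothetical helper: renames the keys of one row according to a
 * source => target map. Columns missing from the map keep their name.
 */
function renameColumns(array $row, array $map): array
{
    $renamed = [];
    foreach ($row as $column => $value) {
        $renamed[$map[$column] ?? $column] = $value;
    }

    return $renamed;
}

$row = ['name' => 'Ada', 'email' => 'ada@example.com', 'age' => 36];
$map = ['name' => 'full_name', 'email' => 'email_address'];

print_r(renameColumns($row, $map));
```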

Queue & Priority

BPE uses four priority queues. Workers poll them in descending weight order, so a higher weight means the queue is served first.

bpe-high    (weight: 10) → user-facing, SLA-bound
bpe-default (weight:  5) → standard async tasks
bpe-low     (weight:  2) → non-urgent
bpe-batch   (weight:  1) → nightly / bulk jobs

use Mostafax\BPE\Domain\Processing\ValueObjects\Priority;

FileProcessor::image($file)
    ->priority(Priority::HIGH)          // sets queue automatically
    // OR manually:
    ->queue('bpe-high')
    ->dispatch();
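
As an illustration of "weight order", the sketch below sorts the four queues by the weights listed above. The ordering logic is an assumption made for this example, not the package's scheduler. The result matches the order given to `queue:work --queue=...` in the installation section, which matters because Laravel workers process queues in the order they are listed.

```php
<?php

// Queue names and weights as listed above; the sorting is illustrative only.
const QUEUE_WEIGHTS = [
    'bpe-high'    => 10,
    'bpe-default' => 5,
    'bpe-low'     => 2,
    'bpe-batch'   => 1,
];

/** Returns queue names ordered from highest to lowest weight. */
function queuesByWeight(array $weights): array
{
    arsort($weights); // sort by value, descending, keeping keys

    return array_keys($weights);
}

echo implode(',', queuesByWeight(QUEUE_WEIGHTS)), PHP_EOL;
// bpe-high,bpe-default,bpe-low,bpe-batch
```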

Horizon configuration:

// config/horizon.php
'environments' => [
    'production' => [
        'bpe-high-supervisor' => [
            'connection'   => 'redis',
            'queue'        => ['bpe-high'],
            'balance'      => 'auto',
            'minProcesses' => 5,
            'maxProcesses' => 30,
            'timeout'      => 3600,
        ],
        'bpe-batch-supervisor' => [
            'connection' => 'redis',
            'queue'      => ['bpe-batch'],
            'processes'  => 3,
            'timeout'    => 7200,
        ],
    ],
],

Batch Processing

Process multiple files in one call:

$files = $request->file('files');   // array of UploadedFile

$tasks = collect($files)->map(function ($file, $index) {
    return FileProcessor::process($file)
        ->queue('bpe-batch')
        ->priority(Priority::BATCH)
        ->withMetadata(['batch_index' => $index])
        ->dispatch();
});

// Each $task has a unique taskId you can track independently

Job Chaining

Run processors sequentially — each step waits for the previous:

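// Compress the video, then extract a thumbnail from the compressed version
FileProcessor::video($video)
    ->compress(crf: 23)
    ->generateThumbnail(at: '00:00:05', width: 640)
    ->queue('bpe-default')
    ->dispatch();

// Then, in a TaskCompleted listener, dispatch the next step:
Event::listen(TaskCompleted::class, function ($event) {
    $result = $event->result;

    // Skip tasks that produced no thumbnail (including the resize task
    // dispatched below), otherwise the listener would re-trigger itself.
    if (! isset($result['thumbnail_path'])) {
        return;
    }

    FileProcessor::image(FileReference::fromPath($result['thumbnail_path']))
        ->resize(640, 360)
        ->dispatch();
});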

Retry Policies

use Mostafax\BPE\Domain\Processing\ValueObjects\RetryPolicy;

FileProcessor::video($file)
    // Custom: 5 attempts, increasing backoff
    ->retry(attempts: 5, backoff: [10, 30, 60, 120, 300])
    ->dispatch();

Built-in policies:

RetryPolicy::default()     // 3 attempts, [30, 60, 120] seconds
RetryPolicy::aggressive()  // 5 attempts, [10, 30, 60, 120, 300] seconds
RetryPolicy::noRetry()     // 1 attempt, fail immediately
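
To show how a backoff array is typically read, the sketch below picks the delay before each retry. It assumes the same convention Laravel uses for job backoff arrays: the n-th retry waits for the n-th entry, and the last entry is reused once the array runs out. Whether the package follows this exactly is an assumption, and `backoffDelay` is a hypothetical helper.

```php
<?php

/**
 * Hypothetical helper: delay in seconds before retry number $retry (1-based).
 * Falls back to the last entry if there are more retries than entries.
 */
function backoffDelay(array $backoff, int $retry): int
{
    if ($backoff === [] || $retry < 1) {
        return 0;
    }

    return $backoff[min($retry, count($backoff)) - 1];
}

$default = [30, 60, 120];
echo backoffDelay($default, 1), PHP_EOL; // 30
echo backoffDelay($default, 3), PHP_EOL; // 120
echo backoffDelay($default, 5), PHP_EOL; // 120
```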

Status Tracking

// Facade
$status = FileProcessor::status($taskId);
// Returns: ProcessingTaskDTO

echo $status->status;        // 'pending' | 'dispatched' | 'processing' | 'completed' | 'failed' | 'cancelled'
echo $status->progress;      // 0-100
echo $status->errorMessage;  // null | string
print_r($status->result);    // ['path' => '...', 'disk' => '...', ...]

// Via HTTP API
GET /api/bpe/tasks/{taskId}

// Response:
{
    "data": {
        "uuid": "abc-123...",
        "status": "completed",
        "progress": 100,
        "file_type": "image",
        "queue_name": "bpe-high",
        "result": { "path": "bpe-results/2024/01/01/abc.webp", "disk": "s3" },
        "started_at": "2024-01-01T12:00:00Z",
        "completed_at": "2024-01-01T12:00:03Z"
    }
}
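
When broadcasting is not available, a caller can poll the status until the task settles. The sketch below is one way to do that under stated assumptions: it treats `completed`, `failed`, and `cancelled` as terminal (inferred from the statuses shown in this README), and `waitForTask` is a hypothetical helper. The status source is passed in as a callable, so in an application it could wrap `FileProcessor::status($taskId)->status`.

```php
<?php

// Statuses after which a task is assumed not to change any more.
const TERMINAL_STATUSES = ['completed', 'failed', 'cancelled'];

/**
 * Polls $fetchStatus until it returns a terminal status or $maxPolls is reached.
 * $fetchStatus is any callable that returns the current status string.
 */
function waitForTask(callable $fetchStatus, int $maxPolls = 10, int $sleepSeconds = 1): string
{
    $status = 'pending';
    for ($i = 0; $i < $maxPolls; $i++) {
        $status = $fetchStatus();
        if (in_array($status, TERMINAL_STATUSES, true)) {
            break;
        }
        sleep($sleepSeconds);
    }

    return $status;
}

// Stand-in for the real status call: replays a scripted sequence of statuses.
$script = ['pending', 'processing', 'completed'];
$fake = function () use (&$script): string {
    return array_shift($script) ?? 'completed';
};

echo waitForTask($fake, sleepSeconds: 0), PHP_EOL; // completed
```

The function returns the last status it saw, so a caller can tell a finished task from one that was still `processing` when the poll limit was reached.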

Real-time Progress (Broadcasting)

The package automatically broadcasts events on the public channel bpe.task.{taskId}.

Laravel Echo (frontend):

import Echo from 'laravel-echo';
import Pusher from 'pusher-js';

window.Pusher = Pusher;
window.Echo = new Echo({ broadcaster: 'pusher', key: process.env.MIX_PUSHER_APP_KEY });

const taskId = 'abc-123-...';

Echo.channel(`bpe.task.${taskId}`)
    .listen('.task.started', (e) => {
        console.log('Started:', e);
    })
    .listen('.task.progress', (e) => {
        progressBar.style.width = `${e.progress}%`;
        label.textContent = `${e.step}: ${e.progress}%`;
    })
    .listen('.task.completed', (e) => {
        showResult(e.result);
    })
    .listen('.task.failed', (e) => {
        showError(e.message);
    });

Broadcast event payloads:

| Event | Payload |
|-------|---------|
| task.started | { task_id, status: 'processing' } |
| task.progress | { task_id, progress: 0-100, step: 'compress', status } |
| task.completed | { task_id, status: 'completed', result: {...} } |
| task.failed | { task_id, status: 'failed', message: '...' } |

Notifications

FileProcessor::image($file)
    ->notify(channels: ['mail', 'slack', 'webhook'])
    ->dispatch();

Configure notification channels in config/bpe.php:

'notifications' => [
    'channels' => ['mail'],
    'mail'    => ['from' => 'noreply@yourapp.com'],
    'slack'   => ['webhook' => env('SLACK_WEBHOOK_URL')],
    'webhook' => ['url' => env('BPE_WEBHOOK_URL')],
],

Multi-Tenancy

All tasks are automatically scoped by tenant_id:

// Scope to a specific tenant
FileProcessor::forTenant($tenantId)
    ->image($file)
    ->resize(1920, 1080)
    ->dispatch();

// Query scoped to tenant
ProcessingTask::forTenant($tenantId)->where('status', 'completed')->get();

Add the HasProcessingTasks trait to your Tenant or User model:

use Mostafax\BPE\Support\Traits\HasProcessingTasks;

class User extends Authenticatable
{
    use HasProcessingTasks;
}

// Then:
$user->processingTasks()->get();
$user->completedTasks()->count();

Custom Processors

Register any custom file type:

// 1. Create your processor
namespace App\Processors;

use Mostafax\BPE\Processors\AbstractProcessor;
use Mostafax\BPE\Contracts\ProcessorStepInterface;
use Mostafax\BPE\Application\DTOs\ProcessingResultDTO;
use Mostafax\BPE\Domain\Processing\ValueObjects\FileReference;

class DicomProcessor extends AbstractProcessor
{
    public function supportedMimeTypes(): array
    {
        return ['application/dicom'];
    }

    public function fileType(): string { return 'dicom'; }

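    protected function resolveStep(string $name): ProcessorStepInterface
    {
        // AnonymizeDicomStep and ConvertDicomStep are your own step classes.
        return match ($name) {
            'anonymize' => new AnonymizeDicomStep(),
            'convert'   => new ConvertDicomStep(),
            default     => throw new \InvalidArgumentException("Unknown step: {$name}"),
        };
    }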

    protected function buildResult(FileReference $originalFile, string $outputPath): ProcessingResultDTO
    {
        return new ProcessingResultDTO(
            path: $outputPath,
            disk: $originalFile->disk(),
            mimeType: 'image/jpeg',
            size: filesize($outputPath),
        );
    }
}

// 2. Register in AppServiceProvider
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;

public function boot(): void
{
    FileProcessor::extend('dicom', \App\Processors\DicomProcessor::class);
}

// 3. Use it
FileProcessor::dicom($file)
    ->anonymize()
    ->convert('jpg')
    ->dispatch();

Artisan Commands

# Install the package (publish + migrate)
php artisan bpe:install

# Show processing statistics
php artisan bpe:status

# Retry failed tasks
php artisan bpe:retry {uuid}
php artisan bpe:retry --all-failed

# Prune old tasks
php artisan bpe:prune --days=30

# Run demo (no real file needed)
php artisan bpe:demo --type=image
php artisan bpe:demo --type=csv
php artisan bpe:demo --queue
php artisan bpe:demo --stats

API Endpoints

The package registers these routes under api/bpe/*:

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /api/bpe/tasks/{id} | Get task status |
| POST | /api/bpe/tasks/{id}/cancel | Cancel task |
| POST | /api/bpe/tasks/{id}/retry | Retry failed task |
| GET | /api/bpe/health | Health check |

Enable or disable the API, and set its route prefix and middleware, in config/bpe.php:

'api' => [
    'enabled'    => true,
    'prefix'     => 'api/bpe',
    'middleware' => ['api', 'auth:sanctum'],
],

Example HTTP responses:

// GET /api/bpe/tasks/{id}
{
    "data": {
        "uuid": "abc-123",
        "status": "completed",
        "progress": 100,
        "file_type": "image",
        "mime_type": "image/jpeg",
        "queue_name": "bpe-high",
        "priority": "high",
        "attempts": 1,
        "result": {
            "path": "bpe-results/2024/06/01/result.webp",
            "disk": "s3",
            "size": 142500,
            "mime_type": "image/webp",
            "metadata": { "width": 1920, "height": 1080 }
        },
        "started_at": "2024-06-01T10:00:01Z",
        "completed_at": "2024-06-01T10:00:04Z"
    }
}

// GET /api/bpe/health
{
    "status": "ok",
    "version": "1.0.0",
    "time": "2024-06-01T10:00:00Z"
}

Monitoring

Laravel Horizon

BPE integrates with Horizon out of the box. No extra setup needed. All jobs appear under their respective queues in the Horizon dashboard.

Artisan Status

php artisan bpe:status

Example output:

BPE Processing Statistics
┌─────────────┬───────┐
│ Status      │ Count │
├─────────────┼───────┤
│ pending     │   12  │
│ dispatched  │    3  │
│ processing  │    2  │
│ completed   │  891  │
│ failed      │    4  │
│ cancelled   │    1  │
└─────────────┴───────┘

Database Tables

All processing data is persisted in 5 tables:

| Table | Purpose |
|-------|---------|
| bpe_tasks | Main task record: status, pipeline, retry info |
| bpe_results | Output files: URLs, dimensions, metadata |
| bpe_logs | Per-task log entries with stage tracking |
| bpe_events | Immutable event store (audit trail) |
| bpe_metrics | Time-series performance metrics |

Scalability

100K jobs/day (~70/min)

  • 1 Redis instance + 5 Horizon workers
  • Single MySQL + BPE with default config

1M jobs/day (~700/min)

  • Redis Sentinel (2 replicas)
  • 20-30 Horizon workers across 3-5 servers
  • MySQL + Read Replica

10M jobs/day (~7000/min)

  • Redis Cluster (6 shards)
  • 100+ Kubernetes workers (HPA on queue depth)
  • PostgreSQL + Citus horizontal sharding
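
The per-minute figures above follow from spreading the daily volume evenly over 24 hours. The sketch below reproduces that arithmetic and adds a rough worker estimate using Little's law (arrival rate × average job duration). The 3-second job duration is only an illustrative assumption, and both function names are hypothetical; real sizing depends on job mix and peak load.

```php
<?php

/** Average jobs per minute for a daily volume spread evenly over 24 hours. */
function jobsPerMinute(int $jobsPerDay): float
{
    return $jobsPerDay / (24 * 60);
}

/**
 * Rough number of busy workers needed to keep up, by Little's law:
 * arrival rate (jobs/second) x average job duration (seconds).
 */
function workersNeeded(int $jobsPerDay, float $avgJobSeconds): int
{
    return (int) ceil($jobsPerDay / 86400 * $avgJobSeconds);
}

foreach ([100_000, 1_000_000, 10_000_000] as $volume) {
    printf("%d jobs/day = %.1f jobs/min\n", $volume, jobsPerMinute($volume));
}
// 100000 jobs/day = 69.4 jobs/min
// 1000000 jobs/day = 694.4 jobs/min
// 10000000 jobs/day = 6944.4 jobs/min

echo workersNeeded(100_000, 3.0), PHP_EOL; // 4
```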

Testing

Use the built-in fake:

use Mostafax\BPE\Support\Facades\BPE as FileProcessor;

// In your test setUp()
FileProcessor::fake();   // disables real processing; queues are captured

// Act
$result = FileProcessor::image($fakeFile)->resize(800, 600)->dispatch();

// Assert
$this->assertDatabaseHas('bpe_tasks', [
    'uuid'   => $result->taskId,
    'status' => 'dispatched',
    'type'   => 'image',
]);

Architecture

BPE follows Clean Architecture + DDD with 8 Bounded Contexts:

Domain Layer           (ProcessingTask aggregate, Value Objects, Domain Events)
    ↑ depends on nothing external
Application Layer      (PendingProcess builder, Commands, Queries, DTOs)
    ↑ depends on Domain
Infrastructure Layer   (Eloquent Repositories, Queue, Storage, Drivers)
    ↑ depends on Application contracts
Interface Layer        (HTTP Controllers, Artisan Commands, Broadcast Events)
    ↑ depends on Application

Key design patterns:

  • Aggregate Root: ProcessingTask owns all state transitions
  • Value Objects: Priority, RetryPolicy, ProcessingPipeline are immutable
  • Repository Pattern: CachedTaskRepository wraps EloquentTaskRepository
  • Strategy Pattern: Processors and Steps are swappable implementations
  • Fluent Builder: PendingProcess accumulates configuration before dispatching
  • Event Sourcing: All state changes recorded in bpe_events
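
The repository arrangement in the list above is a decorator: a caching repository sits in front of the storage-backed one and exposes the same interface. The following sketch shows the idea with in-memory stand-ins. Every class and method name here is hypothetical; the package's real interfaces may differ. Only the 300-second TTL is taken from the `cache.ttl` default shown in the configuration.

```php
<?php

// All names below are hypothetical stand-ins, not the package's classes.
interface TaskRepository
{
    public function find(string $uuid): ?array;
}

final class InMemoryTaskRepository implements TaskRepository
{
    public int $reads = 0;

    public function __construct(private array $tasks) {}

    public function find(string $uuid): ?array
    {
        $this->reads++; // count hits on the "slow" store
        return $this->tasks[$uuid] ?? null;
    }
}

final class CachingTaskRepository implements TaskRepository
{
    /** @var array<string, array{expires: int, task: ?array}> */
    private array $cache = [];

    public function __construct(private TaskRepository $inner, private int $ttl = 300) {}

    public function find(string $uuid): ?array
    {
        $now = time();
        if (isset($this->cache[$uuid]) && $this->cache[$uuid]['expires'] > $now) {
            return $this->cache[$uuid]['task']; // served from cache
        }

        // Cache miss or expired entry: ask the wrapped repository and remember the answer.
        $task = $this->inner->find($uuid);
        $this->cache[$uuid] = ['expires' => $now + $this->ttl, 'task' => $task];

        return $task;
    }
}

$store = new InMemoryTaskRepository(['abc-123' => ['status' => 'completed']]);
$repo  = new CachingTaskRepository($store);

$repo->find('abc-123');
$repo->find('abc-123');

echo $store->reads, PHP_EOL; // 1
```

Because both classes implement the same interface, callers do not need to know whether caching is active.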

License

MIT © Mostafa — mostafa.m.elbiar2@gmail.com