mostafax / background-processing-engine
Enterprise-grade, queue-first background file processing engine for Laravel
Package info
github.com/mostafax2/background-processing-engine
pkg:composer/mostafax/background-processing-engine
Requires
- php: ^8.2
- illuminate/broadcasting: ^10.0|^11.0|^12.0|^13.0
- illuminate/bus: ^10.0|^11.0|^12.0|^13.0
- illuminate/cache: ^10.0|^11.0|^12.0|^13.0
- illuminate/database: ^10.0|^11.0|^12.0|^13.0
- illuminate/events: ^10.0|^11.0|^12.0|^13.0
- illuminate/notifications: ^10.0|^11.0|^12.0|^13.0
- illuminate/queue: ^10.0|^11.0|^12.0|^13.0
- illuminate/support: ^10.0|^11.0|^12.0|^13.0
Requires (Dev)
- laravel/pint: ^1.0
- mockery/mockery: ^1.6
- orchestra/testbench: ^8.0|^9.0
- pestphp/pest: ^2.0
- pestphp/pest-plugin-laravel: ^2.0
- phpstan/phpstan: ^1.10
This package is auto-updated.
Last update: 2026-06-02 12:28:40 UTC
Enterprise-grade, queue-first background file processing for Laravel.
Table of Contents
- Requirements
- Installation
- Configuration
- Quick Start
- File Type Processors
- Queue & Priority
- Batch Processing
- Job Chaining
- Retry Policies
- Status Tracking
- Real-time Progress (Broadcasting)
- Notifications
- Multi-Tenancy
- Custom Processors
- Artisan Commands
- API Endpoints
- Monitoring
- Scalability
- Testing
- Architecture
Requirements
| Requirement | Version |
|---|---|
| PHP | 8.2+ |
| Laravel | 10 / 11 / 12 / 13 |
| Database | MySQL 8+ / PostgreSQL 14+ |
| Queue | Redis (recommended) / Database / SQS |
| Optional | |
| FFmpeg | Video / Audio processing |
| LibreOffice | Word / Excel / PowerPoint conversion |
| Imagick | Advanced image processing |
| GhostScript | PDF compression |
| pdftotext | PDF text extraction |
Installation
1. Install via Composer
composer require mostafax/background-processing-engine
2. Run the installer
php artisan bpe:install
This single command will:
- Publish the config file to config/bpe.php
- Publish and run the 5 database migrations
- Display usage examples
3. (Optional) Publish individually
# Config only
php artisan vendor:publish --tag=bpe-config

# Migrations only
php artisan vendor:publish --tag=bpe-migrations
php artisan migrate
4. Configure the queue
Define supervisors for the BPE queues in config/horizon.php (recommended for production), or start a plain queue worker:
# Basic worker (development)
php artisan queue:work --queue=bpe-high,bpe-default,bpe-low,bpe-batch

# With Horizon (production)
php artisan horizon
Configuration
After publishing, edit config/bpe.php:
return [
    'queue' => [
        'prefix' => env('BPE_QUEUE_PREFIX', 'bpe'),
        'topology' => [
            'high'    => env('BPE_QUEUE_HIGH', 'bpe-high'),
            'default' => env('BPE_QUEUE_DEFAULT', 'bpe-default'),
            'low'     => env('BPE_QUEUE_LOW', 'bpe-low'),
            'batch'   => env('BPE_QUEUE_BATCH', 'bpe-batch'),
        ],
    ],

    'storage' => [
        'default' => env('BPE_STORAGE_DISK', 'local'), // s3, minio, local, ftp
        'temp'    => env('BPE_TEMP_DIR', sys_get_temp_dir()),
    ],

    'drivers' => [
        'ffmpeg' => [
            'binaries' => [
                'ffmpeg'  => env('BPE_FFMPEG_PATH', 'ffmpeg'),
                'ffprobe' => env('BPE_FFPROBE_PATH', 'ffprobe'),
            ],
        ],
        'libreoffice' => [
            'binary' => env('BPE_LIBREOFFICE_PATH', 'libreoffice'),
        ],
    ],

    'cache' => [
        'enabled' => true,
        'store'   => env('BPE_CACHE_STORE', 'redis'), // redis, file, array
        'ttl'     => 300, // seconds
    ],

    'retry' => [
        'default_attempts' => 3,
        'default_backoff'  => [30, 60, 120], // seconds
    ],

    'pruning' => [
        'enabled'   => true,
        'keep_days' => 30,
    ],
];
Environment Variables
Add these to your .env:
# Storage
BPE_STORAGE_DISK=local              # local | s3 | minio

# Queue
BPE_QUEUE_PREFIX=bpe
BPE_CACHE_STORE=file                # redis | file | array

# Drivers (optional, only needed for specific file types)
BPE_FFMPEG_PATH=/usr/bin/ffmpeg
BPE_LIBREOFFICE_PATH=/usr/bin/libreoffice

# Security
BPE_MAX_FILE_SIZE=524288000         # 500 MB in bytes
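The BPE_MAX_FILE_SIZE value is 500 MB expressed in bytes (500 × 1024 × 1024). A minimal sketch of that arithmetic, useful when choosing a different limit; `megabytesToBytes()` is a hypothetical helper for illustration and is not part of the package:

```php
<?php

// Hypothetical helper (not part of the package): converts megabytes
// (counted as 1024 * 1024 bytes) to bytes for BPE_MAX_FILE_SIZE.
function megabytesToBytes(int $megabytes): int
{
    return $megabytes * 1024 * 1024;
}

echo megabytesToBytes(500), PHP_EOL; // 524288000
```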
Quick Start
The facade is registered automatically through Laravel package auto-discovery. Import it under an alias:
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;
Or use the helper function:
bpe($file)->dispatch();
bpe()->image($file)->resize(800, 600)->dispatch();
Minimal example
$result = FileProcessor::image($request->file('photo'))
    ->resize(1920, 1080)
    ->compress(85)
    ->dispatch();

// $result->taskId: UUID to track the job
// $result->status: 'dispatched'
// $result->queueName
File Type Processors
Image Processor
Supported MIME types: image/jpeg, image/png, image/webp, image/gif, image/bmp, image/tiff, image/avif
Requires: GD (built-in) or Imagick extension
FileProcessor::image($file)
    ->resize(width: 1920, height: 1080, mode: 'fit')   // mode: fit | fill | force
    ->compress(quality: 85)                            // 1-100
    ->convert('webp')                                  // jpg | png | webp | gif
    ->watermark('/path/to/logo.png', position: 'bottom-right', opacity: 0.5)
    ->crop(width: 800, height: 600, x: 0, y: 0)
    ->optimize()                                       // strip metadata + compress
    ->queue('bpe-high')
    ->disk('s3')
    ->dispatch();
Video Processor
Supported MIME types: video/mp4, video/avi, video/quicktime, video/webm, video/3gpp
Requires: FFmpeg
FileProcessor::video($file)
    ->generateThumbnail(at: '00:00:05', width: 640)    // extract frame as image
    ->compress(crf: 23, preset: 'medium')              // H.264, CRF 0-51
    ->convert('mp4', codec: 'h264')                    // mp4 | webm | avi
    ->extractAudio(format: 'mp3')                      // strip audio track
    ->trim(start: '00:00:10', end: '00:02:00')         // cut video segment
    ->watermark('/path/to/logo.png')
    ->queue('bpe-default')
    ->dispatch();
PDF Processor
Supported MIME types: application/pdf
Requires: GhostScript (compress), pdftotext (extract text), pdftk (split/merge), Imagick (toImage)
FileProcessor::pdf($file)
    ->compress()                                       // reduce file size via GhostScript
    ->extractText()                                    // → .txt file
    ->split(pages: [1, 3, 5])                          // extract specific pages
    ->merge([$pdf2, $pdf3])                            // combine PDFs
    ->watermark(text: 'CONFIDENTIAL', opacity: 0.3)
    ->toImage(format: 'jpg', density: 150)             // first page → image
    ->queue('bpe-default')
    ->dispatch();
Word Processor
Supported MIME types: application/msword, application/vnd.openxmlformats-officedocument.wordprocessingml.document
Requires: LibreOffice
FileProcessor::word($file)
    ->convertToPDF()     // .docx → .pdf
    ->convertToHTML()    // .docx → .html
    ->extractText()      // .docx → .txt
    ->queue('bpe-default')
    ->dispatch();
Excel Processor
Supported MIME types: application/vnd.ms-excel, application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
Requires: LibreOffice
FileProcessor::excel($file)
    ->convertToCSV()     // .xlsx → .csv
    ->convertToPDF()     // .xlsx → .pdf
    ->parseData()        // .xlsx → .json (structured rows)
    ->queue('bpe-batch')
    ->dispatch();
CSV Processor
Supported MIME types: text/csv, text/plain, application/csv
FileProcessor::csv($file)
    ->validate([
        'columns'  => ['name', 'email', 'age'],   // required columns
        'max_rows' => 100000,
    ])
    ->transform([
        'map' => ['name' => 'full_name', 'email' => 'email_address'],
    ])
    ->import([
        'table'      => 'users',                  // import directly to DB table
        'chunk_size' => 500,
    ])
    ->queue('bpe-batch')
    ->dispatch();
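To make the required-columns rule concrete, here is a minimal sketch of a header check in plain PHP. It is not the package's implementation; `missingCsvColumns()` is a hypothetical helper, and it assumes a comma-separated header row:

```php
<?php

/**
 * Hypothetical helper (not part of the package): returns the required
 * columns that are absent from a comma-separated CSV header line.
 */
function missingCsvColumns(string $headerLine, array $required): array
{
    // Parse the header row; the escape character is passed explicitly.
    $header = array_map('trim', str_getcsv($headerLine, ',', '"', '\\'));

    // array_diff keeps the original keys, so re-index the result.
    return array_values(array_diff($required, $header));
}

print_r(missingCsvColumns('name,email,city', ['name', 'email', 'age']));
// Array ( [0] => age )
```

A pre-check like this could run in the controller before dispatching, so that obviously malformed uploads never reach the queue.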
Queue & Priority
BPE uses 4 priority queues. Workers poll them in weight order.
bpe-high (weight: 10) → user-facing, SLA-bound
bpe-default (weight: 5) → standard async tasks
bpe-low (weight: 2) → non-urgent
bpe-batch (weight: 1) → nightly / bulk jobs
use Mostafax\BPE\Domain\Processing\ValueObjects\Priority;

// Set a priority; the queue is chosen automatically
FileProcessor::image($file)
    ->priority(Priority::HIGH)
    ->dispatch();

// Or name the queue manually
FileProcessor::image($file)
    ->queue('bpe-high')
    ->dispatch();
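Laravel's `queue:work` processes queues in the order they are listed in `--queue`, so the weight list translates into a comma-separated queue string. A small sketch of that mapping; `queueOrder()` is a hypothetical helper, and the weights are copied from the list above:

```php
<?php

// Queue weights as listed above (higher weight = polled first).
$weights = [
    'bpe-batch'   => 1,
    'bpe-high'    => 10,
    'bpe-low'     => 2,
    'bpe-default' => 5,
];

// Hypothetical helper: builds the value for `queue:work --queue=`.
function queueOrder(array $weights): string
{
    arsort($weights); // sort by weight, descending, keeping queue names as keys

    return implode(',', array_keys($weights));
}

echo queueOrder($weights), PHP_EOL; // bpe-high,bpe-default,bpe-low,bpe-batch
```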
Horizon configuration:
// config/horizon.php
'environments' => [
    'production' => [
        'bpe-high-supervisor' => [
            'connection'   => 'redis',
            'queue'        => ['bpe-high'],
            'balance'      => 'auto',
            'minProcesses' => 5,
            'maxProcesses' => 30,
            'timeout'      => 3600,
        ],
        'bpe-batch-supervisor' => [
            'connection' => 'redis',
            'queue'      => ['bpe-batch'],
            'processes'  => 3,
            'timeout'    => 7200,
        ],
    ],
],
Batch Processing
Process multiple files in one call:
use Mostafax\BPE\Domain\Processing\ValueObjects\Priority;

$files = $request->file('files'); // array of UploadedFile

$tasks = collect($files)->map(function ($file, $index) {
    return FileProcessor::process($file)
        ->queue('bpe-batch')
        ->priority(Priority::BATCH)
        ->withMetadata(['batch_index' => $index])
        ->dispatch();
});

// Each item in $tasks has a unique taskId you can track independently
Job Chaining
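Run processors sequentially, with each step starting only after the previous one completes:
use Illuminate\Support\Facades\Event;
use Mostafax\BPE\Domain\Processing\ValueObjects\FileReference;
// TaskCompleted must also be imported from the package.

// Compress video, then generate thumbnail from compressed version
FileProcessor::video($video)
    ->compress(crf: 23)
    ->queue('bpe-default')
    ->dispatch();

// Then in the TaskCompleted listener, dispatch the next step:
Event::listen(TaskCompleted::class, function ($event) {
    $result = $event->result;

    // The listener fires for every completed task, including the follow-up
    // image task, so skip results that carry no thumbnail.
    if (! isset($result['thumbnail_path'])) {
        return;
    }

    FileProcessor::image(FileReference::fromPath($result['thumbnail_path']))
        ->resize(640, 360)
        ->dispatch();
});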
Retry Policies
use Mostafax\BPE\Domain\Processing\ValueObjects\RetryPolicy;

FileProcessor::video($file)
    // Custom: 5 attempts, increasing backoff
    ->retry(attempts: 5, backoff: [10, 30, 60, 120, 300])
    ->dispatch();
Built-in policies:
RetryPolicy::default()      // 3 attempts, [30, 60, 120] seconds
RetryPolicy::aggressive()   // 5 attempts, [10, 30, 60, 120, 300] seconds
RetryPolicy::noRetry()      // 1 attempt, fail immediately
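The backoff arrays read most naturally as "wait this long before retry N". The sketch below assumes BPE follows Laravel's own convention for array backoffs, where the last entry is reused once the retries outnumber the entries; `backoffDelay()` is a hypothetical helper, not a package function:

```php
<?php

/**
 * Hypothetical helper: delay in seconds before retry number $retry (1-based).
 * Assumes Laravel's convention for backoff arrays: when there are more
 * retries than entries, the last entry is reused.
 */
function backoffDelay(array $backoff, int $retry): int
{
    if ($backoff === [] || $retry < 1) {
        return 0;
    }

    $index = min($retry, count($backoff)) - 1;

    return $backoff[$index];
}

$default = [30, 60, 120];

echo backoffDelay($default, 1), PHP_EOL; // 30
echo backoffDelay($default, 3), PHP_EOL; // 120
echo backoffDelay($default, 5), PHP_EOL; // 120 (last entry reused)
```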
Status Tracking
// Facade
$status = FileProcessor::status($taskId);

// Returns: ProcessingTaskDTO
echo $status->status;        // 'pending' | 'processing' | 'completed' | 'failed'
echo $status->progress;      // 0-100
echo $status->errorMessage;  // null | string
print_r($status->result);    // ['path' => '...', 'disk' => '...', ...]

// Via HTTP API
GET /api/bpe/tasks/{taskId}

// Response:
{
  "data": {
    "uuid": "abc-123...",
    "status": "completed",
    "progress": 100,
    "file_type": "image",
    "queue_name": "bpe-high",
    "result": {
      "path": "bpe-results/2024/01/01/abc.webp",
      "disk": "s3"
    },
    "started_at": "2024-01-01T12:00:00Z",
    "completed_at": "2024-01-01T12:00:03Z"
  }
}
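When broadcasting is not available, a caller can poll the status until the task reaches a terminal state. The following is a rough sketch only: `waitForTask()` is a hypothetical helper, and the `$fetchStatus` callable stands in for a real lookup such as `FileProcessor::status($taskId)->status`. The `cancelled` state is taken from the `bpe:status` output shown under Monitoring.

```php
<?php

/**
 * Hypothetical helper: polls $fetchStatus until the task reaches a terminal
 * state or $maxPolls is exhausted, then returns the last status seen.
 */
function waitForTask(callable $fetchStatus, int $maxPolls = 10, int $sleepMs = 0): string
{
    $terminal = ['completed', 'failed', 'cancelled'];
    $status = 'pending';

    for ($i = 0; $i < $maxPolls; $i++) {
        $status = $fetchStatus();
        if (in_array($status, $terminal, true)) {
            break;
        }
        usleep($sleepMs * 1000); // pause between polls
    }

    return $status;
}

// Stub that simulates a task moving through its states.
$states = ['pending', 'processing', 'processing', 'completed'];
$fetch = function () use (&$states): string {
    return count($states) > 1 ? array_shift($states) : $states[0];
};

echo waitForTask($fetch), PHP_EOL; // completed
```

In real use a non-zero `$sleepMs` and a generous `$maxPolls` would be sensible, since video and batch tasks can run for minutes.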
Real-time Progress (Broadcasting)
The package automatically broadcasts events on the public channel bpe.task.{taskId}.
Laravel Echo (frontend):
import Echo from 'laravel-echo';
import Pusher from 'pusher-js';

window.Pusher = Pusher;
window.Echo = new Echo({
    broadcaster: 'pusher',
    key: process.env.MIX_PUSHER_APP_KEY,
});

const taskId = 'abc-123-...';

Echo.channel(`bpe.task.${taskId}`)
    .listen('.task.started', (e) => {
        console.log('Started:', e);
    })
    .listen('.task.progress', (e) => {
        progressBar.style.width = `${e.progress}%`;
        label.textContent = `${e.step}: ${e.progress}%`;
    })
    .listen('.task.completed', (e) => {
        showResult(e.result);
    })
    .listen('.task.failed', (e) => {
        showError(e.message);
    });
Broadcast event payloads:
| Event | Payload |
|---|---|
| task.started | { task_id, status: 'processing' } |
| task.progress | { task_id, progress: 0-100, step: 'compress', status } |
| task.completed | { task_id, status: 'completed', result: {...} } |
| task.failed | { task_id, status: 'failed', message: '...' } |
Notifications
FileProcessor::image($file)
    ->notify(channels: ['mail', 'slack', 'webhook'])
    ->dispatch();
Configure notification channels in config/bpe.php:
'notifications' => [
    'channels' => ['mail'],
    'mail'     => ['from' => 'noreply@yourapp.com'],
    'slack'    => ['webhook' => env('SLACK_WEBHOOK_URL')],
    'webhook'  => ['url' => env('BPE_WEBHOOK_URL')],
],
Multi-Tenancy
All tasks are automatically scoped by tenant_id:
// Scope to a specific tenant
FileProcessor::forTenant($tenantId)
    ->image($file)
    ->resize(1920, 1080)
    ->dispatch();

// Query scoped to tenant
ProcessingTask::forTenant($tenantId)->where('status', 'completed')->get();
Add the HasProcessingTasks trait to your Tenant or User model:
use Mostafax\BPE\Support\Traits\HasProcessingTasks;

class User extends Authenticatable
{
    use HasProcessingTasks;
}

// Then:
$user->processingTasks()->get();
$user->completedTasks()->count();
Custom Processors
Register any custom file type:
// 1. Create your processor
namespace App\Processors;

use Mostafax\BPE\Processors\AbstractProcessor;
use Mostafax\BPE\Contracts\ProcessorStepInterface;
use Mostafax\BPE\Application\DTOs\ProcessingResultDTO;
use Mostafax\BPE\Domain\Processing\ValueObjects\FileReference;

class DicomProcessor extends AbstractProcessor
{
    public function supportedMimeTypes(): array
    {
        return ['application/dicom'];
    }

    public function fileType(): string
    {
        return 'dicom';
    }

    protected function resolveStep(string $name): ProcessorStepInterface
    {
        return match ($name) {
            'anonymize' => new AnonymizeDicomStep(),
            'convert'   => new ConvertDicomStep(),
        };
    }

    protected function buildResult(FileReference $originalFile, string $outputPath): ProcessingResultDTO
    {
        return new ProcessingResultDTO(
            path: $outputPath,
            disk: $originalFile->disk(),
            mimeType: 'image/jpeg',
            size: filesize($outputPath),
        );
    }
}

// 2. Register in AppServiceProvider
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;

public function boot(): void
{
    FileProcessor::extend('dicom', \App\Processors\DicomProcessor::class);
}

// 3. Use it
FileProcessor::dicom($file)
    ->anonymize()
    ->convert('jpg')
    ->dispatch();
Artisan Commands
# Install the package (publish + migrate)
php artisan bpe:install

# Show processing statistics
php artisan bpe:status

# Retry failed tasks
php artisan bpe:retry {uuid}
php artisan bpe:retry --all-failed

# Prune old tasks
php artisan bpe:prune --days=30

# Run demo (no real file needed)
php artisan bpe:demo --type=image
php artisan bpe:demo --type=csv
php artisan bpe:demo --queue
php artisan bpe:demo --stats
API Endpoints
The package registers these routes under api/bpe/*:
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/bpe/tasks/{id} | Get task status |
| POST | /api/bpe/tasks/{id}/cancel | Cancel task |
| POST | /api/bpe/tasks/{id}/retry | Retry failed task |
| GET | /api/bpe/health | Health check |
Enable or disable the routes, and set their prefix and middleware, in config/bpe.php:
'api' => [
    'enabled'    => true,
    'prefix'     => 'api/bpe',
    'middleware' => ['api', 'auth:sanctum'],
],
Example HTTP responses:
// GET /api/bpe/tasks/{id}
{
  "data": {
    "uuid": "abc-123",
    "status": "completed",
    "progress": 100,
    "file_type": "image",
    "mime_type": "image/jpeg",
    "queue_name": "bpe-high",
    "priority": "high",
    "attempts": 1,
    "result": {
      "path": "bpe-results/2024/06/01/result.webp",
      "disk": "s3",
      "size": 142500,
      "mime_type": "image/webp",
      "metadata": { "width": 1920, "height": 1080 }
    },
    "started_at": "2024-06-01T10:00:01Z",
    "completed_at": "2024-06-01T10:00:04Z"
  }
}

// GET /api/bpe/health
{
  "status": "ok",
  "version": "1.0.0",
  "time": "2024-06-01T10:00:00Z"
}
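A client consuming the task endpoint typically decodes the body and reads the result path and timestamps. A minimal sketch in plain PHP, using a trimmed copy of the sample response; how the body is fetched (HTTP client, auth headers) is left out:

```php
<?php

// Sample body of GET /api/bpe/tasks/{id}, trimmed from the response above.
$json = <<<'JSON'
{
  "data": {
    "uuid": "abc-123",
    "status": "completed",
    "result": { "path": "bpe-results/2024/06/01/result.webp", "disk": "s3" },
    "started_at": "2024-06-01T10:00:01Z",
    "completed_at": "2024-06-01T10:00:04Z"
  }
}
JSON;

// Decode to an associative array; throws JsonException on malformed input.
$task = json_decode($json, true, 512, JSON_THROW_ON_ERROR)['data'];

// Processing time in seconds, derived from the two timestamps.
$started   = new DateTimeImmutable($task['started_at']);
$completed = new DateTimeImmutable($task['completed_at']);
$seconds   = $completed->getTimestamp() - $started->getTimestamp();

echo $task['status'], ' in ', $seconds, 's: ', $task['result']['path'], PHP_EOL;
// completed in 3s: bpe-results/2024/06/01/result.webp
```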
Monitoring
Laravel Horizon
BPE integrates with Horizon out of the box. No extra setup needed. All jobs appear under their respective queues in the Horizon dashboard.
Artisan Status
php artisan bpe:status
BPE Processing Statistics
┌─────────────┬───────┐
│ Status │ Count │
├─────────────┼───────┤
│ pending │ 12 │
│ dispatched │ 3 │
│ processing │ 2 │
│ completed │ 891 │
│ failed │ 4 │
│ cancelled │ 1 │
└─────────────┴───────┘
Database Tables
All processing data is persisted in 5 tables:
| Table | Purpose |
|---|---|
| bpe_tasks | Main task record: status, pipeline, retry info |
| bpe_results | Output files: URLs, dimensions, metadata |
| bpe_logs | Per-task log entries with stage tracking |
| bpe_events | Immutable event store (audit trail) |
| bpe_metrics | Time-series performance metrics |
Scalability
100K jobs/day (~70/min)
- 1 Redis instance + 5 Horizon workers
- Single MySQL + BPE with default config
1M jobs/day (~700/min)
- Redis Sentinel (2 replicas)
- 20-30 Horizon workers across 3-5 servers
- MySQL + Read Replica
10M jobs/day (~7000/min)
- Redis Cluster (6 shards)
- 100+ Kubernetes workers (HPA on queue depth)
- PostgreSQL + Citus horizontal sharding
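The per-minute figures in the tier headings are daily volumes divided by the 1,440 minutes in a day, rounded. A short sketch of that arithmetic; `jobsPerMinute()` is a hypothetical helper, and the result is an average, so real peaks will be higher:

```php
<?php

// Hypothetical helper: average jobs per minute for a daily job volume.
function jobsPerMinute(int $jobsPerDay): float
{
    return $jobsPerDay / (24 * 60); // 1,440 minutes per day
}

foreach ([100_000, 1_000_000, 10_000_000] as $perDay) {
    printf("%d jobs/day = %.0f jobs/min\n", $perDay, jobsPerMinute($perDay));
}
// 100000 jobs/day = 69 jobs/min
// 1000000 jobs/day = 694 jobs/min
// 10000000 jobs/day = 6944 jobs/min
```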
Testing
Use the built-in fake:
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;

// In your test setUp()
FileProcessor::fake(); // disables real processing; queues are captured

// Act
$result = FileProcessor::image($fakeFile)->resize(800, 600)->dispatch();

// Assert
$this->assertDatabaseHas('bpe_tasks', [
    'uuid'   => $result->taskId,
    'status' => 'dispatched',
    'type'   => 'image',
]);
Architecture
BPE follows Clean Architecture + DDD with 8 Bounded Contexts:
Domain Layer (ProcessingTask aggregate, Value Objects, Domain Events)
↑ depends on nothing external
Application Layer (PendingProcess builder, Commands, Queries, DTOs)
↑ depends on Domain
Infrastructure Layer (Eloquent Repositories, Queue, Storage, Drivers)
↑ depends on Application contracts
Interface Layer (HTTP Controllers, Artisan Commands, Broadcast Events)
↑ depends on Application
Key design patterns:
- Aggregate Root: ProcessingTask owns all state transitions
- Value Objects: Priority, RetryPolicy, ProcessingPipeline are immutable
- Repository Pattern: CachedTaskRepository wraps EloquentTaskRepository
- Strategy Pattern: Processors and Steps are swappable implementations
- Fluent Builder: PendingProcess accumulates configuration before dispatching
- Event Sourcing: All state changes recorded in bpe_events
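The repository pattern listed above, where a caching repository wraps a database-backed one, is a decorator. The sketch below shows the general idea in plain PHP; the interface and both classes are hypothetical stand-ins, since the package's real repository contract is not shown in this README:

```php
<?php

// Hypothetical contract; not the package's actual interface.
interface TaskRepository
{
    public function find(string $uuid): ?array;
}

// Stand-in for a database-backed repository; counts lookups.
final class InMemoryTaskRepository implements TaskRepository
{
    public int $lookups = 0;

    public function __construct(private array $rows) {}

    public function find(string $uuid): ?array
    {
        $this->lookups++;

        return $this->rows[$uuid] ?? null;
    }
}

// Decorator: same interface, serves repeated reads from a local cache.
final class CachingTaskRepository implements TaskRepository
{
    private array $cache = [];

    public function __construct(private TaskRepository $inner) {}

    public function find(string $uuid): ?array
    {
        if (!array_key_exists($uuid, $this->cache)) {
            $this->cache[$uuid] = $this->inner->find($uuid);
        }

        return $this->cache[$uuid];
    }
}

$db   = new InMemoryTaskRepository(['abc-123' => ['status' => 'completed']]);
$repo = new CachingTaskRepository($db);

$repo->find('abc-123');
$repo->find('abc-123');

echo $db->lookups, PHP_EOL; // 1
```

Because callers depend only on the interface, caching can be switched on or off without touching them. This sketch never expires entries, whereas the package config exposes a cache `ttl` of 300 seconds.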
License
MIT © Mostafa — mostafa.m.elbiar2@gmail.com