onlyphp / simple-queue
A powerful and flexible job processing system for PHP applications, supporting both CodeIgniter 3 and Laravel frameworks. This package allows you to easily dispatch jobs with various types of processing, including closures, static class methods, object methods, global functions, and invokable classes.
Requires
- php: >=8.0
- opis/closure: ^4.0
A framework-agnostic queue system for PHP that integrates with multiple database connections and frameworks. It offers Laravel-inspired queue functionality without cron jobs and lets you dispatch closures, static class methods, object methods, global functions, and invokable classes as jobs. It works with PDO, MySQL, MSSQL, and Oracle connections and is compatible with CodeIgniter 3, CodeIgniter 4, and Laravel.
⚠️ Warning
DO NOT USE THIS PACKAGE.
This package is under active development and may contain critical bugs. It is primarily intended for personal use and testing within my own projects.
This version has not undergone rigorous testing and may be unstable or unreliable.
🛠️ Primary Focus (Testing & Development):
- CodeIgniter 3
⏭️ Supported (Experimental):
- PDO
- MySQL
- MSSQL
- Oracle
- CodeIgniter 4
- Laravel
✨ Features
- 🚀 No cron jobs required - automatic worker process management
- 🔄 Automatic table creation and management
- 💪 Multiple database support (PDO, MySQL, MSSQL, Oracle, CodeIgniter 3/4, Laravel)
- ⚡ Priority queues (urgent, high, normal, low)
- 🔁 Automatic retry mechanism with customizable attempts
- ⏱️ Job timeout handling
- 📊 Job status monitoring
- 🔍 Failed job handling
📝 Requirements
- PHP >= 8.0
🔧 Installation
Install via Composer:
composer require onlyphp/simple-queue
🚀 Basic Usage
Initialize with Different Databases
    use OnlyPHP\SimpleQueue\JobProcessor;

    // PDO MySQL
    $pdo = new PDO('mysql:host=localhost;dbname=your_database', 'username', 'password');
    $queue = new JobProcessor($pdo);

    // PDO MSSQL
    $pdoMssql = new PDO('sqlsrv:Server=localhost;Database=your_database', 'username', 'password');
    $queueMssql = new JobProcessor($pdoMssql);

    // PDO Oracle
    $pdoOracle = new PDO('oci:dbname=your_database', 'username', 'password');
    $queueOracle = new JobProcessor($pdoOracle);

    // CodeIgniter 3
    $queue = new JobProcessor($this->db);

    // CodeIgniter 4
    $db = \Config\Database::connect();
    $queue = new JobProcessor($db);

    // Laravel
    $queue = new JobProcessor(DB::connection());
🖥️ Priority Levels
- urgent: Highest priority
- high: High priority
- normal: Default priority
- low: Low priority
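The four levels imply an ordering in which more urgent jobs are picked up first. The README does not describe how the package orders jobs internally, so the following is only a minimal sketch of one plausible scheme: the `PRIORITY_RANK` map, the `sortByPriority()` helper, and the tie-break on job id are all hypothetical names and choices made for illustration.

```php
<?php
// Hypothetical rank map (not taken from the package): lower rank runs first.
const PRIORITY_RANK = ['urgent' => 0, 'high' => 1, 'normal' => 2, 'low' => 3];

/**
 * Order jobs by priority rank, then by id so that older jobs of the
 * same priority run first (an assumed tie-break rule).
 */
function sortByPriority(array $jobs): array
{
    usort($jobs, function (array $a, array $b): int {
        return [PRIORITY_RANK[$a['priority']], $a['id']]
            <=> [PRIORITY_RANK[$b['priority']], $b['id']];
    });

    return $jobs;
}

$jobs = [
    ['id' => 1, 'priority' => 'low'],
    ['id' => 2, 'priority' => 'urgent'],
    ['id' => 3, 'priority' => 'normal'],
    ['id' => 4, 'priority' => 'urgent'],
];

$ordered = sortByPriority($jobs);

foreach ($ordered as $job) {
    echo $job['id'], ': ', $job['priority'], PHP_EOL;
}
```

Both urgent jobs come out ahead of the normal and low ones, and within the same level the lower id wins.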
🎯 Examples
Simple Jobs
    // Dispatch a closure
    $queue->job(function () {
        echo "Processing job...";
    })->dispatch();

    // Dispatch a function
    $queue->job('process_data', ['param1', 'param2'])
        ->setIncludePathFile('/path/to/file.php')
        ->dispatch();

    // Dispatch a class method
    $queue->job([new YourClass(), 'methodName'], $params)->dispatch();

    // Execute a job immediately in the foreground, without queueing (closure or callable)
    $result = $queue->job($callable)->dispatchNow();
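For readers less familiar with PHP callables, the sketch below shows the five callable forms this README mentions (closure, static class method, object method, global function, invokable class) and how plain PHP invokes each with a positional argument array. It does not use the package at all; the `Report` class and `export_report()` function are hypothetical, and whether the package invokes jobs the same way internally is an assumption.

```php
<?php
// Hypothetical class used only to demonstrate the callable forms.
class Report
{
    public static function build(string $name): string
    {
        return "static:$name";
    }

    public function send(string $name): string
    {
        return "method:$name";
    }

    public function __invoke(string $name): string
    {
        return "invokable:$name";
    }
}

// Hypothetical global function.
function export_report(string $name): string
{
    return "function:$name";
}

$callables = [
    'closure'         => fn (string $name): string => "closure:$name",
    'static method'   => [Report::class, 'build'],
    'object method'   => [new Report(), 'send'],
    'global function' => 'export_report',
    'invokable'       => new Report(),
];

$results = [];
foreach ($callables as $label => $callable) {
    // Each array element becomes one positional argument of the callable.
    $results[$label] = call_user_func_array($callable, ['monthly']);
}

print_r($results);
```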
Advanced Jobs
1. Email Processing Queue
    class EmailService
    {
        public function sendBulkEmails(array $recipients, string $template)
        {
            foreach ($recipients as $recipient) {
                // Process each email
            }
        }
    }

    // Queue the email job
    $emailService = new EmailService();
    $recipients = ['user1@example.com', 'user2@example.com'];
    $template = 'newsletter'; // Hypothetical template name

    // Arguments are assumed to be passed as a positional array, as in the Simple Jobs examples
    $queue->job([$emailService, 'sendBulkEmails'], [$recipients, $template])
        ->setName('bulk-email-campaign')
        ->setPriority('high')
        ->setMaxRetries(3)
        ->setRetryDelay(60)
        ->dispatch();
2. File Processing Queue
    class FileProcessor
    {
        public function processLargeFile(string $filePath)
        {
            // Process large file
        }
    }

    // Queue file processing
    $processor = new FileProcessor();
    $queue->job([$processor, 'processLargeFile'], $filePath)
        ->setName('large-file-processing')
        ->setTimeout(3600) // 1 hour timeout
        ->dispatch();
3. Order Processing System
    class OrderProcessor
    {
        public function processOrder(int $orderId, array $items)
        {
            // Process order logic
        }
    }

    // Queue order processing
    $processor = new OrderProcessor();
    $order = [
        'id' => 1234,
        'items' => ['item1', 'item2'],
    ];

    // Pass the id and items in the order processOrder() expects them
    // (assumes arguments are passed as a positional array)
    $queue->job([$processor, 'processOrder'], [$order['id'], $order['items']])
        ->setName("process-order-{$order['id']}")
        ->setPriority('urgent')
        ->setMaxRetries(5)
        ->setRetryDelay(30)
        ->dispatch();
4. Data Export Jobs
    class DataExporter
    {
        public function exportToCSV(string $query, string $filename)
        {
            // Export logic
        }
    }

    // Queue export job
    // $params holds the arguments for exportToCSV(): the query and the filename
    $exporter = new DataExporter();
    $queue->job([$exporter, 'exportToCSV'], $params)
        ->setName('monthly-report-export')
        ->setPriority('normal')
        ->setTimeout(7200) // 2 hours
        ->dispatch();
5. Image Processing Queue
    class ImageProcessor
    {
        public function processImage(string $path, array $options)
        {
            // Image processing logic
        }
    }

    // Queue image processing
    $processor = new ImageProcessor();
    $path = '/path/to/image.jpg'; // Hypothetical image path
    $options = [
        'resize' => true,
        'width' => 800,
        'height' => 600,
        'optimize' => true,
    ];

    // Assumes arguments are passed as a positional array
    $queue->job([$processor, 'processImage'], [$path, $options])
        ->setName('image-processing')
        ->setPriority('low')
        ->setMaxRetries(2)
        ->dispatch();
6. Calling a global function without classes
    function exportCsvData(string $filePath)
    {
        // Export the data
    }

    // Queue file export processing
    $queue->job('exportCsvData', $filePath)
        ->setIncludePathFile('/path/to/csv_helpers.php') // Include this file before calling the function
        ->dispatch();
7. Invokable Classes
    class MyInvokableClass
    {
        public function __invoke($param1, $param2)
        {
            // Invokable class logic
            echo "Processing invokable class with params: $param1, $param2";
        }
    }

    $invokable = new MyInvokableClass();
    $queue->job($invokable, ['param1', 'param2'])
        ->dispatch();
⚙️ Configuration Options
    $config = [
        'process_check_interval' => 1000000, // Worker check interval (microseconds)
        'worker_timeout' => 3600,            // Worker timeout (seconds)
        'max_workers' => 1,                  // Maximum number of concurrent workers
        'lock_dir' => '/tmp',                // Directory for worker lock files
    ];

    $queue = new JobProcessor($connection, $config);
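When only one or two options need to change, it can help to layer overrides on top of a base configuration and reject misspelled keys early. The sketch below is a standalone helper, not part of the package: `QUEUE_DEFAULTS` and `resolveQueueConfig()` are hypothetical names, and treating the values from the example above as defaults is an assumption.

```php
<?php
// Values copied from the configuration example; treating them as
// defaults is an assumption, not documented package behavior.
const QUEUE_DEFAULTS = [
    'process_check_interval' => 1000000, // microseconds
    'worker_timeout'         => 3600,    // seconds
    'max_workers'            => 1,
    'lock_dir'               => '/tmp',
];

/**
 * Merge user overrides over the base values and fail fast on
 * option names that are not among the four documented keys.
 */
function resolveQueueConfig(array $overrides): array
{
    $unknown = array_diff_key($overrides, QUEUE_DEFAULTS);
    if ($unknown !== []) {
        throw new InvalidArgumentException(
            'Unknown option(s): ' . implode(', ', array_keys($unknown))
        );
    }

    return array_merge(QUEUE_DEFAULTS, $overrides);
}

$config = resolveQueueConfig(['max_workers' => 4]);

// Convert the check interval to seconds for display.
$intervalSeconds = $config['process_check_interval'] / 1000000;
printf("workers=%d, check every %.1fs\n", $config['max_workers'], $intervalSeconds);
```

The resulting array could then be passed as the second argument to `new JobProcessor($connection, $config)`.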
📊 Monitoring and Management
    $processor = new JobProcessor($connection, $config);

    // Get queue statistics
    $stats = $processor->getJobStats();
    print_r($stats);
    /* Output:
    [
        'total_jobs' => 100,
        'pending_jobs' => 10,
        'processing_jobs' => 5,
        'completed_jobs' => 80,
        'failed_jobs' => 5,
        'avg_processing_time' => 45.5
    ]
    */

    // Get specific job status
    $jobUuId = $processor->job($callable)->dispatch();
    $status = $processor->getJobStatus($jobUuId);

    // Retry failed jobs
    $processor->retryAllFailed();

    // Clear old failed jobs
    $processor->clearFailedJobs(30); // Clear jobs older than 30 days
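The statistics array lends itself to simple derived metrics. The following sketch works on a hard-coded copy of the sample output shown above, so it runs without a database; `summarizeStats()` is a hypothetical helper, and the definitions of "backlog" and "failure rate" are illustrative choices rather than anything the package defines.

```php
<?php
/**
 * Derive a backlog count and a failure rate from a stats array
 * shaped like the sample getJobStats() output.
 */
function summarizeStats(array $stats): array
{
    $backlog = $stats['pending_jobs'] + $stats['processing_jobs'];

    // Guard against division by zero on an empty queue.
    $failureRate = $stats['total_jobs'] > 0
        ? round($stats['failed_jobs'] / $stats['total_jobs'] * 100, 1)
        : 0.0;

    return [
        'backlog' => $backlog,
        'failure_rate_percent' => $failureRate,
    ];
}

// Sample values copied from the output shown in this README.
$stats = [
    'total_jobs' => 100,
    'pending_jobs' => 10,
    'processing_jobs' => 5,
    'completed_jobs' => 80,
    'failed_jobs' => 5,
    'avg_processing_time' => 45.5,
];

$summary = summarizeStats($stats);
printf(
    "backlog=%d, failure rate=%.1f%%\n",
    $summary['backlog'],
    $summary['failure_rate_percent']
);
```

With the sample numbers, 15 jobs are still waiting or running and 5 of 100 jobs have failed.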
📅 Database Tables
The package automatically creates two tables:
- jobs: Stores queued jobs
- failed_jobs: Stores failed job information
👍 Best Practices
- Job Naming: Always set meaningful names for jobs using setName() for better tracking.
- Timeouts: Set appropriate timeouts based on job complexity.
- Retries: Configure max retries based on job criticality.
- Priority: Use priorities wisely - reserve 'urgent' for truly time-critical tasks.
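To reason about how many retries a job needs, it helps to see what a retry budget means in practice. The sketch below is a standalone illustration, not the package's worker loop: `runWithRetries()` is a hypothetical helper, and it assumes that "max retries" counts the extra attempts after the first one, which the README does not confirm.

```php
<?php
/**
 * Run a job, retrying on any Throwable.
 * Assumption: $maxRetries counts retries AFTER the first attempt,
 * so the job runs at most $maxRetries + 1 times.
 */
function runWithRetries(callable $job, int $maxRetries, int $retryDelaySeconds = 0): array
{
    $attempts = 0;
    $lastError = null;

    while ($attempts <= $maxRetries) {
        $attempts++;
        try {
            $result = $job();

            return ['status' => 'completed', 'attempts' => $attempts, 'result' => $result];
        } catch (Throwable $e) {
            $lastError = $e->getMessage();
            // Wait before the next attempt, but not after the final one.
            if ($attempts <= $maxRetries && $retryDelaySeconds > 0) {
                sleep($retryDelaySeconds);
            }
        }
    }

    return ['status' => 'failed', 'attempts' => $attempts, 'error' => $lastError];
}

// A flaky job that fails twice and then succeeds.
$calls = 0;
$flaky = function () use (&$calls): string {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException("attempt $calls failed");
    }

    return 'done';
};

$outcome = runWithRetries($flaky, 3);
echo $outcome['status'], ' after ', $outcome['attempts'], ' attempt(s)', PHP_EOL;

// A job that always fails exhausts its budget.
$exhausted = runWithRetries(function () {
    throw new RuntimeException('always fails');
}, 2);
echo $exhausted['status'], ' after ', $exhausted['attempts'], ' attempt(s)', PHP_EOL;
```

A job that is safe to repeat can tolerate a generous budget, while a job with side effects (such as sending email) usually warrants a small one.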
🔍 Error Handling
The package handles various types of errors:
- Connection failures
- Execution timeouts
- Runtime errors
- Worker process failures
Failed jobs are automatically logged with full stack traces in the failed_jobs table.
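As a rough picture of what "logged with full stack traces" can involve, the sketch below turns a caught exception into an array that could be written to a table. The field names are hypothetical and are not the package's actual failed_jobs schema; `buildFailedJobRecord()` is likewise an illustrative helper.

```php
<?php
/**
 * Build a record describing a failed job.
 * The keys are illustrative only, not the real failed_jobs columns.
 */
function buildFailedJobRecord(string $jobName, Throwable $e): array
{
    return [
        'name'            => $jobName,
        'exception_class' => get_class($e),
        'message'         => $e->getMessage(),
        'trace'           => $e->getTraceAsString(), // full stack trace as text
        'failed_at'       => date('Y-m-d H:i:s'),
    ];
}

$record = null;
try {
    // Simulate a job that fails at runtime.
    throw new RuntimeException('SMTP connection refused');
} catch (Throwable $e) {
    $record = buildFailedJobRecord('bulk-email-campaign', $e);
}

echo $record['exception_class'], ': ', $record['message'], PHP_EOL;
```

Catching `Throwable` rather than `Exception` also covers PHP `Error` subclasses such as `TypeError`.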
📛 Limitations
- Windows support is limited for signal handling
- Maximum execution time is subject to PHP's max_execution_time setting
- Single-server deployment only (no distributed queue support)
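The first two limitations can be checked before dispatching long-running jobs. The sketch below uses only standard PHP functions (`ini_get()`, `function_exists()`); `timeoutFitsExecutionLimit()` is a hypothetical helper, and whether the package relies on the pcntl extension for signal handling is an assumption.

```php
<?php
/**
 * Check whether a job timeout fits within max_execution_time.
 * A limit of 0 means "no limit", which is the default on the CLI.
 */
function timeoutFitsExecutionLimit(int $jobTimeoutSeconds, int $maxExecutionTime): bool
{
    return $maxExecutionTime === 0 || $jobTimeoutSeconds <= $maxExecutionTime;
}

$limit = (int) ini_get('max_execution_time');
$jobTimeout = 3600; // Same value as the file processing example

if (!timeoutFitsExecutionLimit($jobTimeout, $limit)) {
    echo "Warning: job timeout {$jobTimeout}s exceeds max_execution_time {$limit}s", PHP_EOL;
}

// pcntl is not available on Windows; assumed here to be what signal handling needs.
$hasSignals = function_exists('pcntl_signal');
echo 'Signal handling available: ', $hasSignals ? 'yes' : 'no', PHP_EOL;
```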
📌 To-do Improvements
- Implement job events/hooks
- Add support for job batching
- Add Redis/memory queue support
- Add job progress tracking
- Implement job middleware support
- Add support for unique jobs
- Add job chaining capabilities
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the Project
- Create your Feature Branch (git checkout -b feature/AmazingFeature)
- Commit your Changes (git commit -m 'Add some AmazingFeature')
- Push to the Branch (git push origin feature/AmazingFeature)
- Open a Pull Request
📄 License
This project is licensed under the MIT License.