webpatser / torque
Coroutine-based queue worker for Laravel — N jobs per process via PHP Fibers and Redis Streams
Requires
- php: ^8.5
- amphp/amp: ^3.1
- amphp/redis: ^2.0
- amphp/sync: ^2.3
- illuminate/console: ^12.0|^13.0
- illuminate/queue: ^12.0|^13.0
- illuminate/support: ^12.0|^13.0
- revolt/event-loop: ^1.0
Requires (Dev)
- amphp/http-client: ^5.3
- amphp/mysql: ^3.0
- orchestra/testbench: ^10.0|^11.0
- pestphp/pest: ^4.0
Suggests
- amphp/http-client: Required for async HTTP client pooling (^5.3)
- amphp/mysql: Required for async MySQL connection pooling (^3.0)
README
The queue that keeps spinning. Coroutine-based queue worker for Laravel.
Torque replaces Horizon's 1-job-per-process model with N-jobs-per-process using PHP 8.5 Fibers. When a job waits on I/O, the coroutine scheduler switches to another job. Same hardware, 3-10x throughput for I/O-bound workloads.
4 workers x 50 coroutines = 200 concurrent jobs in ~300MB RAM
Requirements
- PHP 8.5+
- Laravel 12+
- Redis 7+ or Valkey (Redis Streams support)
- Revolt event loop (installed automatically)
Installation
composer require webpatser/torque
Publish the config:
php artisan vendor:publish --tag=torque-config
Add the queue connection to config/queue.php:
'torque' => [ 'driver' => 'torque', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 2000, 'prefix' => 'torque:', 'redis_uri' => env('TORQUE_REDIS_URI', 'redis://127.0.0.1:6379'), 'consumer_group' => 'torque', ],
Set it as default in .env:
QUEUE_CONNECTION=torque
Usage
Starting the worker
php artisan torque:start
Options:
php artisan torque:start --workers=8 --concurrency=100 --queues=emails,notifications
Dispatching jobs
Standard Laravel dispatching works unchanged:
ProcessDocument::dispatch($document); ProcessDocument::dispatch($document)->onQueue('high'); ProcessDocument::dispatch($document)->delay(now()->addMinutes(5)); // Batches work out of the box Bus::batch([ new ProcessDocument($doc1), new ProcessDocument($doc2), new ProcessDocument($doc3), ])->dispatch();
Async jobs with TorqueJob
Regular Laravel jobs work fine — they run synchronously within their coroutine slot. For full async I/O, extend TorqueJob and type-hint the pools you need:
use Webpatser\Torque\Job\TorqueJob; use Webpatser\Torque\Pool\MysqlPool; use Webpatser\Torque\Pool\HttpPool; class IndexDocument extends TorqueJob { public function __construct( private int $documentId, ) {} public function handle(MysqlPool $db, HttpPool $http): void { $result = $db->execute('SELECT * FROM documents WHERE id = ?', [$this->documentId]); $row = $result->fetchRow(); $http->post('http://elasticsearch:9200/docs/_doc/' . $this->documentId, json_encode($row)); } }
Per-Fiber state isolation
Use CoroutineContext when you need per-job isolated state (e.g., request-scoped data):
use Webpatser\Torque\Job\CoroutineContext; // Inside a job handler CoroutineContext::set('tenant_id', $this->tenantId); $tenantId = CoroutineContext::get('tenant_id');
State is automatically cleaned up when the Fiber completes (backed by WeakMap).
Job Event Streams
Every job automatically records lifecycle events to a per-job Redis Stream — no code changes needed.
$ php artisan torque:tail --job=088066c1-b045-4fb6-bc32-ca15cfdf7d08 📦 11:08:34 queued App\Jobs\ScrapeKvK → scrpr ▶ 11:10:28 started worker=web-01-4879 attempt=1 ⚠ 11:10:52 exception attempt=1 No alive nodes. All the 1 nodes seem to be down. ▶ 11:11:34 started worker=web-01-4882 attempt=2 ✓ 11:11:34 completed memory=58.5MB
Custom progress events
Add the Streamable trait to emit progress from inside your job:
use Webpatser\Torque\Stream\Streamable; class ImportCsv implements ShouldQueue { use Streamable; public function handle(): void { foreach ($this->rows as $i => $row) { // process... $this->emit("Imported row {$i}", progress: $i / count($this->rows)); } } }
Reading streams programmatically
use Webpatser\Torque\Stream\JobStream; $stream = app(JobStream::class); // All events so far $events = $stream->events($uuid); // Tail (blocks, yields events as they arrive) foreach ($stream->tail($uuid) as $event) { echo $event['type'] . ': ' . ($event['data']['message'] ?? ''); } // Check completion $stream->isFinished($uuid); // true after completed/failed
Streams auto-expire after 5 minutes (configurable via job_streams.ttl).
CLI Commands
| Command | Description |
|---|---|
torque:start |
Start the master + worker processes |
torque:stop |
Graceful shutdown (SIGTERM). Use --force for SIGKILL |
torque:status |
Show worker metrics, throughput, and queue depths |
torque:monitor |
Live htop-style terminal dashboard |
torque:tail |
Tail a job's event stream in real-time |
torque:pause |
Pause job processing (in-flight jobs complete) |
torque:pause continue |
Resume processing |
torque:supervisor |
Generate a Supervisor config file |
Configuration
All options are in config/torque.php. Key settings:
| Setting | Default | Description |
|---|---|---|
workers |
4 | Number of worker processes |
coroutines_per_worker |
50 | Concurrent job slots per worker |
max_jobs_per_worker |
10000 | Restart worker after N jobs (prevents memory leaks) |
max_worker_lifetime |
3600 | Restart worker after N seconds |
block_for |
2000 | XREADGROUP block timeout (ms) |
Autoscaling
'autoscale' => [ 'enabled' => true, 'min_workers' => 2, 'max_workers' => 8, 'scale_up_threshold' => 0.85, // Scale up when 85% of slots are busy 'scale_down_threshold' => 0.20, // Scale down when 20% of slots are busy 'cooldown' => 30, // Seconds between scaling decisions ],
Connection pools
'pools' => [ 'redis' => ['size' => 30, 'idle_timeout' => 60], 'mysql' => ['size' => 20, 'idle_timeout' => 60], 'http' => ['size' => 15, 'idle_timeout' => 30], ],
Dashboard
Torque includes a Livewire 4 + Flux UI Pro dashboard at /torque (configurable).
Features:
- Real-time metrics (throughput, latency, concurrent jobs, memory)
- Worker table with coroutine slot usage bars
- Stream/queue overview with pending and delayed counts
- Failed jobs list with retry and delete actions
- Kibana-style configurable poll interval (1s to 1m, or paused)
Authorization
The dashboard is denied by default. Define the viewTorque gate in your AuthServiceProvider:
Gate::define('viewTorque', fn (User $user) => $user->isAdmin());
Dashboard middleware
Default: ['web', 'auth']. Override in config:
'dashboard' => [ 'enabled' => true, 'path' => 'torque', 'middleware' => ['web', 'auth', 'can:admin'], ],
Failed jobs
Jobs that exhaust all retries are moved to a dead-letter Redis Stream. You can:
- View them in the dashboard
- Retry or delete via dashboard or programmatically
- Listen for the
JobPermanentlyFailedevent for custom notifications
use Webpatser\Torque\Events\JobPermanentlyFailed; Event::listen(JobPermanentlyFailed::class, function ($event) { // $event->jobName, $event->queue, $event->exceptionMessage, etc. Notification::route('slack', '#alerts')->notify(new YourNotification($event)); });
Architecture
Master Process (torque:start)
├── Worker 1 (Revolt event loop)
│ ├── 50 Fibers (concurrent jobs)
│ ├── Redis Pool
│ ├── MySQL Pool
│ └── HTTP Pool
├── Worker 2
│ └── ...
├── Worker N
│ └── ...
└── AutoScaler (optional)
Redis Streams
├── torque:default (XREADGROUP consumer groups)
├── torque:default:delayed (sorted set)
├── torque:stream:dead-letter
├── torque:worker:* (per-worker stats with heartbeat TTL)
└── torque:job:* (per-job event streams, auto-expiring)
How it works
- Master spawns N worker processes via
pcntl_exec()(php artisan torque:worker) - Each worker runs a Revolt event loop with M Fiber slots
- Each Fiber loops:
XREADGROUP COUNT 1 BLOCK→ process → repeat. When the job does async I/O, the Fiber suspends and another job runs - Work-stealing: idle Fibers claim stale messages from dead consumers via
XAUTOCLAIM(per-queueretry_afteras idle threshold) - On completion:
XACK+XDEL. On failure: retry with exponential backoff or dead-letter - Each Fiber has its own dedicated Redis connection for blocking reads
Queue backend: Redis Streams
Redis Streams (not LISTs like Horizon) provide:
- Consumer groups: multiple workers, no duplicate processing
- Acknowledgment:
XACKafter success, unacked jobs auto-reclaimed viaXAUTOCLAIM - Backpressure:
XREADGROUP COUNT {slots} BLOCK {ms}pulls exactly as many jobs as available slots - Pending Entries List: Redis tracks assigned-but-unacked jobs natively
Compatibility
| Feature | Horizon | Torque |
|---|---|---|
| Queue backend | Redis LIST | Redis Streams |
| Concurrency | 1 job/process | N jobs/process (Fibers) |
| I/O model | Blocking (PDO, curl) | Non-blocking (AMPHP) |
| PHP extensions | None | None |
| Eloquent in jobs | Full support | Sync fallback (blocking) |
| Laravel Queue contract | Full | Full |
| Job batches | Yes | Yes |
| Delayed jobs | Redis sorted set | Redis sorted set |
| Dashboard | Blade + polling | Livewire 4 + Flux UI |
| Autoscaling | Balancing strategies | Slot-pressure based |
Production deployment
Generate a Supervisor config:
php artisan torque:supervisor --workers=4 --user=forge
This creates storage/torque-supervisor.conf. Copy it to your Supervisor config directory:
sudo cp storage/torque-supervisor.conf /etc/supervisor/conf.d/torque.conf sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start torque
Dependencies
Required (installed automatically):
revolt/event-loop— Fiber scheduleramphp/amp— async/await primitivesamphp/redis— Non-blocking Redis clientamphp/sync— LocalSemaphore for pool management
Optional (install when needed):
amphp/mysql— Async MySQL forMysqlPoolamphp/http-client— Async HTTP forHttpPool
License
MIT