yangusik / thrun-laravel
Laravel adapter for Thrun async queue worker
Requires
- php: ^8.4
- ext-pcntl: *
- ext-redis: *
- illuminate/console: ^11.0 || ^13.0
- illuminate/container: ^11.0 || ^13.0
- illuminate/support: ^11.0 || ^13.0
- yangusik/thrun: *
This package is auto-updated.
Last update: 2026-06-06 10:47:48 UTC
README
Laravel adapter for the async queue worker Thrun. Built on real OS threads (TrueAsync) and provides two workflow styles: clean architecture (recommended) and self-handling jobs (for simple tasks).
Benchmarks
Measured on WSL2, 8GB RAM, PHP 8.6 TrueAsync fork:
| Scenario | Config | Jobs | Time | Throughput | RSS |
|---|---|---|---|---|---|
| Horizon IO | 12 workers | 1,000 | 12.1s | 83/s | 872 MB |
| Thrun IO | 1 thread, 100 coroutines | 1,000 | 2.3s | 434/s | 80 MB |
| Horizon IO | 12 workers | 10,000 | 55.0s | 182/s | 1019 MB |
| Thrun IO | 1 thread, 100 coroutines | 10,000 | 6.3s | 1580/s | 84 MB |
| Horizon CPU | 12 workers | 100 | 18.4s | 5.4/s | 1022 MB |
| Thrun CPU | 12 threads | 100 | 16.3s | 6.1/s | 100 MB |
| Horizon CPU | 12 workers | 1,000 | 162.6s | 6.2/s | 1023 MB |
| Thrun CPU | 12 threads | 1,000 | 139.5s | 7.2/s | 101 MB |
| Horizon NOOP | 12 workers | 1,000 | 5.0s | 198/s | 656 MB |
| Thrun NOOP | 12 threads | 1,000 | 2.3s | 434/s | 103 MB |
TrueAsync 12x10 uses 17x less RSS than Horizon 12 workers, 11x more IO throughput.
Installation
composer require yangusik/thrun-laravel
Publish the configuration file:
php artisan vendor:publish --tag=thrun-config
Configuration (config/thrun.php)
Redis
'redis' => [ 'host' => env('THRUN_REDIS_HOST', '127.0.0.1'), 'port' => (int) env('THRUN_REDIS_PORT', 6379), 'prefix' => env('THRUN_REDIS_PREFIX', 'thrun:queue'), 'timeout' => 1.0, ],
Note:
prefixaffects Redis keys. Default isthrun:queue, but you can change it when multiple environments share the same Redis instance.
Queues
Each queue is a separate transport. Currently supported: redis and memory.
'queues' => [ 'emails' => ['transport' => 'redis'], 'notifications' => ['transport' => 'redis'], ],
Supervisors
Each supervisor is an isolated worker group with its own queues, strategy, and policies.
'supervisors' => [ 'default' => [ 'queues' => ['emails', 'notifications'], 'worker' => ['threads' => 2, 'concurrency' => 100], 'supervisor'=> ['max_crashes' => 3, 'restart_window' => 300, 'restart_backoff' => 1.0], 'strategy' => ['class' => PriorityStrategy::class, 'priorities' => ['emails' => 3, 'notifications' => 1]], 'policy' => ['enabled' => false, 'class' => MaxConcurrencyPolicy::class, 'options' => ['max_per_partition' => 5]], 'handlers' => [], // manual routing ], ],
Failed Jobs
When a message exhausts all retries, it is sent to the global failed job store (configured separately from supervisors):
'failed' => [ 'driver' => env('THRUN_FAILED_DRIVER', 'redis'), 'redis' => [ 'prefix' => env('THRUN_FAILED_PREFIX', 'thrun:failed'), ], ],
Supported drivers: redis (default) and null (no-op).
You can register custom failed job drivers:
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extendFailed('database', function (array $config) { return new DatabaseFailedJobSender( table: $config['database']['table'] ?? 'thrun_failed_jobs', ); });
Auto-discover
'auto_discover' => [ 'App\\Handlers', // regular Handler classes 'App\\Jobs', // self-handling Job classes ],
Two Workflow Styles
1. Clean Architecture (recommended) — Message + Handler
Message is a DTO with data. Handler is an invokable class with business logic. Pure Symfony-style separation of concerns.
Message:
namespace App\Messages; use Thrun\Laravel\Handler\Attribute\Delay; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] #[Delay(5000)] final readonly class SendEmailMessage { public function __construct( public string $to, public string $subject, ) {} }
Handler:
namespace App\Handlers; use App\Messages\SendEmailMessage; use Thrun\Laravel\Handler\AsThrunHandler; use Thrun\Worker\Acknowledger; #[AsThrunHandler] // auto-wired to SendEmailMessage final class SendEmailHandler { public function __construct(private MailerInterface $mailer) {} public function __invoke(SendEmailMessage $message, Acknowledger $ack): void { $this->mailer->send($message->to, $message->subject); $ack->ack(); } }
Dispatch:
use Thrun\Laravel\Bus\ThrunMessageBus; use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch(new SendEmailMessage('user@test.com', 'Hello')); // or with option override: $bus->dispatch( new SendEmailMessage('user@test.com', 'Hello'), 'emails', new DispatchOptions(delayMs: 10_000, messageId: 'email-42'), );
2. Alternative — Self-handling Job
A single class acts as both Message and Handler. Data goes in the constructor, logic in __invoke(). For simple tasks when you don't want extra files.
namespace App\Jobs; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; use Thrun\Laravel\Handler\Attribute\ThrunJob; use Thrun\Worker\Acknowledger; #[ThrunJob] #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] final readonly class SendEmailJob { public function __construct( public string $to, public string $subject, ) {} public function __invoke(MailerInterface $mailer, Acknowledger $ack): void { $mailer->send($this->to, $this->subject); $ack->ack(); } }
// queue is taken from #[Queue] attribute $bus->dispatch(new SendEmailJob('user@test.com', 'Hello')); // or override: $bus->dispatch(new SendEmailJob('user@test.com', 'Hello'), 'urgent-emails');
Important: the constructor accepts scalar data only (
int,string,array, etc.) — these values get serialized to Redis. Never inject services or objects into the constructor. All services are injected via DI in__invoke()— LaravelContainer::call()resolves them automatically.
Attributes
| Attribute | Purpose | Applies to |
|---|---|---|
#[ThrunJob] |
Self-handling job marker | Job |
#[Queue('emails')] |
Default queue for the message | Job / Message |
#[Retry(backoff: [...], maxAttempts: 3)] |
Retry policy | Job / Message |
#[Delay(5000)] |
Delay in ms | Job / Message |
#[Timeout(30000)] |
Hard execution timeout | Job / Message |
#[AsThrunHandler(messageClass: ...)] |
Explicit Handler → Message binding | Handler |
Dispatch priority:
dispatch()argument (explicit)#[Queue]attribute on class'default'
Acknowledger
Acknowledger is the explicit processing acknowledgement object.
public function __invoke(MyMessage $message, Acknowledger $ack): void { // ... logic ... $ack->ack(); // confirm success // $ack->nack(); // reject (goes to retry or failure transport) }
Recommendation: always accept
Acknowledger $ackexplicitly and call$ack->ack(). This gives you full control over the message lifecycle.
Auto-discover
Class scanning happens automatically for namespaces listed in auto_discover.
Handlers
#[AsThrunHandler]— explicit binding tomessageClass- Naming convention:
SendEmailHandler→SendEmailMessage
Self-handling Jobs
#[ThrunJob]on an invokable class — the class registers as its own handler
Middleware
You can register worker middleware per supervisor via config. Classes are resolved through the Laravel container (constructor injection is supported).
'supervisors' => [ 'default' => [ // ... 'middleware' => [ \App\Middleware\LogMiddleware::class, \App\Middleware\MetricsMiddleware::class, ], ], ],
A middleware must implement WorkerMiddlewareInterface from the core thrun package:
namespace App\Middleware; use Thrun\Worker\Acknowledger; use Thrun\Contract\WorkerMiddlewareInterface; final class LogMiddleware implements WorkerMiddlewareInterface { public function handle(object $message, Acknowledger $ack, \Closure $next): void { try { $next($message, $ack); } catch (\Throwable $e) { // log, metrics, etc. throw $e; } } }
Failed Jobs & CLI
When a message exhausts all retries, it is persisted to the failed job store (configured in config/thrun.php under failed).
List failed jobs
php artisan thrun:failed php artisan thrun:failed --queue=emails php artisan thrun:failed --limit=100
Show details of a failed job
php artisan thrun:failed:show 019e9c83-c3d7-7216-b37d-04b1c154a5c8
Shows: type, queue, exception, message, file, line, full trace, payload, stamps.
Retry a failed job
php artisan thrun:retry 019e9c83-c3d7-7216-b37d-04b1c154a5c8 php artisan thrun:retry --all
Retry creates a new message with a fresh JobIdStamp but preserves MessageIdStamp.
Flush failed jobs
php artisan thrun:failed:flush
Flush queues
# Flush a specific queue (ready, processing, delayed) php artisan thrun:flush emails # Flush all configured queues php artisan thrun:flush # Flush queues + failed jobs php artisan thrun:flush --failed
Running the Worker
# All supervisors php artisan thrun:work # Single supervisor php artisan thrun:work --supervisor=default # With stats php artisan thrun:work --stats
DispatchOptions
Explicit stamp control when dispatching (overrides class attributes):
use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch($message, 'emails', new DispatchOptions( messageId: 'uuid-42', delayMs: 5000, retryBackoff: [1000, 2000, 4000], maxAttempts: 3, timeoutMs: 30000, ));
For edge cases you can use dispatchCustom() with a ready-made Envelope:
use Thrun\Envelope\Envelope; use Thrun\Envelope\Stamp\QueueStamp; $bus->dispatchCustom( Envelope::wrap($message, new QueueStamp('custom')), 'emails', );
Message IDs
You can generate dynamic message IDs directly from the message payload using IdentifiableMessage:
use Thrun\Laravel\Contract\IdentifiableMessage; #[Queue('emails')] final readonly class SendEmailMessage implements IdentifiableMessage { public function __construct( public string $to, public string $subject, public int $userId, public int $productId, ) {} public function getId(): string { return "{$this->userId}-{$this->productId}"; } }
$bus->dispatch(new SendEmailMessage('a@b.com', 'Hi', 42, 7)); // messageId = "42-7" automatically
Priority:
DispatchOptions->messageIdIdentifiableMessage->getId()null(no ID)
Fluent Builder
For quick one-off overrides without creating a DispatchOptions object:
$bus->builder() ->id('custom-42') ->retry([1000, 2000], 3) ->delay(5000) ->timeout(30000) ->send($message, 'emails');
Extending Transports
You can register custom transports (e.g. RabbitMQ) via closures or config drivers.
Closure driver
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extend('rabbitmq', function (string $name, array $config) { return new RabbitMQTransport( host: $config['host'], port: $config['port'], queue: $name, ); });
Config-based driver
'queues' => [ 'orders' => [ 'transport' => 'custom', 'driver' => \App\Transport\RabbitMQTransport::class, 'host' => 'localhost', ], ],
The factory will try to resolve the class via Laravel Container::make() with ['name' => ..., 'config' => ...], or fall back to new $driverClass($name, $config).
Requirements
- PHP (TrueAsync Core) ^8.6
- Laravel ^11.0
- ext-async (TrueAsync extension)
- phpredis (TrueAsync fork) (if using Redis transport)