yangusik / thrun-laravel
Laravel adapter for Thrun async queue worker
Requires
- php: ^8.4
- ext-pcntl: *
- ext-redis: *
- illuminate/console: ^11.0 || ^12.0 || ^13.0
- illuminate/container: ^11.0 || ^12.0 || ^13.0
- illuminate/support: ^11.0 || ^12.0 || ^13.0
- yangusik/thrun: *
This package is auto-updated.
Last update: 2026-06-26 17:05:49 UTC
README
Laravel adapter for the async queue worker Thrun. Built on real OS threads (TrueAsync) and provides two workflow styles: clean architecture (recommended) and self-handling jobs (for simple tasks).
Benchmarks
Measured on WSL2, 8GB RAM, PHP 8.6 TrueAsync fork:
| Scenario | Config | Jobs | Time | Throughput | RSS |
|---|---|---|---|---|---|
| Horizon IO | 12 workers | 1,000 | 12.1s | 83/s | 872 MB |
| Thrun IO | 1 thread, 100 coroutines | 1,000 | 2.3s | 434/s | 80 MB |
| Horizon IO | 12 workers | 10,000 | 55.0s | 182/s | 1019 MB |
| Thrun IO | 1 thread, 100 coroutines | 10,000 | 6.3s | 1580/s | 84 MB |
| Horizon CPU | 12 workers | 100 | 18.4s | 5.4/s | 1022 MB |
| Thrun CPU | 12 threads | 100 | 16.3s | 6.1/s | 100 MB |
| Horizon CPU | 12 workers | 1,000 | 162.6s | 6.2/s | 1023 MB |
| Thrun CPU | 12 threads | 1,000 | 139.5s | 7.2/s | 101 MB |
| Horizon NOOP | 12 workers | 1,000 | 5.0s | 198/s | 656 MB |
| Thrun NOOP | 12 threads | 1,000 | 2.3s | 434/s | 103 MB |
TrueAsync 12x10 uses 17x less RSS than Horizon 12 workers, 11x more IO throughput.
Installation
composer require yangusik/thrun-laravel
Publish the configuration file:
php artisan vendor:publish --tag=thrun-config
Configuration (config/thrun.php)
Redis
'redis' => [ 'host' => env('THRUN_REDIS_HOST', '127.0.0.1'), 'port' => (int) env('THRUN_REDIS_PORT', 6379), 'prefix' => env('THRUN_REDIS_PREFIX', 'thrun:queue'), 'timeout' => 1.0, ],
Note:
prefixaffects Redis keys. Default isthrun:queue, but you can change it when multiple environments share the same Redis instance.
Queues
Each queue is a separate transport. Currently supported: redis and memory.
'queues' => [ 'emails' => ['transport' => 'redis'], 'notifications' => ['transport' => 'memory'], ],
memory transport holds jobs in process memory with no external dependencies.
Primarily useful for local development and testing when you don't want to run Redis.
All memory queues are automatically exposed on the RPC server —
external processes can push jobs into them over a socket.
Note:
memoryqueues are designed for up toqueue_sizejobs in flight. Sending significantly more jobs thanqueue_sizeallows provides no delivery guarantees. Useredisfor workloads that require guaranteed delivery.
FOR MEMORY USE:
$bus->dispatchViaRpc();
Worker configuration
'worker' => [ 'threads' => env('THRUN_WORKER_THREADS', 2), 'concurrency' => env('THRUN_WORKER_CONCURRENCY', 100), 'queue_size' => env('THRUN_WORKER_QUEUE_SIZE', 1000), ],
threads— number of OS threads. For I/O-bound workloads1is enough; increase for CPU-bound tasks.concurrency— number of coroutines per thread.queue_size— internal job buffer size. Forredisqueues this is a soft throughput limiter — Redis keeps the rest. Formemoryqueues the buffer is the only storage, so tune this to match your expected peak load.
Supervisors
Each supervisor is an isolated worker group with its own queues, strategy, and policies.
'supervisors' => [ 'default' => [ 'queues' => ['emails', 'notifications'], 'worker' => ['threads' => 2, 'concurrency' => 100, 'queue_size' => 1000], 'supervisor'=> ['max_crashes' => 3, 'restart_window' => 300, 'restart_backoff' => 1.0], 'strategy' => ['class' => PriorityStrategy::class, 'priorities' => ['emails' => 3, 'notifications' => 1]], 'policy' => ['enabled' => false, 'class' => MaxConcurrencyPolicy::class, 'options' => ['max_per_partition' => 5]], 'handlers' => [], // manual routing ], ],
RPC Server
The RPC server runs alongside the worker and enables cross-process job dispatch and event broadcasting.
'rpc' => [ 'enabled' => env('THRUN_RPC_ENABLED', true), 'transport' => env('THRUN_RPC_TRANSPORT', 'unix'), // unix | tcp 'socket_path' => env('THRUN_RPC_SOCKET', '/tmp/thrun_rpc.sock'), 'host' => env('THRUN_RPC_HOST', '127.0.0.1'), 'port' => (int) env('THRUN_RPC_PORT', 9000), ],
Failed Jobs
When a message exhausts all retries, it is sent to the global failed job store (configured separately from supervisors):
'failed' => [ 'driver' => env('THRUN_FAILED_DRIVER', 'redis'), 'redis' => [ 'prefix' => env('THRUN_FAILED_PREFIX', 'thrun:failed'), ], ],
Supported drivers: redis (default) and null (no-op).
You can register custom failed job drivers:
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extendFailed('database', function (array $config) { return new DatabaseFailedJobSender( table: $config['database']['table'] ?? 'thrun_failed_jobs', ); });
Auto-discover
'auto_discover' => [ 'App\\Handlers', // regular Handler classes 'App\\Jobs', // self-handling Job classes 'App\\Events', // event listeners ],
Two Workflow Styles
1. Clean Architecture (recommended) — Message + Handler
Message is a DTO with data. Handler is an invokable class with business logic. Pure Symfony-style separation of concerns.
Message:
namespace App\Messages; use Thrun\Laravel\Handler\Attribute\Delay; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] #[Delay(5000)] final readonly class SendEmailMessage { public function __construct( public string $to, public string $subject, ) {} }
Handler:
namespace App\Handlers; use App\Messages\SendEmailMessage; use Thrun\Laravel\Handler\AsThrunHandler; use Thrun\Worker\Acknowledger; #[AsThrunHandler] // auto-wired to SendEmailMessage final class SendEmailHandler { public function __construct(private MailerInterface $mailer) {} public function __invoke(SendEmailMessage $message, Acknowledger $ack): void { $this->mailer->send($message->to, $message->subject); $ack->ack(); } }
Dispatch:
use Thrun\Laravel\Bus\ThrunMessageBus; use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch(new SendEmailMessage('user@test.com', 'Hello')); // or with option override: $bus->dispatch( new SendEmailMessage('user@test.com', 'Hello'), 'emails', new DispatchOptions(delayMs: 10_000, messageId: 'email-42'), );
2. Alternative — Self-handling Job
A single class acts as both Message and Handler. Data goes in the constructor, logic in __invoke(). For simple tasks when you don't want extra files.
namespace App\Jobs; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; use Thrun\Laravel\Handler\Attribute\ThrunJob; use Thrun\Worker\Acknowledger; #[ThrunJob] #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] final readonly class SendEmailJob { public function __construct( public string $to, public string $subject, ) {} public function __invoke(MailerInterface $mailer, Acknowledger $ack): void { $mailer->send($this->to, $this->subject); $ack->ack(); } }
// queue is taken from #[Queue] attribute $bus->dispatch(new SendEmailJob('user@test.com', 'Hello')); // or override: $bus->dispatch(new SendEmailJob('user@test.com', 'Hello'), 'urgent-emails');
Important: the constructor accepts scalar data only (
int,string,array, etc.) — these values get serialized to Redis. Never inject services or objects into the constructor. All services are injected via DI in__invoke()— LaravelContainer::call()resolves them automatically.
Attributes
| Attribute | Purpose | Applies to |
|---|---|---|
#[ThrunJob] |
Self-handling job marker | Job |
#[Queue('emails')] |
Default queue for the message | Job / Message |
#[Retry(backoff: [...], maxAttempts: 3)] |
Retry policy | Job / Message |
#[Delay(5000)] |
Delay in ms | Job / Message |
#[Timeout(30000)] |
Hard execution timeout | Job / Message |
#[AsThrunHandler(messageClass: ...)] |
Explicit Handler → Message binding | Handler |
#[ThrunEventListener('order.completed')] |
Event listener binding | Event Listener |
Dispatch priority:
dispatch()argument (explicit)#[Queue]attribute on class'default'
Acknowledger
Acknowledger is the explicit processing acknowledgement object.
public function __invoke(MyMessage $message, Acknowledger $ack): void { // ... logic ... $ack->ack(); // confirm success // $ack->nack(); // reject (goes to retry or failure transport) }
Recommendation: always accept
Acknowledger $ackexplicitly and call$ack->ack(). This gives you full control over the message lifecycle.
Events
Thrun provides a lightweight pub/sub event system over the RPC socket. Events are ephemeral (fire-and-forget, no persistence) — use jobs for guaranteed delivery.
Emitting an event from a handler
use Thrun\Laravel\Rpc\RpcPublisher; final class ProcessOrderHandler { public function __construct(private RpcPublisher $rpc) {} public function __invoke(ProcessOrderMessage $message, Acknowledger $ack): void { // ... business logic ... $this->rpc->emit('order.completed', ['order_id' => $message->orderId]); $ack->ack(); } }
Registering a listener
namespace App\Events; use Thrun\Laravel\Event\Attribute\ThrunEventListener; #[ThrunEventListener('order.completed')] final class OrderCompletedListener { public function __construct(private readonly MailService $mail) {} public function __invoke(array $payload): void { $this->mail->sendConfirmation($payload['order_id']); } }
Wildcard subscriptions:
#[ThrunEventListener('*')] // all events #[ThrunEventListener('payment.*')] // all payment.* events #[ThrunEventListener] // uses class name as identifier (PHP-only)
Running the listener process
php artisan thrun:event # subscribe to specific patterns php artisan thrun:event --subscribe=order.completed --subscribe=payment.*
Emitting from another language (Go example)
payload := map[string]any{ "event": "order.completed", "data": map[string]any{"order_id": 123}, } body, _ := json.Marshal(payload) // FrameType::Event = 0x02 frame := make([]byte, 5+len(body)) binary.BigEndian.PutUint32(frame[:4], uint32(len(body))) frame[4] = 0x02 copy(frame[5:], body) conn.Write(frame)
Wire Protocol
All RPC communication uses a simple length-prefixed binary framing: [4 bytes BE uint32 — payload length][1 byte — frame type][N bytes — JSON payload]
| Type | Byte | Direction | Purpose |
|---|---|---|---|
Job |
0x01 |
client → server | Push job into a local memory queue |
Event |
0x02 |
client → server | Broadcast event to subscribers |
Subscribe |
0x03 |
client → server | Register interest in an event name/pattern |
RpcRequest |
0x04 |
client → server | Synchronous call, expects RpcReply |
RpcReply |
0x05 |
server → client | Response to RpcRequest |
Error |
0x06 |
server → client | Error response |
Auto-discover
| Convention | Result |
|---|---|
#[AsThrunHandler] on handler class |
Explicit binding to messageClass |
SendEmailHandler → SendEmailMessage |
Naming convention auto-wire |
#[ThrunJob] on invokable class |
Class registers as its own handler |
#[ThrunEventListener('event.name')] |
Binds listener to event name/pattern |
Middleware
You can register worker middleware per supervisor via config. Classes are resolved through the Laravel container (constructor injection is supported).
'supervisors' => [ 'default' => [ // ... 'middleware' => [ \App\Middleware\LogMiddleware::class, \App\Middleware\MetricsMiddleware::class, ], ], ],
A middleware must implement WorkerMiddlewareInterface from the core thrun package:
namespace App\Middleware; use Thrun\Worker\Acknowledger; use Thrun\Contract\WorkerMiddlewareInterface; final class LogMiddleware implements WorkerMiddlewareInterface { public function handle(object $message, Acknowledger $ack, \Closure $next): void { try { $next($message, $ack); } catch (\Throwable $e) { // log, metrics, etc. throw $e; } } }
Failed Jobs & CLI
When a message exhausts all retries, it is persisted to the failed job store (configured in config/thrun.php under failed).
List failed jobs
php artisan thrun:failed php artisan thrun:failed --queue=emails php artisan thrun:failed --limit=100
Show details of a failed job
php artisan thrun:failed:show 019e9c83-c3d7-7216-b37d-04b1c154a5c8
Shows: type, queue, exception, message, file, line, full trace, payload, stamps.
Retry a failed job
php artisan thrun:retry 019e9c83-c3d7-7216-b37d-04b1c154a5c8 php artisan thrun:retry --all
Retry creates a new message with a fresh JobIdStamp but preserves MessageIdStamp.
Flush failed jobs
php artisan thrun:failed:flush
Flush queues
# Flush a specific queue (ready, processing, delayed) php artisan thrun:flush emails # Flush all configured queues php artisan thrun:flush # Flush queues + failed jobs php artisan thrun:flush --failed
Running the Worker
php artisan thrun:work # all supervisors + RPC server php artisan thrun:work --supervisor=X # single supervisor php artisan thrun:work --stats # with real-time throughput stats php artisan thrun:work --no-rpc # without RPC server
Commands Reference
| Command | Description |
|---|---|
thrun:work |
Start all supervisors + RPC server |
thrun:work --supervisor=X |
Start a single supervisor + RPC server |
thrun:work --stats |
Real-time throughput stats |
thrun:work --no-rpc |
Disable RPC server for this process |
thrun:event |
Listen and dispatch incoming events |
thrun:event --subscribe=X |
Subscribe to specific pattern |
thrun:failed |
List failed jobs |
thrun:failed:show {id} |
Show details of a failed job |
thrun:retry {id} |
Retry a specific failed job |
thrun:retry --all |
Retry all failed jobs |
thrun:failed:flush |
Delete all failed jobs |
thrun:flush {queue?} |
Flush queue(s) |
thrun:flush --failed |
Flush queues + failed jobs |
DispatchOptions
Explicit stamp control when dispatching (overrides class attributes):
use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch($message, 'emails', new DispatchOptions( messageId: 'uuid-42', delayMs: 5000, retryBackoff: [1000, 2000, 4000], maxAttempts: 3, timeoutMs: 30000, ));
For edge cases you can use dispatchCustom() with a ready-made Envelope:
use Thrun\Envelope\Envelope; use Thrun\Envelope\Stamp\QueueStamp; $bus->dispatchCustom( Envelope::wrap($message, new QueueStamp('custom')), 'emails', );
Message IDs
You can generate dynamic message IDs directly from the message payload using IdentifiableMessage:
use Thrun\Laravel\Contract\IdentifiableMessage; #[Queue('emails')] final readonly class SendEmailMessage implements IdentifiableMessage { public function __construct( public string $to, public string $subject, public int $userId, public int $productId, ) {} public function getId(): string { return "{$this->userId}-{$this->productId}"; } }
$bus->dispatch(new SendEmailMessage('a@b.com', 'Hi', 42, 7)); // messageId = "42-7" automatically
Priority:
DispatchOptions->messageIdIdentifiableMessage->getId()null(no ID)
Fluent Builder
For quick one-off overrides without creating a DispatchOptions object:
$bus->builder() ->id('custom-42') ->retry([1000, 2000], 3) ->delay(5000) ->timeout(30000) ->send($message, 'emails');
Extending Transports
You can register custom transports (e.g. RabbitMQ) via closures or config drivers.
Closure driver
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extend('rabbitmq', function (string $name, array $config) { return new RabbitMQTransport( host: $config['host'], port: $config['port'], queue: $name, ); });
Config-based driver
'queues' => [ 'orders' => [ 'transport' => 'custom', 'driver' => \App\Transport\RabbitMQTransport::class, 'host' => 'localhost', ], ],
The factory will try to resolve the class via Laravel Container::make() with ['name' => ..., 'config' => ...], or fall back to new $driverClass($name, $config).
Requirements
- PHP (TrueAsync Core) ^8.6
- Laravel ^11.0
- ext-async (TrueAsync extension)
- ext-phpredis (TrueAsync fork) (if using Redis transport)