yangusik/thrun-laravel

Laravel adapter for Thrun async queue worker

Package info

github.com/YanGusik/thrun_laravel

pkg:composer/yangusik/thrun-laravel

v0.2.0 2026-06-06 10:47 UTC

Last update: 2026-06-06 10:47:48 UTC


README

Laravel adapter for the Thrun async queue worker. It is built on real OS threads (TrueAsync) and offers two workflow styles: clean architecture (recommended) and self-handling jobs (for simple tasks).

Benchmarks

Measured on WSL2, 8GB RAM, PHP 8.6 TrueAsync fork:

| Scenario     | Config                   | Jobs   | Time   | Throughput | RSS     |
|--------------|--------------------------|--------|--------|------------|---------|
| Horizon IO   | 12 workers               | 1,000  | 12.1s  | 83/s       | 872 MB  |
| Thrun IO     | 1 thread, 100 coroutines | 1,000  | 2.3s   | 434/s      | 80 MB   |
| Horizon IO   | 12 workers               | 10,000 | 55.0s  | 182/s      | 1019 MB |
| Thrun IO     | 1 thread, 100 coroutines | 10,000 | 6.3s   | 1580/s     | 84 MB   |
| Horizon CPU  | 12 workers               | 100    | 18.4s  | 5.4/s      | 1022 MB |
| Thrun CPU    | 12 threads               | 100    | 16.3s  | 6.1/s      | 100 MB  |
| Horizon CPU  | 12 workers               | 1,000  | 162.6s | 6.2/s      | 1023 MB |
| Thrun CPU    | 12 threads               | 1,000  | 139.5s | 7.2/s      | 101 MB  |
| Horizon NOOP | 12 workers               | 1,000  | 5.0s   | 198/s      | 656 MB  |
| Thrun NOOP   | 12 threads               | 1,000  | 2.3s   | 434/s      | 103 MB  |

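In a TrueAsync 12x10 configuration, Thrun uses 17x less RSS than Horizon with 12 workers and delivers 11x more IO throughput. That configuration is not among the rows listed here; in the IO rows that are listed, Thrun uses about 11 to 12 times less RSS and delivers about 5 to 9 times the throughput.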
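
The throughput figures are jobs divided by wall-clock time, and the memory comparison is a ratio of RSS values. A minimal sketch that recomputes both for the two 10,000-job IO rows; the function names are illustrative and not part of Thrun:

```php
<?php

declare(strict_types=1);

// Throughput in jobs per second: number of jobs divided by elapsed seconds.
function throughput(int $jobs, float $seconds): float
{
    return $jobs / $seconds;
}

// How many times larger $baseline is than $candidate.
function ratio(float $baseline, float $candidate): float
{
    return $baseline / $candidate;
}

// Values copied from the 10,000-job IO benchmark rows.
$horizon = ['jobs' => 10_000, 'seconds' => 55.0, 'rss_mb' => 1019.0];
$thrun   = ['jobs' => 10_000, 'seconds' => 6.3,  'rss_mb' => 84.0];

$horizonRate = throughput($horizon['jobs'], $horizon['seconds']);
$thrunRate   = throughput($thrun['jobs'], $thrun['seconds']);

printf("Horizon: %.0f jobs/s\n", $horizonRate);
printf("Thrun:   %.0f jobs/s\n", $thrunRate);
printf("Throughput ratio: %.1fx\n", ratio($thrunRate, $horizonRate));
printf("RSS ratio:        %.1fx\n", ratio($horizon['rss_mb'], $thrun['rss_mb']));
```

Small differences from the published throughput column come from rounding in the reported times.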

Installation

composer require yangusik/thrun-laravel

Publish the configuration file:

php artisan vendor:publish --tag=thrun-config

Configuration (config/thrun.php)

Redis

'redis' => [
    'host'    => env('THRUN_REDIS_HOST', '127.0.0.1'),
    'port'    => (int) env('THRUN_REDIS_PORT', 6379),
    'prefix'  => env('THRUN_REDIS_PREFIX', 'thrun:queue'),
    'timeout' => 1.0,
],

Note: prefix determines the Redis key names. The default is thrun:queue; change it when multiple environments share the same Redis instance.

Queues

Each queue is a separate transport. Currently supported: redis and memory.

'queues' => [
    'emails'        => ['transport' => 'redis'],
    'notifications' => ['transport' => 'redis'],
],
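
Since memory is also a supported transport, a queue can be pointed at it where Redis is not available, for example in a test environment. A hedged sketch: the scratch queue and the THRUN_EMAILS_TRANSPORT variable are made up for illustration, and it assumes the memory transport needs no further options:

```php
'queues' => [
    // Redis by default; THRUN_EMAILS_TRANSPORT=memory would switch it (hypothetical variable).
    'emails'  => ['transport' => env('THRUN_EMAILS_TRANSPORT', 'redis')],
    // In-process queue; presumably nothing survives a worker restart (assumption).
    'scratch' => ['transport' => 'memory'],
],
```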

Supervisors

Each supervisor is an isolated worker group with its own queues, strategy, and policies.

'supervisors' => [
    'default' => [
        'queues'     => ['emails', 'notifications'],
        'worker'     => ['threads' => 2, 'concurrency' => 100],
        'supervisor' => ['max_crashes' => 3, 'restart_window' => 300, 'restart_backoff' => 1.0],
        'strategy'   => ['class' => PriorityStrategy::class, 'priorities' => ['emails' => 3, 'notifications' => 1]],
        'policy'     => ['enabled' => false, 'class' => MaxConcurrencyPolicy::class, 'options' => ['max_per_partition' => 5]],
        'handlers'   => [], // manual routing
    ],
],
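
Because each supervisor has its own worker settings, IO-bound and CPU-bound queues can be given different thread and coroutine counts, in the spirit of the benchmark configurations. A sketch under assumptions: the supervisor names and the reports queue are hypothetical, and the concurrency value of 1 for the CPU group is a guess rather than a documented recommendation:

```php
'supervisors' => [
    // IO-bound work: a single thread running many coroutines.
    'io' => [
        'queues' => ['emails', 'notifications'],
        'worker' => ['threads' => 1, 'concurrency' => 100],
        // 'supervisor', 'strategy', 'policy', 'handlers' as in the 'default' supervisor
    ],
    // CPU-bound work: many threads, little to gain from coroutines.
    'cpu' => [
        'queues' => ['reports'], // hypothetical queue; presumably must also be listed under 'queues'
        'worker' => ['threads' => 12, 'concurrency' => 1], // concurrency value is an assumption
        // 'supervisor', 'strategy', 'policy', 'handlers' as in the 'default' supervisor
    ],
],
```

Each group could then be started on its own with php artisan thrun:work --supervisor=io.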

Failed Jobs

When a message exhausts all retries, it is sent to the global failed job store (configured separately from supervisors):

'failed' => [
    'driver' => env('THRUN_FAILED_DRIVER', 'redis'),
    'redis' => [
        'prefix' => env('THRUN_FAILED_PREFIX', 'thrun:failed'),
    ],
],

Supported drivers: redis (default) and null (no-op).

You can register custom failed job drivers:

use Thrun\Laravel\Transport\TransportFactory;

$factory = app(TransportFactory::class);
$factory->extendFailed('database', function (array $config) {
    return new DatabaseFailedJobSender(
        table: $config['database']['table'] ?? 'thrun_failed_jobs',
    );
});
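
The registration has to run before the worker resolves the failed job store. One place that would fit is a service provider's boot() method. This is only a sketch: it assumes TransportFactory is shared as a singleton in the container, and both the provider name and the DatabaseFailedJobSender namespace are hypothetical:

```php
namespace App\Providers;

use App\Queue\DatabaseFailedJobSender; // hypothetical application class
use Illuminate\Support\ServiceProvider;
use Thrun\Laravel\Transport\TransportFactory;

final class ThrunExtensionsServiceProvider extends ServiceProvider
{
    public function boot(): void
    {
        // Assumes the factory instance is shared, so the worker sees this driver.
        $this->app->make(TransportFactory::class)->extendFailed(
            'database',
            fn (array $config) => new DatabaseFailedJobSender(
                table: $config['database']['table'] ?? 'thrun_failed_jobs',
            ),
        );
    }
}
```

With the driver registered, setting THRUN_FAILED_DRIVER=database would select it through the failed.driver entry.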

Auto-discover

'auto_discover' => [
    'App\\Handlers',   // regular Handler classes
    'App\\Jobs',       // self-handling Job classes
],

Two Workflow Styles

1. Clean Architecture (recommended) — Message + Handler

A Message is a DTO that carries the data. A Handler is an invokable class that holds the business logic. This is a pure Symfony-style separation of concerns.

Message:

namespace App\Messages;

use Thrun\Laravel\Handler\Attribute\Delay;
use Thrun\Laravel\Handler\Attribute\Queue;
use Thrun\Laravel\Handler\Attribute\Retry;

#[Queue('emails')]
#[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)]
#[Delay(5000)]
final readonly class SendEmailMessage
{
    public function __construct(
        public string $to,
        public string $subject,
    ) {}
}

Handler:

namespace App\Handlers;

use App\Contracts\MailerInterface; // hypothetical application mailer contract
use App\Messages\SendEmailMessage;
use Thrun\Laravel\Handler\AsThrunHandler;
use Thrun\Worker\Acknowledger;

#[AsThrunHandler] // auto-wired to SendEmailMessage
final class SendEmailHandler
{
    public function __construct(private MailerInterface $mailer) {}

    public function __invoke(SendEmailMessage $message, Acknowledger $ack): void
    {
        $this->mailer->send($message->to, $message->subject);
        $ack->ack();
    }
}

Dispatch:

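use App\Messages\SendEmailMessage;
use Thrun\Laravel\Bus\DispatchOptions;
use Thrun\Laravel\Bus\ThrunMessageBus;

// Assumes the bus is resolvable from the Laravel container.
$bus = app(ThrunMessageBus::class);

$bus->dispatch(new SendEmailMessage('user@test.com', 'Hello'));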

// or with option override:
$bus->dispatch(
    new SendEmailMessage('user@test.com', 'Hello'),
    'emails',
    new DispatchOptions(delayMs: 10_000, messageId: 'email-42'),
);

2. Alternative — Self-handling Job

A single class acts as both Message and Handler. Data goes in the constructor, logic in __invoke(). Use this for simple tasks where extra files are not worth it.

namespace App\Jobs;

use App\Contracts\MailerInterface; // hypothetical application mailer contract
use Thrun\Laravel\Handler\Attribute\Queue;
use Thrun\Laravel\Handler\Attribute\Retry;
use Thrun\Laravel\Handler\Attribute\ThrunJob;
use Thrun\Worker\Acknowledger;

#[ThrunJob]
#[Queue('emails')]
#[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)]
final readonly class SendEmailJob
{
    public function __construct(
        public string $to,
        public string $subject,
    ) {}

    public function __invoke(MailerInterface $mailer, Acknowledger $ack): void
    {
        $mailer->send($this->to, $this->subject);
        $ack->ack();
    }
}

Dispatch:

// queue is taken from #[Queue] attribute
$bus->dispatch(new SendEmailJob('user@test.com', 'Hello'));

// or override:
$bus->dispatch(new SendEmailJob('user@test.com', 'Hello'), 'urgent-emails');

Important: the constructor accepts plain, serializable data only (int, string, array, etc.), because these values are serialized to Redis. Never inject services or other objects into the constructor. Inject services as parameters of __invoke() instead: Laravel's Container::call() resolves them automatically.
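
To see why plain constructor data matters, consider what a round trip through a queue requires. The sketch below uses JSON purely as a stand-in; the format Thrun actually writes to Redis is not documented here, so treat the encoding as an assumption. The Thrun attributes are left out so the snippet runs without the package:

```php
<?php

declare(strict_types=1);

// Same shape as the job in this section: plain constructor data only.
final readonly class SendEmailJob
{
    public function __construct(
        public string $to,
        public string $subject,
    ) {}
}

$job = new SendEmailJob('user@test.com', 'Hello');

// Plain public properties flatten to an array and encode cleanly.
$payload = json_encode(get_object_vars($job), JSON_THROW_ON_ERROR);

// On the worker side the object can be rebuilt from the payload alone,
// using the decoded keys as named constructor arguments.
$data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
$restored = new SendEmailJob(...$data);

var_dump($restored == $job); // loose comparison: same class, same property values
```

A mailer or a database connection has no meaningful representation in such a payload, which is why services belong in __invoke().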

Attributes

| Attribute                                | Purpose                            | Applies to    |
|------------------------------------------|------------------------------------|---------------|
| #[ThrunJob]                              | Self-handling job marker           | Job           |
| #[Queue('emails')]                       | Default queue for the message      | Job / Message |
| #[Retry(backoff: [...], maxAttempts: 3)] | Retry policy                       | Job / Message |
| #[Delay(5000)]                           | Delay in ms                        | Job / Message |
| #[Timeout(30000)]                        | Hard execution timeout             | Job / Message |
| #[AsThrunHandler(messageClass: ...)]     | Explicit Handler → Message binding | Handler       |

Queue resolution priority on dispatch (highest first):

  1. dispatch() argument (explicit)
  2. #[Queue] attribute on the class
  3. the 'default' queue
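
The lookup order can be sketched in a few lines. This is an illustration of the rule, not Thrun's implementation: the Queue attribute below is a local stand-in and resolveQueue() is a hypothetical helper:

```php
<?php

declare(strict_types=1);

// Stand-in for Thrun's #[Queue] attribute so the sketch runs on its own.
#[Attribute(Attribute::TARGET_CLASS)]
final class Queue
{
    public function __construct(public string $name) {}
}

#[Queue('emails')]
final class SendEmailMessage {}

final class UntaggedMessage {}

// Hypothetical helper: explicit argument first, then #[Queue], then 'default'.
function resolveQueue(object $message, ?string $explicit = null): string
{
    if ($explicit !== null) {
        return $explicit;
    }

    $attributes = (new ReflectionClass($message))->getAttributes(Queue::class);
    if ($attributes !== []) {
        return $attributes[0]->newInstance()->name;
    }

    return 'default';
}

echo resolveQueue(new SendEmailMessage(), 'urgent-emails'), "\n"; // explicit argument wins
echo resolveQueue(new SendEmailMessage()), "\n";                  // falls back to the attribute
echo resolveQueue(new UntaggedMessage()), "\n";                   // falls back to 'default'
```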

Acknowledger

Acknowledger is the explicit processing acknowledgement object.

public function __invoke(MyMessage $message, Acknowledger $ack): void
{
    // ... logic ...
    $ack->ack();   // confirm success
    // $ack->nack(); // reject (goes to retry or failure transport)
}

Recommendation: always accept Acknowledger $ack explicitly and call $ack->ack(). This gives you full control over the message lifecycle.

Auto-discover

Class scanning happens automatically for namespaces listed in auto_discover.

Handlers

  • #[AsThrunHandler]: explicit binding via messageClass
  • Naming convention: SendEmailHandler → SendEmailMessage
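
The naming convention amounts to swapping the Handler suffix for Message. A minimal sketch of that mapping on the short class name; messageNameFor() is a hypothetical helper, and how Thrun resolves the message's namespace is not covered here:

```php
<?php

declare(strict_types=1);

// Hypothetical helper: derive the message class name from a handler class name
// by swapping the "Handler" suffix for "Message". Namespaces are ignored.
function messageNameFor(string $handlerClass): ?string
{
    $pos = strrpos($handlerClass, '\\');
    $short = $pos === false ? $handlerClass : substr($handlerClass, $pos + 1);

    if (!str_ends_with($short, 'Handler') || $short === 'Handler') {
        return null; // does not follow the convention
    }

    return substr($short, 0, -strlen('Handler')) . 'Message';
}

echo messageNameFor('App\\Handlers\\SendEmailHandler'), "\n";
var_dump(messageNameFor('App\\Handlers\\Mailer'));
```

A handler whose name does not follow the convention would need the explicit messageClass binding instead.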

Self-handling Jobs

  • #[ThrunJob] on an invokable class — the class registers as its own handler

Middleware

You can register worker middleware per supervisor via config. Classes are resolved through the Laravel container (constructor injection is supported).

'supervisors' => [
    'default' => [
        // ...
        'middleware' => [
            \App\Middleware\LogMiddleware::class,
            \App\Middleware\MetricsMiddleware::class,
        ],
    ],
],

A middleware must implement WorkerMiddlewareInterface from the core thrun package:

namespace App\Middleware;

use Thrun\Worker\Acknowledger;
use Thrun\Contract\WorkerMiddlewareInterface;

final class LogMiddleware implements WorkerMiddlewareInterface
{
    public function handle(object $message, Acknowledger $ack, \Closure $next): void
    {
        try {
            $next($message, $ack);
        } catch (\Throwable $e) {
            // log, metrics, etc.
            throw $e;
        }
    }
}
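
When several middleware classes are configured, each one wraps the next through $next. The sketch below shows that nesting with plain closures that share the ($message, $ack, $next) call shape. It assumes the first middleware listed runs outermost, which this README does not state, and it uses plain objects in place of real messages and the Acknowledger:

```php
<?php

declare(strict_types=1);

// Stand-in middleware: records when it is entered and left.
function tagging(string $name, array &$log): Closure
{
    return function (object $message, object $ack, Closure $next) use ($name, &$log): void {
        $log[] = "$name:before";
        $next($message, $ack);
        $log[] = "$name:after";
    };
}

$log = [];
$middlewares = [tagging('log', $log), tagging('metrics', $log)];

// Innermost callable: the handler itself.
$handler = function (object $message, object $ack) use (&$log): void {
    $log[] = 'handler';
};

// Wrap from the last middleware inward so the first one listed ends up outermost.
$pipeline = array_reduce(
    array_reverse($middlewares),
    fn (Closure $next, Closure $mw): Closure =>
        fn (object $message, object $ack) => $mw($message, $ack, $next),
    $handler,
);

$pipeline(new stdClass(), new stdClass());
echo implode(' > ', $log), "\n";
```

Under that ordering assumption, an exception thrown by the handler passes back through metrics first and then log, which is why rethrowing in the catch block matters.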

Failed Jobs & CLI

When a message exhausts all retries, it is persisted to the failed job store (configured in config/thrun.php under failed).

List failed jobs

php artisan thrun:failed
php artisan thrun:failed --queue=emails
php artisan thrun:failed --limit=100

Show details of a failed job

php artisan thrun:failed:show 019e9c83-c3d7-7216-b37d-04b1c154a5c8

Shows: type, queue, exception, message, file, line, full trace, payload, stamps.

Retry a failed job

php artisan thrun:retry 019e9c83-c3d7-7216-b37d-04b1c154a5c8
php artisan thrun:retry --all

Retry creates a new message with a fresh JobIdStamp but preserves MessageIdStamp.

Flush failed jobs

php artisan thrun:failed:flush

Flush queues

# Flush a specific queue (ready, processing, delayed)
php artisan thrun:flush emails

# Flush all configured queues
php artisan thrun:flush

# Flush queues + failed jobs
php artisan thrun:flush --failed

Running the Worker

# All supervisors
php artisan thrun:work

# Single supervisor
php artisan thrun:work --supervisor=default

# With stats
php artisan thrun:work --stats

DispatchOptions

Explicit stamp control when dispatching (overrides class attributes):

use Thrun\Laravel\Bus\DispatchOptions;

$bus->dispatch($message, 'emails', new DispatchOptions(
    messageId: 'uuid-42',
    delayMs: 5000,
    retryBackoff: [1000, 2000, 4000],
    maxAttempts: 3,
    timeoutMs: 30000,
));

For edge cases you can use dispatchCustom() with a ready-made Envelope:

use Thrun\Envelope\Envelope;
use Thrun\Envelope\Stamp\QueueStamp;

$bus->dispatchCustom(
    Envelope::wrap($message, new QueueStamp('custom')),
    'emails',
);

Message IDs

You can generate dynamic message IDs directly from the message payload using IdentifiableMessage:

use Thrun\Laravel\Contract\IdentifiableMessage;
use Thrun\Laravel\Handler\Attribute\Queue;

#[Queue('emails')]
final readonly class SendEmailMessage implements IdentifiableMessage
{
    public function __construct(
        public string $to,
        public string $subject,
        public int $userId,
        public int $productId,
    ) {}

    public function getId(): string
    {
        return "{$this->userId}-{$this->productId}";
    }
}

$bus->dispatch(new SendEmailMessage('a@b.com', 'Hi', 42, 7));
// messageId = "42-7" automatically

Message ID priority (highest first):

  1. DispatchOptions->messageId
  2. IdentifiableMessage->getId()
  3. null (no ID)
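
The same order expressed as code. This is an illustration only: the interface is a local stand-in for Thrun\Laravel\Contract\IdentifiableMessage, and resolveMessageId() and the two message classes are hypothetical:

```php
<?php

declare(strict_types=1);

// Stand-in for Thrun's IdentifiableMessage so the sketch runs on its own.
interface IdentifiableMessage
{
    public function getId(): string;
}

final readonly class OrderMessage implements IdentifiableMessage
{
    public function __construct(public int $userId, public int $productId) {}

    public function getId(): string
    {
        return "{$this->userId}-{$this->productId}";
    }
}

final class PlainMessage {}

// Hypothetical helper: explicit ID first, then getId(), then no ID at all.
function resolveMessageId(object $message, ?string $explicitId = null): ?string
{
    if ($explicitId !== null) {
        return $explicitId;
    }
    if ($message instanceof IdentifiableMessage) {
        return $message->getId();
    }

    return null;
}

var_dump(resolveMessageId(new OrderMessage(42, 7), 'uuid-42')); // explicit ID wins
var_dump(resolveMessageId(new OrderMessage(42, 7)));            // derived from the payload
var_dump(resolveMessageId(new PlainMessage()));                 // no ID
```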

Fluent Builder

For quick one-off overrides without creating a DispatchOptions object:

$bus->builder()
    ->id('custom-42')
    ->retry([1000, 2000], 3)
    ->delay(5000)
    ->timeout(30000)
    ->send($message, 'emails');

Extending Transports

You can register custom transports (e.g. RabbitMQ) via closures or config drivers.

Closure driver

use Thrun\Laravel\Transport\TransportFactory;

$factory = app(TransportFactory::class);
$factory->extend('rabbitmq', function (string $name, array $config) {
    return new RabbitMQTransport(
        host: $config['host'],
        port: $config['port'],
        queue: $name,
    );
});

Config-based driver

'queues' => [
    'orders' => [
        'transport' => 'custom',
        'driver'    => \App\Transport\RabbitMQTransport::class,
        'host'      => 'localhost',
    ],
],

The factory first tries to resolve the class via Laravel's Container::make() with ['name' => ..., 'config' => ...] and otherwise falls back to new $driverClass($name, $config).

Requirements

  • PHP (TrueAsync Core) ^8.6
  • Laravel ^11.0
  • ext-async (TrueAsync extension)
  • phpredis (TrueAsync fork) (if using Redis transport)