yangusik / thrun
Async queue worker engine for PHP with native threads + coroutines via TrueAsync.
Requires
- php: >=8.4
- ext-pcntl: *
- ramsey/uuid: ^4.9
Requires (Dev)
- testo/testo: ^0.10.2
This package is auto-updated.
Last update: 2026-06-06 10:43:58 UTC
README
⚠️ Work in progress
This library is actively developed. APIs may change between commits.
Async queue worker for PHP built on TrueAsync — an alternative PHP core that implements true asynchrony by modifying the Zend engine, I/O libraries, database and socket handling.
Goal
The fastest async queue worker for PHP — one worker process that handles both IO-bound and CPU-bound tasks efficiently. Uses real OS threads instead of forked processes, consumes significantly less memory, and aims to outperform Symfony Messenger and Laravel Horizon.
Benchmarks
Measured on WSL2, 8GB RAM, PHP 8.6 TrueAsync fork:
| Scenario | IO throughput | CPU throughput | Stable RSS |
|---|---|---|---|
| Horizon 1 worker | 18/s | 73/s | 72 MB |
| Horizon 12 workers | 210/s | 514/s | 949 MB |
| TrueAsync 1x100 | 1,869/s | 452/s | 44 MB |
| TrueAsync 12x10 | 2,355/s | 2,059/s | 54 MB |
TrueAsync 12x10 uses 17x less RSS than Horizon 12 workers, 11x more IO throughput.
Requirements
- TrueAsync PHP 8.6+ (
trueasync/php-true-asyncDocker image) - ext-pcntl (for signal handling)
- ext-redis (Edmond's TrueAsync-compatible fork for Redis transport)
Installation
composer require yangusik/thrun
Package is in development and may change name.
Quick Start
use Thrun\Envelope\Envelope; use Thrun\Supervisor\Supervisor; use Thrun\Supervisor\SupervisorOptions; use Thrun\Transport\InMemory\InMemoryTransport; use Thrun\Worker\Worker; use Thrun\Worker\WorkerOptions; $transport = new InMemoryTransport(); // Send messages $transport->send(Envelope::wrap(new SendEmailMessage('user@example.com', 'Hello'))); $supervisor = new Supervisor( workerFactory: fn() => new Worker( transport: $transport, handlers: [ SendEmailMessage::class => fn(SendEmailMessage $m) => mail($m->to, $m->subject, '...'), ], options: new WorkerOptions(threads: 2, concurrency: 4), ), options: new SupervisorOptions(), ); $supervisor->run();
Features
N OS Threads × M Coroutines
Each Worker spawns N real OS threads. Each thread runs a TaskGroup of M coroutines. Total concurrency is N × M. ThreadChannel provides backpressure and blocks when all coroutines are busy.
Per-Message Timeout (Hard Cancel)
Attach a TimeoutStamp to any message. If the handler exceeds the limit, it receives a hard cancellation that interrupts blocking operations like sleep, file_get_contents, or DB queries. finally blocks still execute.
use Thrun\Envelope\Stamp\TimeoutStamp; $transport->send(Envelope::wrap( new SlowApiCall(), new TimeoutStamp(5000), // 5 seconds ));
Retry with Delay
Two stamps work together:
RetryStamp— defines the retry policy (backoff intervals and max attempts).RedeliveryStamp— added automatically by the worker on every redelivery. Keeps history of attempts and timestamps.
use Thrun\Envelope\Stamp\RetryStamp; use Thrun\Envelope\Stamp\MessageIdStamp; $transport->send(Envelope::wrap( new SendEmailMessage('user@example.com', 'Hello'), new RetryStamp(backoff: [1000, 2000, 4000], maxAttempts: 3), new MessageIdStamp('msg-42'), ));
Alternatively, the handler can trigger a retry explicitly via Acknowledger without pre-attaching a stamp (see examples/retry_without_stamp.php).
Metrics
Inject a MetricsInterface to track throughput, failures, retries, timeouts, and average processing time.
use Thrun\Worker\Metrics\InMemoryMetrics; $metrics = new InMemoryMetrics(); $worker = new Worker(transport: $transport, handlers: $handlers, metrics: $metrics); // $metrics->processed, $metrics->failed, $metrics->retried, $metrics->timedOut // $metrics->averageTime()
Multi-Queue and Scheduling
One Worker can serve multiple queues simultaneously with pluggable scheduling:
RoundRobinStrategy— equal distribution across queuesPriorityStrategy— weighted credits, proportional distribution
Dispatch Policies
Limit concurrency per partition (tenant, user, etc.) via PartitionStamp:
use Thrun\Envelope\Stamp\PartitionStamp; use Thrun\Transport\Policy\MaxConcurrencyPolicy; use Thrun\Transport\PolicyAwareReceiver; $receiver = new PolicyAwareReceiver( inner: $transport, policy: new MaxConcurrencyPolicy(maxPerPartition: 5), ); $transport->send(Envelope::wrap($msg, new PartitionStamp('tenant-42')));
Graceful Shutdown
Supervisor handles SIGINT/SIGTERM gracefully. Worker::stop() cancels the scope and closes internal resources. The producer unblocks from receive(). Pending jobs complete or timeout.
Explicit Acknowledgement
Handlers can accept an Acknowledger for explicit control:
use Thrun\Worker\Acknowledger; SendEmailMessage::class => function (SendEmailMessage $m, Acknowledger $ack) { if ($m->to === 'blocked@example.com') { $ack->fail(new \RuntimeException('Blocked')); return; } // process... $ack->ack(); }
Methods: $ack->ack(), $ack->retry(int $delayMs), $ack->fail(?Throwable).
Middleware
Wrap message processing in reusable middleware:
use Thrun\Middleware\CatchMessageMiddleware; $worker = new Worker( transport: $transport, handlers: $handlers, options: new WorkerOptions(middleware: [new CatchMessageMiddleware()]), );
Stamps
Messages carry stamps for cross-cutting concerns:
| Stamp | Purpose |
|---|---|
DelayStamp |
Defer message delivery |
ErrorDetailsStamp |
Captures exception info on failure |
MessageIdStamp |
Unique message identifier |
PartitionStamp |
Partition key for dispatch policies |
QueueStamp |
Queue name (set by MultiQueueReceiver) |
RedeliveryStamp |
History of redelivery attempts |
RetryStamp |
Retry policy configuration |
TimeoutStamp |
Hard timeout for handler execution |
Redis Transport
At-least-once delivery via LMOVE from ready to processing. Delayed messages use a sorted set (ZADD). Reclaims the processing list on startup for crash recovery.
Uses a custom TrueAsync-compatible fork of
phpredis(early stage). Connection pooling must be enabled:
use Thrun\Transport\Redis\RedisTransport; use Thrun\Transport\Redis\RedisConnection; use Thrun\Serialization\JsonSerializer; $redis = new \Redis([ 'pool' => [ 'enabled' => true, 'min' => 0, 'max' => 1, 'mux' => 0, ], ]); $redis->connect('redis', 6379); $transport = new RedisTransport( connection: new RedisConnection($redis, 'thrun:queue'), serializer: new JsonSerializer(), queue: 'emails', );
Failure Transport (Dead Letter)
Exhausted retries can be sent to a separate transport for inspection:
$failureTransport = new InMemoryTransport(); // or RedisTransport $worker = new Worker( transport: $transport, handlers: $handlers, failureTransport: $failureTransport, );
Failed messages carry an ErrorDetailsStamp with the exception class, message, code, and trace.
Examples
Run examples directly if you have TrueAsync PHP installed, or via Docker (see below).
one_queue.php — basic single-queue worker
Sends six emails to one queue and processes them sequentially.
[Email] - one@example.com processed
[Email] - two@example.com processed
[Email] - three@example.com processed
[Email] - four@example.com processed
[Email] - five@example.com processed
[Email] - six@example.com processed
two_queue.php — two queues with priority
Emails and notifications share one worker. Priority strategy favors emails (weight 3) over notifications (weight 1).
[Email] - one@example.com processed
[Email] - two@example.com processed
[Email] - three@example.com processed
[Notification] - 1 processed
[Email] - four@example.com processed
[Email] - five@example.com processed
[Email] - six@example.com processed
[Notification] - 2 processed
round_robin.php — round-robin scheduling
Emails and notifications alternate evenly.
[Email] - one@example.com processed
[Notification] - 1 processed
[Email] - two@example.com processed
[Notification] - 2 processed
[Email] - three@example.com processed
...
priority.php — three queues with credits
Emails (3), ping (2), and notifications (1) compete. Higher-priority queues get more slots.
[Email] - one@example.com processed
[Ping] Pong
[Email] - two@example.com processed
[Email] - three@example.com processed
[Ping] Pong
[Email] - four@example.com processed
[Email] - five@example.com processed
[Email] - six@example.com processed
[Notification] - 1 processed
[Notification] - 2 processed
max_concurrency.php — per-partition limits
Max 2 concurrent jobs per partition. With 2 threads, the worker processes two messages in parallel, then waits for completion before taking the next batch.
[10:29:46][Email] - one@example.com pending
[10:29:46][Notification] - 1 pending
[10:29:46][Email] - one@example.com completed
[10:29:46][Notification] - 1 completed
[10:29:47][Email] - two@example.com pending
[10:29:47][Notification] - 2 pending
...
retry.php — retry with backoff
Messages retry on failure with configurable delays. RetryStamp sets the policy; RedeliveryStamp tracks each attempt.
[13:44:55][Email][times:0] - one@example.com error
[13:44:55][Email][times:0] - two@example.com error
[13:44:55][Email][times:0] - three@example.com error
[13:44:56][Email][times:1] - one@example.com error
[13:44:56][Email][times:1] - two@example.com error
[13:44:56][Email][times:1] - three@example.com error
[13:44:57][Email][times:2] - one@example.com error
[13:44:57][Email][times:2] - two@example.com processed
[13:44:57][Email][times:2] - three@example.com processed
[13:44:58][Email][times:3] - one@example.com processed
retry_without_stamp.php — explicit retry via Acknowledger
Handler decides to retry on its own; no RetryStamp is attached upfront.
job retry
job retry (1,2026-06-05T11:39:44+00:00)(first retry: 1,2026-06-05T11:39:44+00:00)
job retry (2,2026-06-05T11:39:45+00:00)(first retry: 1,2026-06-05T11:39:44+00:00)
...
middleware.php — error logging middleware
CatchMessageMiddleware prints every exception with message class and location before retry logic kicks in.
[WorkerThread][SendEmailMessage:one] Exception: Custom error (middleware.php:32)
[WorkerThread][SendEmailMessage:two] Exception: Custom error (middleware.php:32)
...
metrics.php — live metrics reporter
Real-time console output of processed / failed / retried / timed out counts plus average processing time.
Thrun Metrics Live (stop: Ctrl+C or wait 15s)
processed: 5 failed: 5 retried: 5 timed out: 0 avg: 0.04ms
processed: 9 failed: 6 retried: 5 timed out: 0 avg: 0.07ms
...
Done. Processed: 10, Failed: 6, Retried: 5
redis.php — Redis-backed transport
Pushes 20 CPU-bound jobs to Redis, then processes them across 12 threads with live progress reporting.
Connected to Redis at redis:6379
Pushed 20 jobs
pending: 20 active: 0 processed: 0 failed: 0 (this run pushed: 20)
pending: 8 active: 12 processed: 0 failed: 0 (this run pushed: 20)
...
All jobs done.
Supervisor finished.
Testing
Tests use testo. No mocking of TrueAsync internals — tests run with InMemoryTransport and real async execution.
Some transport tests require Redis running on localhost:6379.
vendor/bin/testo
Docker
If you want to test this project in Docker, here is an example setup.
Dockerfile:
FROM trueasync/php-true-async:0.7.0-alpha.9-php8.6 RUN apt-get update && apt-get install -y \ build-essential \ autoconf \ libtool \ curl \ git \ && rm -rf /var/lib/apt/lists/* # Build and install phpredis from trueasync fork RUN git clone --depth 1 --branch true-async https://github.com/true-async/phpredis.git /tmp/phpredis \ && cd /tmp/phpredis \ && phpize \ && ./configure \ && make -j$(nproc) \ && make install \ && echo 'extension=redis.so' > /etc/php.d/redis.ini \ && rm -rf /tmp/phpredis
docker-compose.yml:
services: dev: build: context: . dockerfile: Dockerfile working_dir: /app volumes: - .:/app redis: image: redis:7-alpine ports: - "6379:6379"
License
MIT