yangusik/laravel-balanced-queue

Laravel queue management with load balancing between partitions (user groups)

pkg:composer/yangusik/laravel-balanced-queue

v1.0.1 2026-01-16 08:53 UTC

Last update: 2026-01-16 08:53:35 UTC

A Laravel package for queue management with load balancing between partitions (user groups). Perfect for scenarios where you need fair job distribution and concurrency control per user/tenant.

Problem Solved

Imagine you have an AI generation service where users can submit unlimited tasks. Without balanced queuing:

  • One user can flood the queue and block everyone else
  • No control over how many concurrent tasks a single user can run
  • Resource-heavy users can exhaust API rate limits

Laravel Balanced Queue solves this by:

  • Distributing jobs fairly across all users (round-robin)
  • Limiting concurrent jobs per user (e.g., max 2 AI generations per user)
  • Never rejecting jobs - they queue up and execute eventually
  • Preventing single users from monopolizing workers

How It Works

The Problem

Standard Laravel Queue (FIFO):

Queue: [A1][A2][A3][A4][A5][A6][A7][A8][B1][B2][C1][C2]
        ─────────────────────────────→ time

Execution order: A1 → A2 → A3 → A4 → A5 → A6 → A7 → A8 → B1 → B2 → C1 → C2

User A submitted 8 tasks first.
User B and C must wait until ALL of User A's tasks complete!

The Solution

Balanced Queue partitions jobs by user and rotates between them:

Balanced Queue (partitioned):

Partition A: [A1][A2][A3][A4][A5][A6][A7][A8]
Partition B: [B1][B2]
Partition C: [C1][C2]

Execution order: A1 → B1 → C1 → A2 → B2 → C2 → A3 → A4 → A5...

Everyone gets fair turns! User B doesn't wait for all 8 of User A's tasks.
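
The rotation above can be reproduced with a few lines of plain PHP. This is only an in-memory sketch of the idea, not the package's Redis-backed implementation; the function name roundRobinOrder is made up for this example.

```php
<?php

/**
 * Interleave jobs from several partitions in strict round-robin order.
 * Hypothetical helper for illustration; not part of the package.
 *
 * @param array<string, list<string>> $partitions partition name => pending jobs (FIFO)
 * @return list<string> jobs in the order they would be picked
 */
function roundRobinOrder(array $partitions): array
{
    // Ignore partitions that have no pending jobs.
    $partitions = array_filter($partitions);
    $order = [];

    // Keep cycling until every partition has been drained.
    while ($partitions !== []) {
        foreach (array_keys($partitions) as $name) {
            // Take the oldest job from this partition.
            $order[] = array_shift($partitions[$name]);

            // Drop the partition once it is empty so later cycles skip it.
            if ($partitions[$name] === []) {
                unset($partitions[$name]);
            }
        }
    }

    return $order;
}

$order = roundRobinOrder([
    'A' => ['A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8'],
    'B' => ['B1', 'B2'],
    'C' => ['C1', 'C2'],
]);

// Prints: A1 → B1 → C1 → A2 → B2 → C2 → A3 → A4 → A5 → A6 → A7 → A8
echo implode(' → ', $order), PHP_EOL;
```

Once B and C are drained, the remaining A jobs run back to back, which matches the tail of the execution order shown above.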

Strategy Comparison

Round-Robin Strategy (recommended) — strict rotation:

Partitions:  [A: 5 jobs] [B: 2 jobs] [C: 2 jobs]

Worker 1:    A1 ──────── C1 ──────── B2 ──────── A3 ──────── A5 ────────
Worker 2:       ──────── B1 ──────── A2 ──────── C2 ──────── A4 ────────

Order: A → B → C → A → B → C → A → A → A
       └── cycles through all partitions equally

Random Strategy — unpredictable but fast:

Partitions:  [A: 5 jobs] [B: 2 jobs] [C: 2 jobs]

Worker 1:    A1 ──────── A2 ──────── B1 ──────── C1 ──────── A3 ────────
Worker 2:       ──────── C2 ──────── A4 ──────── B2 ──────── A5 ────────

Order: Random selection each time (stateless, good for high load)

Smart Strategy — prioritizes smaller queues:

Partitions:  [A: 50 jobs] [B: 3 jobs] [C: 2 jobs]
                  │            │           │
             (deprioritized)  (boost)    (boost)

Worker 1:    B1 ──────── C1 ──────── B2 ──────── C2 ──────── B3 ────────
Worker 2:       ──────── A1 ──────── A2 ──────── A3 ──────── A4 ────────

Small queues (B, C) get processed faster, preventing starvation.
User A's 50 jobs won't block users with just a few tasks.
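
To make the "small queues get a boost" idea concrete, here is one possible scoring rule. The formula, the function name pickSmart, and the input shape are all assumptions made for this sketch; the package's actual smart strategy may weigh queue size and wait time differently.

```php
<?php

/**
 * Pick the partition with the highest score. Short queues and long waits
 * both raise the score. The formula is an illustrative assumption only.
 *
 * @param array<string, array{size: int, waited: float}> $partitions
 *        size = pending jobs, waited = seconds the oldest job has waited
 */
function pickSmart(array $partitions): ?string
{
    $best = null;
    $bestScore = -INF;

    foreach ($partitions as $name => $info) {
        if ($info['size'] <= 0) {
            continue; // nothing to run in this partition
        }

        // Hypothetical score: waiting time grows it, queue size shrinks it.
        $score = (1.0 + $info['waited']) / $info['size'];

        if ($score > $bestScore) {
            $best = (string) $name;
            $bestScore = $score;
        }
    }

    return $best;
}

// Equal waits: the smallest queue (C) wins.
$first = pickSmart([
    'A' => ['size' => 50, 'waited' => 10.0],
    'B' => ['size' => 3,  'waited' => 10.0],
    'C' => ['size' => 2,  'waited' => 10.0],
]);

// A has waited much longer, so it is eventually picked despite its size.
$later = pickSmart([
    'A' => ['size' => 50, 'waited' => 600.0],
    'B' => ['size' => 3,  'waited' => 10.0],
    'C' => ['size' => 2,  'waited' => 10.0],
]);

echo $first, ' then ', $later, PHP_EOL;
```

Including wait time in the score is what keeps the large partition from being starved in turn: its score keeps growing while it waits.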

Concurrency Limiting

With max_concurrent: 2 per partition:

Partition A: [A1][A2][A3][A4][A5] waiting
                 │   │
             ┌───┴───┴───┐
             │  Active   │
             │  A1  A2   │ ← max 2 running simultaneously
             └───────────┘

A3, A4, A5 wait until A1 or A2 completes.
Other partitions (B, C) can still run their jobs!
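
The slot logic in the diagram can be sketched as a small in-memory class. The class name SimpleLimiterSketch and its methods are hypothetical; the package tracks active jobs in Redis rather than in a PHP array.

```php
<?php

/**
 * In-memory sketch of a fixed per-partition concurrency limit.
 * Hypothetical class for illustration; not the package's limiter.
 */
final class SimpleLimiterSketch
{
    /** @var array<string, int> running job count per partition */
    private array $active = [];

    public function __construct(private int $maxConcurrent = 2) {}

    /** Try to reserve a slot; returns false when the partition is at its limit. */
    public function acquire(string $partition): bool
    {
        $running = $this->active[$partition] ?? 0;

        if ($running >= $this->maxConcurrent) {
            return false;
        }

        $this->active[$partition] = $running + 1;

        return true;
    }

    /** Free a slot when a job finishes or fails. */
    public function release(string $partition): void
    {
        $running = $this->active[$partition] ?? 0;
        $this->active[$partition] = max(0, $running - 1);
    }
}

$limiter = new SimpleLimiterSketch(2);

$a1 = $limiter->acquire('A'); // true: A1 starts
$a2 = $limiter->acquire('A'); // true: A2 starts
$a3 = $limiter->acquire('A'); // false: A3 must wait
$b1 = $limiter->acquire('B'); // true: other partitions are unaffected

$limiter->release('A');              // A1 completes
$a3Retry = $limiter->acquire('A');   // true: A3 can now start
```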

Installation

composer require yangusik/laravel-balanced-queue

Publish the configuration:

php artisan vendor:publish --tag=balanced-queue-config

Quick Start

Step 1: Add Queue Connection

Add to config/queue.php:

'connections' => [
    // ... your existing connections

    'balanced' => [
        'driver' => 'balanced',
        'connection' => 'default', // Redis connection from database.php
        'queue' => 'default',
        'retry_after' => 90,
    ],
],

Step 2: Create a Job

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use YanGusik\BalancedQueue\Jobs\BalancedDispatchable;

class GenerateAIImage implements ShouldQueue
{
    use BalancedDispatchable;  // Instead of standard Dispatchable

    public function __construct(
        public int $userId,
        public string $prompt
    ) {}

    public function handle(): void
    {
        // Your AI generation logic here
    }
}

Step 3: Dispatch Jobs

// The job will automatically use $userId as partition key
GenerateAIImage::dispatch($userId, $prompt)
    ->onConnection('balanced')
    ->onQueue('ai-generation');

// Or explicitly set partition
GenerateAIImage::dispatch($userId, $prompt)
    ->onPartition($userId)
    ->onConnection('balanced');

Step 4: Run Workers

# Standard Laravel worker
php artisan queue:work balanced --queue=ai-generation

# Or with Horizon (see Horizon section below)

That's it! Jobs are now distributed fairly, with at most 2 running concurrently per user (the default max_concurrent).

Laravel Horizon Integration

Configuration

Add a supervisor for the balanced queue in config/horizon.php:

'environments' => [
    'local' => [
        // Your other supervisors...

        'supervisor-balanced' => [
            'connection' => 'balanced',      // Must match connection name in queue.php
            'queue' => ['default'],          // Queue names to process
            'maxProcesses' => 4,             // Number of workers
            'tries' => 1,
            'timeout' => 300,
        ],
    ],

    'production' => [
        'supervisor-balanced' => [
            'connection' => 'balanced',
            'queue' => ['default', 'ai-generation'],
            'maxProcesses' => 10,
            'tries' => 1,
            'timeout' => 300,
            'balance' => 'auto',             // Horizon's auto-scaling
        ],
    ],
],

Important: What Works and What Doesn't with Horizon

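+---------------------+--------------+-----------------------------------------------+
| Feature             | Status       | Notes                                         |
+---------------------+--------------+-----------------------------------------------+
| Job execution       | Works        | Jobs execute normally through Horizon workers |
| Completed jobs list | Works        | Shows in Horizon dashboard after completion   |
| Failed jobs list    | Works        | Failed jobs appear in Horizon                 |
| Worker metrics      | Works        | CPU, memory, throughput visible               |
| Pending jobs count  | Doesn't work | Horizon shows 0 pending                       |
| horizon:clear       | Doesn't work | Use balanced-queue:clear instead              |
+---------------------+--------------+-----------------------------------------------+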

Why? Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in queues:{name} but we store them in balanced-queue:{queue}:{partition}.

Monitoring Commands

Use built-in commands instead of Horizon for queue management:

# View live statistics (updates every 2 seconds)
php artisan balanced-queue:table --watch

# One-time stats view
php artisan balanced-queue:table

# Clear all jobs from balanced queue
php artisan balanced-queue:clear

# Clear specific partition only
php artisan balanced-queue:clear --partition=user:123

# Force clear without confirmation
php artisan balanced-queue:clear --force

Example output of balanced-queue:table --watch:

╔══════════════════════════════════════════════════════════════╗
║            BALANCED QUEUE MONITOR - default
╚══════════════════════════════════════════════════════════════╝

+------------+--------+---------+--------+-----------+
| Partition  | Status | Pending | Active | Processed |
+------------+--------+---------+--------+-----------+
| user:456   | ●      | 15      | 2      | 45        |
| user:123   | ●      | 8       | 2      | 120       |
| user:789   | ○      | 3       | 0      | 12        |
+------------+--------+---------+--------+-----------+

  Total: 26 pending, 4 active, 3 partitions

  Strategy: round-robin | Max concurrent: 2
  Updated: 14:32:15

Configuration

Partition Strategies

Choose how partitions are selected for processing:

// config/balanced-queue.php
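'strategy' => env('BALANCED_QUEUE_STRATEGY', 'round-robin'),

+-------------+-------------------------------------------------------+--------------------------------------+
| Strategy    | Description                                           | Best For                             |
+-------------+-------------------------------------------------------+--------------------------------------+
| random      | Random partition selection (Redis SRANDMEMBER)        | High-load, stateless systems         |
| round-robin | Strict sequential: A→B→C→A→B→C                        | Recommended. Fair distribution       |
| smart       | Considers queue size + wait time, boosts small queues | Preventing starvation of small users |
+-------------+-------------------------------------------------------+--------------------------------------+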

Concurrency Limiters

Control how many jobs run simultaneously per partition:

// config/balanced-queue.php
'limiter' => env('BALANCED_QUEUE_LIMITER', 'simple'),

'limiters' => [
    'simple' => [
        'max_concurrent' => 2, // Max 2 jobs per user at once
    ],
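],

+----------+-----------------------------------------+--------------------------------------+
| Limiter  | Description                             | Best For                             |
+----------+-----------------------------------------+--------------------------------------+
| null     | No limits, unlimited parallel jobs      | When you only need fair distribution |
| simple   | Fixed limit per partition (e.g., max 2) | Recommended. Most use cases          |
| adaptive | Dynamic limit based on system load      | Auto-scaling scenarios               |
+----------+-----------------------------------------+--------------------------------------+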

Environment Variables

BALANCED_QUEUE_ENABLED=true
BALANCED_QUEUE_STRATEGY=round-robin
BALANCED_QUEUE_LIMITER=simple
BALANCED_QUEUE_MAX_CONCURRENT=2
BALANCED_QUEUE_PREFIX=balanced-queue
BALANCED_QUEUE_REDIS_CONNECTION=default

Partition Keys

Automatic Detection

The BalancedDispatchable trait automatically detects the partition key from common property names:

class MyJob implements ShouldQueue
{
    use BalancedDispatchable;

    public function __construct(
        public int $userId  // Automatically used as partition key
    ) {}
}

Supported auto-detected properties: $userId, $user_id, $tenantId, $tenant_id
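
Conceptually, auto-detection amounts to checking those property names in turn. The sketch below shows the idea in plain PHP; the function name detectPartitionKey, the lookup order, and the 'default' fallback are assumptions, and the trait's real logic may differ.

```php
<?php

/**
 * Return the first set value among the property names listed above.
 * Hypothetical helper: lookup order and fallback are assumptions.
 */
function detectPartitionKey(object $job, string $fallback = 'default'): string
{
    foreach (['userId', 'user_id', 'tenantId', 'tenant_id'] as $property) {
        // isset() is false for missing, null, or inaccessible properties.
        if (isset($job->{$property})) {
            return (string) $job->{$property};
        }
    }

    return $fallback;
}

// A job-like object with a public $userId property.
$job = new class(42) {
    public function __construct(public int $userId) {}
};

echo detectPartitionKey($job), PHP_EOL;                        // 42
echo detectPartitionKey((object) ['tenant_id' => 7]), PHP_EOL; // 7
echo detectPartitionKey(new stdClass()), PHP_EOL;              // default
```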

Explicit Partition

// Set partition when dispatching
MyJob::dispatch($data)
    ->onPartition("user:{$userId}")
    ->onConnection('balanced');

// Any value can serve as the key, e.g. a company ID
MyJob::dispatch($data)
    ->onPartition($companyId)
    ->onConnection('balanced');

Custom Partition Logic

Override getPartitionKey() in your job:

class ProcessOrder implements ShouldQueue
{
    use BalancedDispatchable;

    public function __construct(public Order $order) {}

    public function getPartitionKey(): string
    {
        // Partition by merchant instead of user
        return "merchant:{$this->order->merchant_id}";
    }
}

Global Partition Resolver

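Set a default resolver in config for all jobs. Note that Laravel cannot serialize closures when caching configuration, so a closure here is incompatible with php artisan config:cache: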

// config/balanced-queue.php
'partition_resolver' => function ($job) {
    return $job->tenant_id ?? $job->user_id ?? 'default';
},

Advanced Usage

Programmatic Metrics

use YanGusik\BalancedQueue\Support\Metrics;

$metrics = new Metrics();

// Get queue summary
$summary = $metrics->getSummary('default');
// Returns: [
//     'partitions' => 5,
//     'total_queued' => 100,
//     'total_active' => 10,
//     'partitions_stats' => [...]
// ]

// Get per-partition stats
$stats = $metrics->getQueueStats('default');
// Returns: [
//     'user:123' => ['queued' => 10, 'active' => 2, 'metrics' => [...]],
//     'user:456' => ['queued' => 5, 'active' => 1, 'metrics' => [...]],
// ]

// Clear queue programmatically
$metrics->clearQueue('default');
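
The array returned by getQueueStats() is easy to post-process. As a sketch, the helper below lists the partitions with the largest backlog; it relies only on the 'queued' key shown in the sample output above, and the function name largestBacklogs is hypothetical.

```php
<?php

/**
 * Return the partitions with the most queued jobs, largest first.
 * Hypothetical helper; expects the array shape documented for getQueueStats().
 *
 * @param array<string, array{queued: int, active: int}> $stats
 * @return array<string, int> partition => queued count
 */
function largestBacklogs(array $stats, int $limit = 3): array
{
    // Reduce each partition entry to its queued count, keeping the keys.
    $queued = array_map(fn (array $s): int => $s['queued'], $stats);

    // Sort by count, descending, while preserving partition names.
    arsort($queued);

    return array_slice($queued, 0, $limit, true);
}

// Sample data in the documented shape (values are made up).
$stats = [
    'user:123' => ['queued' => 10, 'active' => 2],
    'user:456' => ['queued' => 5,  'active' => 1],
    'user:789' => ['queued' => 40, 'active' => 2],
];

$top = largestBacklogs($stats, 2);

foreach ($top as $partition => $count) {
    echo "{$partition}: {$count} queued", PHP_EOL;
}
```

In an application, $stats would come from $metrics->getQueueStats('default') instead of the hard-coded sample.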

Custom Strategy

namespace App\Queue;

use Illuminate\Contracts\Redis\Connection;
use YanGusik\BalancedQueue\Contracts\PartitionStrategy;

class PriorityStrategy implements PartitionStrategy
{
    public function selectPartition(Connection $redis, string $queue, string $partitionsKey): ?string
    {
        // Get all partitions
        $partitions = $redis->smembers($partitionsKey);

        if (empty($partitions)) {
            return null; // Nothing is queued
        }

        // Your priority logic here
        // e.g., check user subscription level, queue size, etc.
        // Placeholder: fall back to the first partition returned by Redis
        $selectedPartition = reset($partitions);

        return $selectedPartition;
    }

    public function getName(): string
    {
        return 'priority';
    }
}

Register in config:

// config/balanced-queue.php
'strategies' => [
    'priority' => [
        'class' => App\Queue\PriorityStrategy::class,
    ],
],

'strategy' => 'priority',

Custom Limiter

use App\Models\User; // Assumes the default Laravel User model location
use YanGusik\BalancedQueue\Contracts\ConcurrencyLimiter;

class PlanBasedLimiter implements ConcurrencyLimiter
{
    public function canProcess($redis, $queue, $partition): bool
    {
        // Assumes partition keys of the form "user:{id}"
        $userId = str_replace('user:', '', $partition);
        $user = User::find($userId);

        // Falls back to 1 when the user or subscription is missing
        $limit = $user->subscription->concurrent_limit ?? 1;

        return $this->getActiveCount($redis, $queue, $partition) < $limit;
    }

    // ... implement other interface methods
}

Redis Structure

Understanding the Redis key structure helps with debugging:

{prefix}:{queue}:partitions              → SET of partition names
{prefix}:{queue}:{partition}             → LIST of job payloads
{prefix}:{queue}:{partition}:active      → HASH of currently running job IDs
{prefix}:metrics:{queue}:{partition}     → HASH of metrics (pushed, popped counts)
{prefix}:rr-state:{queue}                → STRING round-robin counter

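Example with the default prefix and the default queue (note that {queue} expands to queues:default, matching Laravel's queues:{name} convention):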

balanced-queue:queues:default:partitions       → {"user:123", "user:456"}
balanced-queue:queues:default:user:123         → [job1_payload, job2_payload, ...]
balanced-queue:queues:default:user:123:active  → {job_uuid: timestamp, ...}
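
When debugging from PHP, it can help to build these key names in one place. The helper below simply fills in the patterns listed above; the function name balancedQueueKeys is hypothetical, and the defaults are taken from the example keys (prefix balanced-queue, queue segment queues:default).

```php
<?php

/**
 * Build the Redis key names for one partition from the documented patterns.
 * Hypothetical helper; defaults mirror the example keys in this README.
 *
 * @return array<string, string>
 */
function balancedQueueKeys(
    string $partition,
    string $queue = 'queues:default',
    string $prefix = 'balanced-queue'
): array {
    return [
        'partitions' => "{$prefix}:{$queue}:partitions",
        'jobs'       => "{$prefix}:{$queue}:{$partition}",
        'active'     => "{$prefix}:{$queue}:{$partition}:active",
        'metrics'    => "{$prefix}:metrics:{$queue}:{$partition}",
        'rr_state'   => "{$prefix}:rr-state:{$queue}",
    ];
}

$keys = balancedQueueKeys('user:123');

foreach ($keys as $label => $key) {
    echo str_pad($label, 12), $key, PHP_EOL;
}
```

If your Redis connection in config/database.php sets a key prefix, the keys seen in redis-cli will carry that prefix as well.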

Debugging with Redis CLI

# List all balanced queue keys (SCAN iterates without blocking Redis, unlike KEYS)
redis-cli --scan --pattern "balanced-queue*"

# See all partitions
redis-cli smembers "balanced-queue:queues:default:partitions"

# Check pending jobs for a partition
redis-cli llen "balanced-queue:queues:default:user:123"

# Check active jobs for a partition
redis-cli hgetall "balanced-queue:queues:default:user:123:active"

# Clear stuck active jobs (if worker crashed)
redis-cli del "balanced-queue:queues:default:user:123:active"

Troubleshooting

Jobs not executing

  1. Check that the connection name in queue.php matches the one used when dispatching:

    ->onConnection('balanced')  // Must match 'balanced' in queue.php

  2. Verify that the worker is running with the correct connection:

    php artisan queue:work balanced

Jobs stuck / not completing

Check for orphaned active job entries:

# View active jobs
redis-cli hgetall "balanced-queue:queues:default:{partition}:active"

# Clear if stuck (jobs older than retry_after)
redis-cli del "balanced-queue:queues:default:{partition}:active"
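
Deleting the whole active hash also drops entries for jobs that are still running. A gentler approach is to pick out only the stale entries first. The sketch below assumes the hash values are Unix timestamps in seconds, as the job_uuid: timestamp example suggests; the function name staleJobIds is hypothetical.

```php
<?php

/**
 * Find active-job entries older than retry_after.
 * Hypothetical helper; assumes values are Unix timestamps in seconds.
 *
 * @param array<string, int|string> $active job ID => start timestamp (HGETALL result)
 * @return list<string> IDs of jobs that started more than $retryAfter seconds ago
 */
function staleJobIds(array $active, int $retryAfter, int $now): array
{
    $stale = [];

    foreach ($active as $jobId => $startedAt) {
        // Redis returns strings, so cast before comparing.
        if ($now - (int) $startedAt > $retryAfter) {
            $stale[] = (string) $jobId;
        }
    }

    return $stale;
}

// Sample data: job-a started 100 seconds ago, job-b 5 seconds ago.
$now = 1_700_000_100;
$active = [
    'job-a' => '1700000000',
    'job-b' => '1700000095',
];

$stale = staleJobIds($active, 90, $now);

echo implode(', ', $stale), PHP_EOL; // job-a
```

The returned IDs could then be removed individually with Redis HDEL instead of deleting the entire key.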

Horizon shows 0 pending jobs

This is expected behavior. Use the balanced-queue:table command instead:

php artisan balanced-queue:table --watch

Workers idle but jobs pending

Check if all partitions hit their concurrency limit:

php artisan balanced-queue:table

If all partitions show Active = max_concurrent, workers are waiting for slots to free up.
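
The same check can be done in code. The sketch below takes per-partition stats in the shape shown for getQueueStats() and returns the partitions that could start a job right now; an empty result while jobs are pending means every partition is at its limit. The function name runnablePartitions is hypothetical.

```php
<?php

/**
 * List partitions that have pending jobs and a free concurrency slot.
 * Hypothetical helper; expects 'queued' and 'active' counts per partition.
 *
 * @param array<string, array{queued: int, active: int}> $stats
 * @return list<string>
 */
function runnablePartitions(array $stats, int $maxConcurrent): array
{
    $runnable = [];

    foreach ($stats as $name => $s) {
        if ($s['queued'] > 0 && $s['active'] < $maxConcurrent) {
            $runnable[] = (string) $name;
        }
    }

    return $runnable;
}

// Values taken from the example monitor output earlier in this README.
$stats = [
    'user:456' => ['queued' => 15, 'active' => 2],
    'user:123' => ['queued' => 8,  'active' => 2],
    'user:789' => ['queued' => 3,  'active' => 0],
];

$runnable = runnablePartitions($stats, 2);

echo $runnable === []
    ? 'All partitions are at their limit; workers are waiting for free slots.'
    : 'Can start jobs for: ' . implode(', ', $runnable);
echo PHP_EOL;
```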

Testing

# Run package tests
composer test

# Test in your app
php artisan tinker

>>> use App\Jobs\MyJob;
>>> MyJob::dispatch($userId, $data)->onPartition($userId)->onConnection('balanced');

Requirements

  • PHP 8.1+
  • Laravel 10.x, 11.x, or 12.x
  • Redis with phpredis or predis
  • Laravel Horizon (optional, for worker management)

Credits

Inspired by aloware/fair-queue with improvements:

  • Multiple partition strategies (not just random)
  • Built-in concurrency limiters
  • Artisan commands for monitoring
  • Cleaner, extensible architecture

License

MIT License. See LICENSE for details.