drnasin/workflow-orchestrator

A lightweight, stateless workflow orchestration library for PHP

Maintainers

Package info

github.com/drnasin/workflow-orchestrator

pkg:composer/drnasin/workflow-orchestrator

Statistics

Installs: 2

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.7.0 2026-05-28 07:15 UTC

README

Tests License: MIT PHP Version

Workflow Orchestrator

A lightweight, stateless workflow orchestration library for PHP 8.3+.

Installation

composer require drnasin/workflow-orchestrator

Quick Start

1. Define Your Workflow

use WorkflowOrchestrator\WorkflowOrchestrator;
use WorkflowOrchestrator\Attributes\Orchestrator;
use WorkflowOrchestrator\Attributes\Handler;

class OrderProcessor
{
    #[Orchestrator(channel: 'process.order')]
    public function processOrder(Order $order): array
    {
        $steps = ['validate', 'payment'];

        if ($order->getCustomer()->isPremium()) {
            $steps[] = 'apply-discount';
        }

        $steps[] = 'confirmation';
        return $steps;
    }

    #[Handler(channel: 'validate')]
    public function validate(Order $order): Order
    {
        // Validation logic
        if (!$order->hasItems()) {
            throw new \InvalidArgumentException('Order must have items');
        }
        return $order;
    }

    #[Handler(channel: 'payment')]
    public function processPayment(Order $order): Order
    {
        // Payment processing logic
        $order->markAsPaid();
        return $order;
    }

    #[Handler(channel: 'apply-discount')]
    public function applyDiscount(Order $order): Order
    {
        // Apply premium discount
        $order->applyDiscount(0.15);
        return $order;
    }

    #[Handler(channel: 'confirmation')]
    public function sendConfirmation(Order $order): Order
    {
        // Send confirmation email
        mail($order->getCustomer()->getEmail(), 'Order Confirmed', '...');
        return $order;
    }
}

2. Execute the Workflow

// Simple usage
$orchestrator = WorkflowOrchestrator::create()
    ->register(OrderProcessor::class);

$result = $orchestrator->execute('process.order', $order);

That's it! The workflow will:

  1. Validate the order
  2. Process payment
  3. Apply discount (if premium customer)
  4. Send confirmation

Features

  • Stateless workflow execution - No database storage required
  • Dynamic step routing - Workflows adapt based on data
  • Async step processing - Heavy operations run in background with configurable retry
  • Header/metadata support - Pass context between steps
  • Middleware support - Add cross-cutting concerns
  • Step timeouts - Enforce time limits on individual handlers
  • Event listeners - Track step execution for logging, metrics, and monitoring
  • Clean PHP 8+ API - Uses modern attributes and readonly properties
  • Container integration - Works with any PSR-11 container
  • Zero dependencies - Only requires PSR interfaces

Why Workflow Orchestrator?

Traditional Approach Problems

// ❌ Hard to maintain, test, and modify
class OrderService
{
    public function process(Order $order): void
    {
        $this->validate($order);
        $this->processPayment($order);     // What if this fails?
        $this->updateInventory($order);    // What if this is slow?
        $this->sendEmail($order);          // What if email is down?
        $this->generateInvoice($order);    // Getting complex...
    }
}

Workflow Orchestrator Approach

// ✅ Clear, testable, and flexible
#[Orchestrator(channel: 'process.order')]
public function processOrder(Order $order): array
{
    return ['validate', 'payment', 'inventory', 'email', 'invoice'];
}

// Each step is isolated, testable, and can be async
#[Handler(channel: 'email', async: true)]
public function sendEmail(Order $order): Order { ... }

Benefits Over State Machines

  • Focus on behavior, not state - Define what happens, not state transitions
  • No database storage - Workflows are stateless and self-contained
  • Zero migrations - Deploy workflow changes instantly
  • Easy testing - Each step is independently testable
  • Horizontal scaling - Any server can process any step

Performance

  • Stateless execution - No database queries for workflow state
  • Async support - Heavy operations don't block
  • Memory efficient - No persistent workflow instances
  • Scales horizontally - Add more servers without coordination

Error Handling

Workflow steps that fail are automatically wrapped with context:

try {
    $result = $orchestrator->execute('process.order', $order);
} catch (WorkflowException $e) {
    // Exception message: "Step 'payment' failed: Card declined"
    Log::error('Workflow failed', [
        'step' => $e->getFailedStep(), // Available if you extend the exception
        'message' => $e->getMessage(),
        'original' => $e->getPrevious(),
    ]);
}

Testing

Test workflows in isolation:

use PHPUnit\Framework\TestCase;

class OrderProcessorTest extends TestCase
{
    public function test_premium_customer_gets_discount(): void
    {
        $container = new SimpleContainer();
        $orchestrator = WorkflowOrchestrator::create($container)
            ->register(OrderProcessor::class);

        $order = new Order($premiumCustomer, $items);

        $result = $orchestrator->execute('process.order', $order);

        $this->assertTrue($result->hasDiscount());
        $this->assertTrue($result->isPaid());
    }

    public function test_regular_customer_no_discount(): void
    {
        $container = new SimpleContainer();
        $orchestrator = WorkflowOrchestrator::create($container)
            ->register(OrderProcessor::class);

        $order = new Order($regularCustomer, $items);

        $result = $orchestrator->execute('process.order', $order);

        $this->assertFalse($result->hasDiscount());
        $this->assertTrue($result->isPaid());
    }
}

Advanced Usage

Dynamic Workflows

Build different workflows based on your business rules:

#[Orchestrator(channel: 'process.user.registration')]
public function processUserRegistration(User $user): array
{
    $steps = ['validate-email', 'create-account'];

    // Different flow for enterprise users
    if ($user->isEnterprise()) {
        $steps[] = 'setup-organization';
        $steps[] = 'assign-account-manager';
        $steps[] = 'configure-billing';
    } else {
        $steps[] = 'setup-trial';
        $steps[] = 'send-welcome-email';
    }

    // International users need additional steps
    if ($user->isInternational()) {
        $steps[] = 'setup-localization';
        $steps[] = 'configure-timezone';
    }

    $steps[] = 'activate-account';
    $steps[] = 'track-registration-metrics';

    return $steps;
}

Headers and Context

Pass metadata between steps without modifying your main payload:

#[Handler(channel: 'enrich-customer-data', returnsHeaders: true)]
public function enrichCustomerData(Order $order): array
{
    return [
        'customer_tier' => $order->getCustomer()->getTier(),
        'loyalty_points' => $order->getCustomer()->getLoyaltyPoints(),
        'region' => $order->getShippingAddress()->getCountry(),
    ];
}

#[Handler(channel: 'apply-regional-pricing')]
public function applyRegionalPricing(
    Order $order,
    #[Header('customer_tier')] string $tier,
    #[Header('region')] string $region
): Order {
    // Use header values for business logic
    $discount = $this->calculateRegionalDiscount($tier, $region);
    return $order->applyDiscount($discount);
}

Async Steps

For heavy processing, mark steps as async:

#[Handler(channel: 'generate-invoice', async: true)]
public function generateInvoice(Order $order): Order
{
    // This will be queued for background processing
    $this->invoiceGenerator->generate($order);
    return $order;
}

Process queued steps with configurable retry:

// Process with default 3 retries
$orchestrator->processAsyncStep('generate-invoice');

// Process with custom retry limit
$orchestrator->processAsyncStep('generate-invoice', maxRetries: 5);

// No retries — fail immediately on error
$orchestrator->processAsyncStep('generate-invoice', maxRetries: 0);

Failed steps are automatically re-queued until retries are exhausted, then a WorkflowException is thrown.

Step Timeouts

Enforce time limits on individual handlers:

#[Handler(channel: 'call-external-api', timeout: 30)]
public function callExternalApi(Order $order): Order
{
    // If this takes longer than 30 seconds, a WorkflowException is thrown
    $response = $this->apiClient->submit($order);
    return $order->withResponse($response);
}

The timeout is measured in seconds using wall-clock time. A value of 0 (the default) means no time limit.

Middleware

Add cross-cutting concerns that run before every workflow:

use WorkflowOrchestrator\Contracts\MiddlewareInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class LoggingMiddleware implements MiddlewareInterface
{
    public function handle(WorkflowMessage $message, callable $next): WorkflowMessage
    {
        Log::info('Workflow started', ['id' => $message->getId()]);
        return $next($message);
    }
}

class AuthorizationMiddleware implements MiddlewareInterface
{
    public function handle(WorkflowMessage $message, callable $next): WorkflowMessage
    {
        if (!$message->getHeader('authorized')) {
            throw new \RuntimeException('Unauthorized workflow execution');
        }
        return $next($message);
    }
}

$orchestrator = WorkflowOrchestrator::create()
    ->withMiddleware(new LoggingMiddleware())
    ->withMiddleware(new AuthorizationMiddleware())
    ->register(OrderProcessor::class);

Middleware executes in the order added. Each call to withMiddleware() returns a new immutable instance.

Event Listeners

Track step execution for logging, metrics, or monitoring:

use WorkflowOrchestrator\Contracts\EventListenerInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class MetricsListener implements EventListenerInterface
{
    public function onStepStarted(string $stepName, WorkflowMessage $message): void
    {
        Metrics::increment("workflow.step.{$stepName}.started");
    }

    public function onStepCompleted(string $stepName, WorkflowMessage $message, float $duration): void
    {
        Metrics::timing("workflow.step.{$stepName}.duration", $duration);
        Metrics::increment("workflow.step.{$stepName}.completed");
    }

    public function onStepFailed(string $stepName, WorkflowMessage $message, \Throwable $error, float $duration): void
    {
        Metrics::increment("workflow.step.{$stepName}.failed");
        Log::error("Step {$stepName} failed after {$duration}s", [
            'error' => $error->getMessage(),
            'workflow_id' => $message->getId(),
        ]);
    }
}

$orchestrator = WorkflowOrchestrator::create()
    ->withEventListener(new MetricsListener())
    ->register(OrderProcessor::class);

Events fire for every step execution, including async steps. The $duration parameter is measured in seconds with nanosecond precision.

Container Integration

Use with your preferred dependency injection container:

use Psr\Container\ContainerInterface;

// With PSR-11 container
$orchestrator = new WorkflowOrchestrator($myContainer);

// With custom queue for async processing
$orchestrator = new WorkflowOrchestrator(
    container: $myContainer,
    queue: new RedisQueue($redisConnection)
);

Queue Implementations

The Workflow Orchestrator supports multiple queue implementations for async step processing. Choose the one that best fits your infrastructure needs.

In-Memory Queue (Default)

The default queue implementation stores messages in memory and is suitable for development and testing:

use WorkflowOrchestrator\Queue\InMemoryQueue;

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue(new InMemoryQueue())
    ->register(OrderProcessor::class);

Note: In-memory queues lose data when the process ends and don't support distributed processing.

SQLite Queue

For persistent storage without external dependencies, use the SQLite queue:

use WorkflowOrchestrator\Queue\SqliteQueue;

// Create PDO connection
$pdo = new PDO('sqlite:/path/to/your/database.sqlite');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Create SQLite queue
$queue = new SqliteQueue($pdo);

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue($queue)
    ->register(OrderProcessor::class);

Features:

  • ✅ Persistent storage
  • ✅ ACID transactions
  • ✅ Automatic table creation
  • ✅ No external dependencies
  • ✅ Single-file database
  • ✅ Concurrency-safe pop() across multiple workers

Setup Requirements:

  • PHP PDO extension (included by default)
  • Write permissions for SQLite database file

Custom Table Name:

$queue = new SqliteQueue($pdo, 'my_custom_queue_table');

Concurrent Workers:

pop() takes the write lock up front (BEGIN IMMEDIATE), so two workers can never claim the same message. To make competing workers wait for the lock instead of failing on contention, the queue sets a busy timeout (default 5000 ms), configurable via the third constructor argument:

// Wait up to 2 seconds for the write lock before giving up
$queue = new SqliteQueue($pdo, 'workflow_queue', busyTimeoutMs: 2000);

// Fail fast on contention (no wait)
$queue = new SqliteQueue($pdo, 'workflow_queue', busyTimeoutMs: 0);

Tip: create the PDO connection with PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION (as shown above). The queue relies on lock failures surfacing as exceptions for its full concurrency guarantees.

Redis Queue

For high-performance, distributed queue processing:

use WorkflowOrchestrator\Queue\RedisQueue;

// Create Redis connection
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// Optional: $redis->auth('your-password');
// Optional: $redis->select(1); // Use specific database

// Create Redis queue
$queue = new RedisQueue($redis);

$orchestrator = WorkflowOrchestrator::create()
    ->withQueue($queue)
    ->register(OrderProcessor::class);

Features:

  • ✅ High performance
  • ✅ Distributed processing
  • ✅ Blocking operations
  • ✅ Multiple queue support
  • ✅ Automatic cleanup
  • ✅ Queue introspection

Setup Requirements:

  • Redis server
  • PHP Redis extension: composer require ext-redis or install via system package manager

Install Redis Extension:

# Ubuntu/Debian
sudo apt-get install php-redis

# macOS (via Homebrew)
brew install php
pecl install redis

# Windows (via PECL)
pecl install redis

Custom Key Prefix:

$queue = new RedisQueue($redis, 'my_app:queue:');

Advanced Redis Usage:

// Blocking pop with timeout
$message = $queue->blockingPop('high-priority', 30); // 30 second timeout

// Check queue size
$size = $queue->size('email-notifications');

// Get all queue names
$queues = $queue->getQueueNames();

// Peek at next message without removing it
$nextMessage = $queue->peek('data-processing');

Queue Selection Guide

Feature InMemory SQLite Redis
Persistence
Distributed
Performance ⚡⚡⚡ ⚡⚡ ⚡⚡⚡
Setup Complexity ⚡⚡⚡ ⚡⚡
External Dependencies
Blocking Operations

Recommendations:

  • Development/Testing: InMemoryQueue
  • Single-server production: SqliteQueue
  • Multi-server/high-volume: RedisQueue

Custom Queue Implementation

Implement your own queue by creating a class that implements QueueInterface:

use WorkflowOrchestrator\Contracts\QueueInterface;
use WorkflowOrchestrator\Message\WorkflowMessage;

class MyCustomQueue implements QueueInterface
{
    public function push(string $queue, WorkflowMessage $message): void
    {
        // Your implementation
    }

    public function pop(string $queue): ?WorkflowMessage
    {
        // Your implementation
    }

    public function size(string $queue): int
    {
        // Your implementation
    }

    public function clear(string $queue): void
    {
        // Your implementation
    }
}

Requirements

  • PHP 8.3+
  • PSR-11 container (optional, includes simple container)
  • ext-pdo for SqliteQueue
  • ext-redis for RedisQueue (optional)

Changelog

See CHANGELOG.md for version history.

License

MIT

Contributing

Pull requests welcome! Please ensure:

  • Tests pass (vendor/bin/phpunit)
  • Code follows PSR-12 standards
  • New features include tests and documentation