drnasin / workflow-orchestrator
A lightweight, stateless workflow orchestration library for PHP
Requires
- php: ^8.3
- psr/container: ^2.0
Requires (Dev)
- phpunit/phpunit: ^12
This package is auto-updated.
Last update: 2026-02-10 18:33:46 UTC
README
Workflow Orchestrator
A lightweight, stateless workflow orchestration library for PHP 8.3+.
Installation
composer require drnasin/workflow-orchestrator
Quick Start
1. Define Your Workflow
use WorkflowOrchestrator\WorkflowOrchestrator; use WorkflowOrchestrator\Attributes\Orchestrator; use WorkflowOrchestrator\Attributes\Handler; class OrderProcessor { #[Orchestrator(channel: 'process.order')] public function processOrder(Order $order): array { $steps = ['validate', 'payment']; if ($order->getCustomer()->isPremium()) { $steps[] = 'apply-discount'; } $steps[] = 'confirmation'; return $steps; } #[Handler(channel: 'validate')] public function validate(Order $order): Order { // Validation logic if (!$order->hasItems()) { throw new \InvalidArgumentException('Order must have items'); } return $order; } #[Handler(channel: 'payment')] public function processPayment(Order $order): Order { // Payment processing logic $order->markAsPaid(); return $order; } #[Handler(channel: 'apply-discount')] public function applyDiscount(Order $order): Order { // Apply premium discount $order->applyDiscount(0.15); return $order; } #[Handler(channel: 'confirmation')] public function sendConfirmation(Order $order): Order { // Send confirmation email mail($order->getCustomer()->getEmail(), 'Order Confirmed', '...'); return $order; } }
2. Execute the Workflow
// Simple usage $orchestrator = WorkflowOrchestrator::create() ->register(OrderProcessor::class); $result = $orchestrator->execute('process.order', $order);
That's it! The workflow will:
- Validate the order
- Process payment
- Apply discount (if premium customer)
- Send confirmation
Features
- ✅ Stateless workflow execution - No database storage required
- ✅ Dynamic step routing - Workflows adapt based on data
- ✅ Async step processing - Heavy operations run in background with configurable retry
- ✅ Header/metadata support - Pass context between steps
- ✅ Middleware support - Add cross-cutting concerns
- ✅ Step timeouts - Enforce time limits on individual handlers
- ✅ Event listeners - Track step execution for logging, metrics, and monitoring
- ✅ Clean PHP 8+ API - Uses modern attributes and readonly properties
- ✅ Container integration - Works with any PSR-11 container
- ✅ Zero dependencies - Only requires PSR interfaces
Why Workflow Orchestrator?
Traditional Approach Problems
// ❌ Hard to maintain, test, and modify class OrderService { public function process(Order $order): void { $this->validate($order); $this->processPayment($order); // What if this fails? $this->updateInventory($order); // What if this is slow? $this->sendEmail($order); // What if email is down? $this->generateInvoice($order); // Getting complex... } }
Workflow Orchestrator Approach
// ✅ Clear, testable, and flexible #[Orchestrator(channel: 'process.order')] public function processOrder(Order $order): array { return ['validate', 'payment', 'inventory', 'email', 'invoice']; } // Each step is isolated, testable, and can be async #[Handler(channel: 'email', async: true)] public function sendEmail(Order $order): Order { ... }
Benefits Over State Machines
- Focus on behavior, not state - Define what happens, not state transitions
- No database storage - Workflows are stateless and self-contained
- Zero migrations - Deploy workflow changes instantly
- Easy testing - Each step is independently testable
- Horizontal scaling - Any server can process any step
Performance
- Stateless execution - No database queries for workflow state
- Async support - Heavy operations don't block
- Memory efficient - No persistent workflow instances
- Scales horizontally - Add more servers without coordination
Error Handling
Workflow steps that fail are automatically wrapped with context:
try { $result = $orchestrator->execute('process.order', $order); } catch (WorkflowException $e) { // Exception message: "Step 'payment' failed: Card declined" Log::error('Workflow failed', [ 'step' => $e->getFailedStep(), // Available if you extend the exception 'message' => $e->getMessage(), 'original' => $e->getPrevious(), ]); }
Testing
Test workflows in isolation:
use PHPUnit\Framework\TestCase; class OrderProcessorTest extends TestCase { public function test_premium_customer_gets_discount(): void { $container = new SimpleContainer(); $orchestrator = WorkflowOrchestrator::create($container) ->register(OrderProcessor::class); $order = new Order($premiumCustomer, $items); $result = $orchestrator->execute('process.order', $order); $this->assertTrue($result->hasDiscount()); $this->assertTrue($result->isPaid()); } public function test_regular_customer_no_discount(): void { $container = new SimpleContainer(); $orchestrator = WorkflowOrchestrator::create($container) ->register(OrderProcessor::class); $order = new Order($regularCustomer, $items); $result = $orchestrator->execute('process.order', $order); $this->assertFalse($result->hasDiscount()); $this->assertTrue($result->isPaid()); } }
Advanced Usage
Dynamic Workflows
Build different workflows based on your business rules:
#[Orchestrator(channel: 'process.user.registration')] public function processUserRegistration(User $user): array { $steps = ['validate-email', 'create-account']; // Different flow for enterprise users if ($user->isEnterprise()) { $steps[] = 'setup-organization'; $steps[] = 'assign-account-manager'; $steps[] = 'configure-billing'; } else { $steps[] = 'setup-trial'; $steps[] = 'send-welcome-email'; } // International users need additional steps if ($user->isInternational()) { $steps[] = 'setup-localization'; $steps[] = 'configure-timezone'; } $steps[] = 'activate-account'; $steps[] = 'track-registration-metrics'; return $steps; }
Headers and Context
Pass metadata between steps without modifying your main payload:
#[Handler(channel: 'enrich-customer-data', returnsHeaders: true)] public function enrichCustomerData(Order $order): array { return [ 'customer_tier' => $order->getCustomer()->getTier(), 'loyalty_points' => $order->getCustomer()->getLoyaltyPoints(), 'region' => $order->getShippingAddress()->getCountry(), ]; } #[Handler(channel: 'apply-regional-pricing')] public function applyRegionalPricing( Order $order, #[Header('customer_tier')] string $tier, #[Header('region')] string $region ): Order { // Use header values for business logic $discount = $this->calculateRegionalDiscount($tier, $region); return $order->applyDiscount($discount); }
Async Steps
For heavy processing, mark steps as async:
#[Handler(channel: 'generate-invoice', async: true)] public function generateInvoice(Order $order): Order { // This will be queued for background processing $this->invoiceGenerator->generate($order); return $order; }
Process queued steps with configurable retry:
// Process with default 3 retries $orchestrator->processAsyncStep('generate-invoice'); // Process with custom retry limit $orchestrator->processAsyncStep('generate-invoice', maxRetries: 5); // No retries — fail immediately on error $orchestrator->processAsyncStep('generate-invoice', maxRetries: 0);
Failed steps are automatically re-queued until retries are exhausted, then a WorkflowException is thrown.
Step Timeouts
Enforce time limits on individual handlers:
#[Handler(channel: 'call-external-api', timeout: 30)] public function callExternalApi(Order $order): Order { // If this takes longer than 30 seconds, a WorkflowException is thrown $response = $this->apiClient->submit($order); return $order->withResponse($response); }
The timeout is measured in seconds using wall-clock time. A value of 0 (the default) means no time limit.
Middleware
Add cross-cutting concerns that run before every workflow:
use WorkflowOrchestrator\Contracts\MiddlewareInterface; use WorkflowOrchestrator\Message\WorkflowMessage; class LoggingMiddleware implements MiddlewareInterface { public function handle(WorkflowMessage $message, callable $next): WorkflowMessage { Log::info('Workflow started', ['id' => $message->getId()]); return $next($message); } } class AuthorizationMiddleware implements MiddlewareInterface { public function handle(WorkflowMessage $message, callable $next): WorkflowMessage { if (!$message->getHeader('authorized')) { throw new \RuntimeException('Unauthorized workflow execution'); } return $next($message); } } $orchestrator = WorkflowOrchestrator::create() ->withMiddleware(new LoggingMiddleware()) ->withMiddleware(new AuthorizationMiddleware()) ->register(OrderProcessor::class);
Middleware executes in the order added. Each call to withMiddleware() returns a new immutable instance.
Event Listeners
Track step execution for logging, metrics, or monitoring:
use WorkflowOrchestrator\Contracts\EventListenerInterface; use WorkflowOrchestrator\Message\WorkflowMessage; class MetricsListener implements EventListenerInterface { public function onStepStarted(string $stepName, WorkflowMessage $message): void { Metrics::increment("workflow.step.{$stepName}.started"); } public function onStepCompleted(string $stepName, WorkflowMessage $message, float $duration): void { Metrics::timing("workflow.step.{$stepName}.duration", $duration); Metrics::increment("workflow.step.{$stepName}.completed"); } public function onStepFailed(string $stepName, WorkflowMessage $message, \Throwable $error, float $duration): void { Metrics::increment("workflow.step.{$stepName}.failed"); Log::error("Step {$stepName} failed after {$duration}s", [ 'error' => $error->getMessage(), 'workflow_id' => $message->getId(), ]); } } $orchestrator = WorkflowOrchestrator::create() ->withEventListener(new MetricsListener()) ->register(OrderProcessor::class);
Events fire for every step execution, including async steps. The $duration parameter is measured in seconds with nanosecond precision.
Container Integration
Use with your preferred dependency injection container:
use Psr\Container\ContainerInterface; // With PSR-11 container $orchestrator = new WorkflowOrchestrator($myContainer); // With custom queue for async processing $orchestrator = new WorkflowOrchestrator( container: $myContainer, queue: new RedisQueue($redisConnection) );
Queue Implementations
The Workflow Orchestrator supports multiple queue implementations for async step processing. Choose the one that best fits your infrastructure needs.
In-Memory Queue (Default)
The default queue implementation stores messages in memory and is suitable for development and testing:
use WorkflowOrchestrator\Queue\InMemoryQueue; $orchestrator = WorkflowOrchestrator::create() ->withQueue(new InMemoryQueue()) ->register(OrderProcessor::class);
Note: In-memory queues lose data when the process ends and don't support distributed processing.
SQLite Queue
For persistent storage without external dependencies, use the SQLite queue:
use WorkflowOrchestrator\Queue\SqliteQueue; // Create PDO connection $pdo = new PDO('sqlite:/path/to/your/database.sqlite'); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); // Create SQLite queue $queue = new SqliteQueue($pdo); $orchestrator = WorkflowOrchestrator::create() ->withQueue($queue) ->register(OrderProcessor::class);
Features:
- ✅ Persistent storage
- ✅ ACID transactions
- ✅ Automatic table creation
- ✅ No external dependencies
- ✅ Single-file database
Setup Requirements:
- PHP PDO extension (included by default)
- Write permissions for SQLite database file
Custom Table Name:
$queue = new SqliteQueue($pdo, 'my_custom_queue_table');
Redis Queue
For high-performance, distributed queue processing:
use WorkflowOrchestrator\Queue\RedisQueue; // Create Redis connection $redis = new Redis(); $redis->connect('127.0.0.1', 6379); // Optional: $redis->auth('your-password'); // Optional: $redis->select(1); // Use specific database // Create Redis queue $queue = new RedisQueue($redis); $orchestrator = WorkflowOrchestrator::create() ->withQueue($queue) ->register(OrderProcessor::class);
Features:
- ✅ High performance
- ✅ Distributed processing
- ✅ Blocking operations
- ✅ Multiple queue support
- ✅ Automatic cleanup
- ✅ Queue introspection
Setup Requirements:
- Redis server
- PHP Redis extension:
composer require ext-redisor install via system package manager
Install Redis Extension:
# Ubuntu/Debian sudo apt-get install php-redis # macOS (via Homebrew) brew install php pecl install redis # Windows (via PECL) pecl install redis
Custom Key Prefix:
$queue = new RedisQueue($redis, 'my_app:queue:');
Advanced Redis Usage:
// Blocking pop with timeout $message = $queue->blockingPop('high-priority', 30); // 30 second timeout // Check queue size $size = $queue->size('email-notifications'); // Get all queue names $queues = $queue->getQueueNames(); // Peek at next message without removing it $nextMessage = $queue->peek('data-processing');
Queue Selection Guide
| Feature | InMemory | SQLite | Redis |
|---|---|---|---|
| Persistence | ❌ | ✅ | ✅ |
| Distributed | ❌ | ❌ | ✅ |
| Performance | ⚡⚡⚡ | ⚡⚡ | ⚡⚡⚡ |
| Setup Complexity | ⚡⚡⚡ | ⚡⚡ | ⚡ |
| External Dependencies | ❌ | ❌ | ✅ |
| Blocking Operations | ❌ | ❌ | ✅ |
Recommendations:
- Development/Testing: InMemoryQueue
- Single-server production: SqliteQueue
- Multi-server/high-volume: RedisQueue
Custom Queue Implementation
Implement your own queue by creating a class that implements QueueInterface:
use WorkflowOrchestrator\Contracts\QueueInterface; use WorkflowOrchestrator\Message\WorkflowMessage; class MyCustomQueue implements QueueInterface { public function push(string $queue, WorkflowMessage $message): void { // Your implementation } public function pop(string $queue): ?WorkflowMessage { // Your implementation } public function size(string $queue): int { // Your implementation } public function clear(string $queue): void { // Your implementation } }
Requirements
- PHP 8.3+
- PSR-11 container (optional, includes simple container)
ext-pdofor SqliteQueueext-redisfor RedisQueue (optional)
Changelog
See CHANGELOG.md for version history.
License
MIT
Contributing
Pull requests welcome! Please ensure:
- Tests pass (
vendor/bin/phpunit) - Code follows PSR-12 standards
- New features include tests and documentation