sergiumhi/laravel-flow

A lightweight job orchestrator for Laravel: composable, observable pipelines of queued jobs with revert (saga) and fan-out (subtask) support.

Maintainers

Package info

github.com/sergiumhi/laravel-flow

pkg:composer/sergiumhi/laravel-flow

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v0.1.0 2026-06-05 11:49 UTC

This package is auto-updated.

Last update: 2026-06-05 12:10:28 UTC


README

A lightweight job orchestrator for Laravel. Wraps queued jobs into composable, observable pipelines with revert (saga) and fan-out (subtask) support.

Latest Version on Packagist Total Downloads License

A Flow is a PHP class with a run() method that returns a Generator. Each yield inside run() hands a Task (or a Signal) to the orchestrator, which dispatches it as a queued Laravel job, saves state, and goes to sleep. When the job finishes, the orchestrator wakes up and sends the result back into the generator.

This is the continuation pattern — a flow is not a long-running process. It is a short-lived job that wakes up, makes one decision, dispatches one task, saves state, and sleeps again. Because the Generator cannot be serialized, the orchestrator rebuilds it from scratch on every wake-up and fast-forwards it to the current position by replaying the outputs of completed steps.

run() must be side-effect free. It is re-executed up to the current step on every wake-up. No DB writes, no API calls, no randomness — everything with a side effect belongs in Task::handle().

Table of contents

Requirements

  • PHP 8.3+
  • Laravel 13

Installation

Install via Composer:

composer require sergiumhi/laravel-flow

The service provider is auto-discovered. Run the migrations to create the two tables the engine needs (flows and flow_tasks):

php artisan migrate

The migrations ship with the package and are loaded automatically — there is nothing to publish. If you want to customise the schema, you can copy them into your app and disable package auto-loading, but most apps never need to.

Optionally publish the config file:

php artisan vendor:publish --tag=flow-config

It works with any queue driver (sync, database, redis, sqs). On the sync driver a flow runs to completion (or to its first pause/signal) inside the start() call, which makes it trivial to try and to test.

Quick start

use App\Flows\GreetingFlow;
use Sergiumhi\LaravelFlow\Flow;

// Start a flow with an input payload.
$flow = GreetingFlow::start(['name' => 'Ada']);

$flow->publicId();   // "flow_01J..."
$flow->status();     // FlowStatus enum

// Look one up later by its public id.
$flow = Flow::find('flow_01J...');

Defining a flow

Extend Sergiumhi\LaravelFlow\Flow and implement run() as a generator. Your flow classes live wherever you like; the convention is app/Flows (App\Flows), which is what flow:run resolves short names against.

namespace App\Flows;

use App\Tasks\BuildGreetingTask;
use App\Tasks\RecordGreetingTask;
use App\Tasks\ShoutGreetingTask;
use Generator;
use Sergiumhi\LaravelFlow\Flow;

class GreetingFlow extends Flow
{
    public function run(array $payload): Generator
    {
        // run() receives the array passed to start(). Pass it (or any subset)
        // explicitly to each task via init() — tasks do not receive the flow
        // payload implicitly. $payload is also yours to branch on directly.
        $built = yield BuildGreetingTask::init($payload);

        // Later tasks are fed from the previous result (output chaining).
        $shouted = yield ShoutGreetingTask::init(['text' => $built['greeting']]);

        yield RecordGreetingTask::init(['message' => $shouted['shout']]);
    }
}

Start it:

GreetingFlow::start(['name' => 'Ada']);

Defining a task

A Task is a thin wrapper around a unit of work. The work lives in handle(). There is no separate Laravel Job class to write — the framework generates and dispatches the underlying queued job for you.

namespace App\Tasks;

use Sergiumhi\LaravelFlow\Task;

class ProcessPaymentTask extends Task
{
    public function handle(): array
    {
        // ... do the work, reading inputs from $this->payload ...
        return ['charge_id' => 'ch_1', 'requiresManualReview' => false];
    }
}

Task API

use Sergiumhi\LaravelFlow\OnFailure;

// No payload
yield ProcessPaymentTask::init();

// With a payload — handle() reads it via $this->payload
yield ProcessPaymentTask::init(['amount' => 100]);

// Register a compensating task for revert
yield ProcessPaymentTask::init($payload)->revert(RefundPaymentTask::class);

// Declare failure behaviour (default is OnFailure::PAUSE)
yield SyncAnalyticsTask::init($payload)->onFailure(OnFailure::SKIP);

The payload is passed once, to init(), and read inside handle() via $this->payload. There is no separate ->with(), and handle() takes no argument.

Retries

Override tries() to retry a task before it is considered finally failed. The framework manages retries itself (so behaviour is identical under sync and async queues); the attempts column tracks how many times it ran.

class ChargeCardTask extends Task
{
    public function tries(): int
    {
        return 3;
    }

    public function handle(): array { /* ... */ }
}

Passing data: payload & output chaining

A task reads its inputs from $this->payload — exactly the array passed to its init(...) at the yield site. Nothing is merged in implicitly. You feed that payload from two sources:

  1. Flow payloadrun(array $payload) receives the array passed to Flow::start(). Thread it (or any subset) into the tasks that need it. The same array is yours to branch on directly (e.g. if ($payload['requires_review'] ?? false)).

  2. Output chaining — the value a task returns becomes the result of its yield, so you can feed it into the next task explicitly:

    $a = yield BuildGreetingTask::init($payload);   // returns ['greeting' => '...']
    yield ShoutGreetingTask::init(['text' => $a['greeting']]);

Tasks never communicate through a shared payload — only through the generator. This keeps run() the single place that wires steps together, and makes every task's inputs visible at its call site.

Signals

A Signal is a named pause point. The flow stops at the yield and waits — indefinitely, with no timeout and no job dispatched — until an external caller sends the signal. The flow status becomes waiting.

use Sergiumhi\LaravelFlow\Signal;

// Inside run() — pause until 'manual_approval' arrives.
$approval = yield Signal::waitFor('manual_approval');
// $approval is the payload that was sent, e.g. ['approved_by' => 7]

Send the signal from outside:

Flow::find($publicId)->signal('manual_approval', ['approved_by' => $user->id]);

Rules:

  • One signal per yield.
  • No timeout — the flow waits indefinitely.
  • Wrong name is ignored — sending a name the flow is not waiting for is a no-op.
  • Not-waiting is ignored — a signal to a flow that is not currently waiting is a no-op.

Subtasks (fan-out)

A task can spawn child tasks by returning a SubTaskCollection from handle(). The parent stays running until all children finish; the parent's output is then the aggregated list of subtask outputs, which flows into the next step.

Subtasks use the same Task base class — the only difference is structural (they have a parent_id). Subtasks cannot spawn further subtasks (max depth is two).

Parallel (all at once)

use Sergiumhi\LaravelFlow\SubTaskCollection;
use Sergiumhi\LaravelFlow\Task;

class ProcessCsvTask extends Task
{
    public function handle(): SubTaskCollection
    {
        $chunks = array_chunk($this->payload['rows'], $this->payload['chunk_size']);

        return SubTaskCollection::parallel(
            array_map(fn ($chunk) => ProcessChunkTask::init(['chunk' => $chunk]), $chunks),
        );
    }
}

Sequential (one at a time, in order)

return SubTaskCollection::sequential([
    BuildSectionTask::init(['section' => 'intro']),
    BuildSectionTask::init(['section' => 'body']),
]);

The aggregated output is available on the parent's yield:

$chunks = yield ProcessCsvTask::init($payload);   // [['rows_processed' => 3], ...]
yield GenerateReportTask::init(['chunks' => $chunks]);

Failure handling

Each task declares what happens when it fails after exhausting its retries, via ->onFailure() (default OnFailure::PAUSE).

Value Behaviour
OnFailure::PAUSE Flow status → paused. Waits for retryCurrentTask(), skipCurrentTask(), or cancel().
OnFailure::SKIP Task marked skipped; the flow advances automatically.
OnFailure::REVERT Flow immediately reverts all completed tasks in reverse.
yield ProcessPaymentTask::init($payload)->onFailure(OnFailure::PAUSE);  // default
yield SyncAnalyticsTask::init($payload)->onFailure(OnFailure::SKIP);    // non-critical
yield ChargeCardTask::init($payload)->onFailure(OnFailure::REVERT);     // roll everything back

A failed subtask always pauses the flow (its parent is marked failed); subtask failures are coarse-grained on purpose — onFailure is not consulted for subtasks. The first child to exhaust its own tries() flips the parent to failed and parks the flow at paused, with current_task_id pointing at the parent (not the failed child). Siblings already finished keep their completed status.

Recovering a failed fan-out

From a paused fan-out you have two recovery paths:

Call Effect
retryCurrentTask() Re-runs the whole parent from scratch: its handle() executes again and regenerates the entire SubTaskCollection, so every chunk re-runs (the old children are discarded first).
retryFailedSubtasks() Re-dispatches only the failed children, leaving the completed ones untouched. The flow resumes only once every child has finished.

retryFailedSubtasks() is what you want for large fan-outs (e.g. importing a 5M-row CSV as dozens of 10k-row chunks): a mid-batch failure re-runs just the broken chunks rather than reprocessing the ones that already succeeded.

Revert (saga pattern)

When cancelAndRevert() is called (or OnFailure::REVERT fires), the orchestrator walks backwards through all completed top-level tasks and runs each task's revert task, if one is defined.

Completed:  ProcessPayment ✓    ChargeShipping ✓    (then revert is triggered)

Revert runs in reverse:
  1. CancelShippingTask   (reverts ChargeShipping)
  2. RefundPaymentTask    (reverts ProcessPayment)

Each revert is a normal queued task (with its own retries). The flow status is reverting during the saga. The terminal status depends on what triggered it: a user-initiated cancelAndRevert() ends reverted (and the remaining, not-yet-run tasks are marked cancelled), while a failure-driven OnFailure::REVERT ends failed.

Plain cancel() is different: it halts without reverting — the remaining tasks are marked cancelled, the flow ends cancelled, and completed work is left in place. Use it to stop a flow and keep its side effects.

The revert task receives the original task's output as its payload:

  • An associative output (e.g. ['charge_id' => ...]) is merged over the flow payload, so its fields are directly accessible.
  • The raw output is always available under the output key — useful for a subtask parent, whose aggregated output is a list.
class RefundPaymentTask extends Task
{
    public function handle(): array
    {
        return ['refunded' => $this->payload['charge_id']];   // directly accessible
    }
}

Manual control of a running flow

$flow = Flow::find($publicId);

// Re-dispatch the failed task from scratch.
$flow->retryCurrentTask();

// Skip the failed task and advance.
$flow->skipCurrentTask();

// Re-run only the failed children of a failed subtask parent (keep the rest).
$flow->retryFailedSubtasks();

// Reset a task and everything after it, then re-run from there.
$flow->retryFromTask($taskPublicId);

// Halt without reverting: remaining tasks → cancelled, flow → cancelled.
$flow->cancel();

// Halt and run revert tasks in reverse: flow → reverted.
$flow->cancelAndRevert();

// Resume a flow that is waiting on a signal.
$flow->signal('manual_approval', ['approved_by' => 7]);

Job timing

Every task and subtask records three timestamps for its underlying queued job:

Column Set when Meaning
job_dispatched_at Orchestrator dispatches the job In the queue, not yet picked up
job_started_at Worker calls handle() Execution begins
job_finished_at handle() returns or throws Execution ended

From these you can derive queue wait (started - dispatched), execution time (finished - started), and total time. For a task with subtasks, the parent's job_finished_at is set when the last subtask completes.

Status reference

Flow statuses (flows.status)

Status Meaning
pending Created, not yet started
running Actively dispatching tasks
waiting Parked at a signal, waiting for an external signal
paused A task failed, waiting for manual action
reverting Running revert tasks in reverse
completed All tasks finished successfully
failed Ended in failure (after OnFailure::REVERT compensation or skip)
cancelled Halted via cancel() — remaining tasks cancelled, no revert
reverted Halted via cancelAndRevert() — completed tasks compensated

Task statuses (flow_tasks.status)

Status Meaning
pending Known but not yet dispatched
running Dispatched (or waiting for subtasks)
completed Finished successfully
failed Failed after all retries
skipped Skipped via OnFailure::SKIP or skipCurrentTask()
waiting Signal yield, waiting for an external signal
reverting Revert task is running
reverted Revert finished
abandoned Seeded as a DAG preview but never reached (the branch was not taken)
cancelled Not-yet-run when a cancel() / cancelAndRevert() halted the flow

Artisan commands

# Start a flow by name (short name, "studly" name, or FQCN) with a JSON payload.
php artisan flow:run order --payload='{"order_id":1,"requires_review":true}'

# Inspect a flow's DAG, statuses, and timing.
php artisan flow:status flow_01J...

# Resume a flow that is waiting on a signal.
php artisan flow:signal flow_01J... manual_approval --payload='{"approved_by":7}'

# Delete finished flow runs older than the retention period.
php artisan flow:prune --days=30

flow:run resolves short names against your flows namespace (App\Flows by default — see Configuration). On the database queue driver, run a worker (php artisan queue:work) so dispatched jobs are processed. On sync everything runs inline.

Configuration

Publish the config with php artisan vendor:publish --tag=flow-config to get config/flow.php:

return [
    // Swap these for your own subclasses to extend the engine's models.
    'flow_model' => \Sergiumhi\LaravelFlow\Models\FlowModel::class,
    'flow_tasks_model' => \Sergiumhi\LaravelFlow\Models\FlowTask::class,

    // The namespace segment under app/ that holds your flow classes (App\Flows).
    // Used by flow:run to resolve a flow from a short name.
    'flows_folder' => 'Flows',

    'prune' => [
        'command' => \Sergiumhi\LaravelFlow\Console\Commands\FlowPruneCommand::class,
        'retention_days' => 30,
        // Set to true to register the daily prune on the scheduler automatically.
        // Off by default so the package never deletes data without opt-in.
        'schedule' => false,
    ],
];

Extending the models

Both Eloquent models are resolved through config, so you can extend them:

use Sergiumhi\LaravelFlow\Models\FlowModel;

class Flow extends FlowModel
{
    // your relationships, scopes, casts...
}
// config/flow.php
'flow_model' => \App\Models\Flow::class,

Pruning on a schedule

Set flow.prune.schedule to true to have the package register the daily prune on Laravel's scheduler. It is off by default — a package should never delete your data without an explicit opt-in. You can also call flow:prune from your own schedule definition instead.

How it works (orchestrator lifecycle)

1. OrderFlow::start(['order_id' => 1])
   → creates a flows row (status: pending)
   → statically analyses run() to seed every top-level yield (both sides of
     conditionals) as preview rows — for full DAG visibility up front
   → dispatches FlowOrchestratorJob

2. FlowOrchestratorJob (advance)
   → rebuilds the generator, replays completed steps, reconciles the DAG
   → next yield is a Task   → mark running, dispatch FlowTaskJob
   → next yield is a Signal → mark waiting, record signal_waited_at, stop
   → generator exhausted    → mark the flow completed

3. FlowTaskJob (run a task)
   → calls handle()
   → returns array             → task completed, dispatch FlowOrchestratorJob
   → returns SubTaskCollection → persist children, dispatch them
   → throws                    → retry if attempts remain, else route by OnFailure

4. Subtask finishes
   → all siblings done → aggregate output, complete parent, advance the flow
   → sequential        → dispatch the next sibling

At start(), the framework statically analyses run() (via nikic/php-parser) — parsing every yield SomeTask::init(...) and yield Signal::waitFor(...) in source order, including both sides of every conditional branch. Each becomes a seed row that the orchestrator promotes to a real execution row as the generator actually reaches it. Branches never taken are marked abandoned. This gives the full shape of the flow up front, before anything runs.

Testing

The package is tested against Orchestra Testbench on an in-memory SQLite database with the sync queue:

composer test

License

The MIT License (MIT). See LICENSE.md.