convoy/core

Expression-based async coordination for PHP 8.4+

Maintainers

Package info

github.com/jhavenz/convoy-core

pkg:composer/convoy/core

Statistics

Installs: 1

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.0.0-alpha 2026-03-16 13:17 UTC

This package is auto-updated.

Last update: 2026-03-17 08:35:42 UTC


README

Convoy is an async coordination library for PHP 8.4+. It replaces callbacks with typed tasks—named computations that carry their own identity, behavior, and lifecycle through a unified execution model built on ReactPHP.

Substack write up

Table of Contents

Installation

composer require convoy/core

Requires PHP 8.4+.

Quick Start

<?php

$app = Application::starting()->providers(new AppBundle())->compile();
$app->startup();

$scope = $app->createScope();

$result = $scope->execute(Task::of(static fn(ExecutionScope $s) =>
    $s->service(OrderService::class)->process(42)
));

$scope->dispose();
$app->shutdown();

Note: Service classes like OrderService, UserRepo, DatabasePool in these examples are illustrative. Convoy Core provides the coordination primitives—your application brings the domain logic.

How It Works

Convoy's model: Application -> Scope -> Tasks.

Application::starting($context)
    -> compile()           // Validate service graph, create app
    -> startup()           // Run startup hooks, enable shutdown handlers
    -> createScope()       // Create ExecutionScope
    -> execute(Task)       // Run typed tasks
    -> dispose()           // Cleanup scope resources
    -> shutdown()          // Cleanup app resources

Every task implements Scopeable or Executable—single-method interfaces:

<?php

// Tasks needing only service resolution
interface Scopeable {
    public function __invoke(Scope $scope): mixed;
}

// Tasks needing execution primitives (concurrency, cancellation)
interface Executable {
    public function __invoke(ExecutionScope $scope): mixed;
}

Scope Hierarchy

Convoy splits scope into two interfaces:

Interface Purpose
Scope Service resolution, attributes, tracing
ExecutionScope Extends Scope with concurrency, cancellation, disposal
CommandScope Decorates ExecutionScope with typed $args and $options
HttpScope Decorates ExecutionScope with typed $params and $query

Scope is the minimal interface for most code—service access and attribute passing.

ExecutionScope adds execution capabilities: concurrent(), race(), execute(), throwIfCancelled(), dispose().

<?php

// Scope: minimal interface
interface Scope {
    public function service(string $type): object;
    public function attribute(string $key, mixed $default = null): mixed;
    public function withAttribute(string $key, mixed $value): Scope;
    public function trace(): Trace;
}

// ExecutionScope: full execution capabilities
interface ExecutionScope extends Scope {
    public bool $isCancelled { get; }
    public function execute(Scopeable|Executable $task): mixed;
    public function concurrent(array $tasks): array;
    public function throwIfCancelled(): void;
    public function onDispose(Closure $callback): void;
    // ... and more
}

Handler closures receive ExecutionScope for full concurrency control. File loading receives Scope for service resolution.

The Task System

Two Ways to Define Tasks

Quick tasks for one-offs:

<?php

$task = Task::of(static fn(ExecutionScope $s) => $s->service(UserRepo::class)->find($id));
$user = $scope->execute($task);

Invokable classes for everything else:

<?php

final readonly class FetchUser implements Scopeable
{
    public function __construct(private int $id) {}

    public function __invoke(Scope $scope): User
    {
        return $scope->service(UserRepo::class)->find($this->id);
    }
}

$user = $scope->execute(new FetchUser(42));

The invokable approach gives you:

  • Traceable: Stack traces show FetchUser::__invoke, not Closure@handler.php:47
  • Testable: Mock the scope, invoke the task, assert the result
  • Serializable: Constructor args are data—queue jobs, distribute across workers
  • Inspectable: The class name is the identity; constructor args are the inputs

Behavior via Interfaces

Tasks declare behavior through PHP 8.4 property hooks:

<?php

final class DatabaseQuery implements Scopeable, Retryable, HasTimeout
{
    public RetryPolicy $retryPolicy {
        get => RetryPolicy::exponential(3);
    }

    public float $timeout {
        get => 5.0;
    }

    public function __invoke(Scope $scope): array
    {
        return $scope->service(Database::class)->query($this->sql);
    }
}

The behavior pipeline applies automatically: timeout wraps retry wraps trace wraps work.

Interface Property Purpose
Retryable RetryPolicy $retryPolicy { get; } Automatic retry with policy
HasTimeout float $timeout { get; } Automatic timeout in seconds
HasPriority int $priority { get; } Priority queue ordering
UsesPool UnitEnum $pool { get; } Pool-aware scheduling
Traceable string $traceName { get; } Custom trace label

Concurrency Primitives

Method Behavior Returns
concurrent($tasks) Run all in parallel, wait for all Array of results
race($tasks) First to settle (success or failure) Single result
any($tasks) First success (ignores failures) Single result
map($items, $fn, $limit) Bounded parallelism over collection Array of results
settle($tasks) Run all, collect outcomes including failures SettlementBag
timeout($seconds, $task) Run with deadline Result or throws
series($tasks) Sequential execution Array of results
waterfall($tasks) Sequential, passing result forward Final result
<?php

// Parallel fetch
[$customer, $inventory] = $scope->concurrent([
    new FetchCustomer($customerId),
    new ValidateInventory($items),
]);

// First successful response wins (fallback pattern)
$data = $scope->any([
    new FetchFromPrimary($key),
    new FetchFromFallback($key),
]);

// 10,000 items. 10 concurrent workers.
$results = $scope->map($items, fn($item) => new ProcessItem($item), limit: 10);

Route Groups

Typed collections of HTTP routes with RouteGroup. Handler closures receive HttpScope—a scope decorator with typed route parameters and query strings:

<?php
// routes/api.php

use Convoy\Http\HttpScope;
use Convoy\Http\Route;
use Convoy\Http\RouteGroup;
use Convoy\Scope;

return static fn(Scope $s): RouteGroup => RouteGroup::of([
    'GET /users' => new Route(
        fn: static fn(HttpScope $hs) => $hs->service(UserRepo::class)->all(),
    ),
    'GET /users/{id}' => new Route(
        fn: static fn(HttpScope $hs) => $hs->service(UserRepo::class)->find(
            (int) $hs->params->required('id')
        ),
    ),
    'POST /users' => new Route(
        fn: static fn(HttpScope $hs) => $hs->service(UserRepo::class)->create(
            $hs->attribute('request.body')
        ),
        config: new RouteConfig(timeout: 5.0),
    ),
]);

Loading Routes

<?php

use Convoy\Handler\HandlerLoader;
use Convoy\Runner\HttpRunner;

$routes = HandlerLoader::loadRouteDirectory(__DIR__ . '/routes', $app->scope());
$runner = HttpRunner::withRoutes($app, $routes, requestTimeout: 30.0);
$runner->run('0.0.0.0:8080');

Composing Route Groups

<?php

$api = RouteGroup::create()
    ->merge($publicRoutes)
    ->mount('/admin', $adminRoutes)
    ->wrap(new AuthMiddleware());

Command Groups

Typed collections of CLI commands with CommandGroup. Handler closures receive CommandScope—a scope decorator with typed arguments and options:

<?php
// commands/db.php

use Convoy\Console\Command;
use Convoy\Console\CommandGroup;
use Convoy\Console\CommandConfig;
use Convoy\Console\CommandScope;
use Convoy\Scope;

return static fn(Scope $s): CommandGroup => CommandGroup::of([
    'migrate' => new Command(
        fn: static fn(CommandScope $cs) => $cs->service(Migrator::class)->run(),
        config: new CommandConfig(description: 'Run database migrations'),
    ),
    'db:seed' => new Command(
        fn: static fn(CommandScope $cs) => $cs->service(Seeder::class)->run(
            $cs->options->flag('fresh')
        ),
        config: (new CommandConfig(description: 'Seed the database'))
            ->withOption('fresh', shorthand: 'f', description: 'Truncate tables first'),
    ),
]);

Running Commands

<?php

use Convoy\Handler\HandlerLoader;
use Convoy\Runner\ConsoleRunner;

$commands = HandlerLoader::loadCommandDirectory(__DIR__ . '/commands', $app->scope());
$runner = ConsoleRunner::withCommands($app, $commands);
exit($runner->run($argv));

Services

<?php

use Convoy\Service\ServiceBundle;
use Convoy\Service\Services;

class AppBundle implements ServiceBundle
{
    public function services(Services $services, array $context): void
    {
        $services->singleton(DatabasePool::class)
            ->factory(fn() => new DatabasePool($context['db_url']))
            ->onStartup(fn($pool) => $pool->warmUp(5))
            ->onShutdown(fn($pool) => $pool->drain());

        $services->scoped(RequestLogger::class)
            ->lazy()
            ->onDispose(fn($log) => $log->flush());
    }
}
Method Lifecycle
singleton() One instance per application
scoped() One instance per scope, disposed with scope
lazy() Defer creation until first access (PHP 8.4 lazy ghosts)

Cancellation & Retry

<?php

use Convoy\Concurrency\CancellationToken;
use Convoy\Concurrency\RetryPolicy;

// Timeout for entire scope
$scope = $app->createScope(CancellationToken::timeout(30.0));

// Task-level timeout
$result = $scope->timeout(5.0, new SlowApiCall($id));

// Retry with exponential backoff
$result = $scope->retry(
    new FetchFromApi($url),
    RetryPolicy::exponential(attempts: 3)
);

// Check cancellation within tasks (use Executable when you need ExecutionScope)
final class LongRunningTask implements Executable
{
    public function __invoke(ExecutionScope $scope): mixed
    {
        foreach ($this->chunks as $chunk) {
            $scope->throwIfCancelled();
            $this->process($chunk);
        }
        return $this->result;
    }
}

Tracing

CONVOY_TRACE=1 php server.php
    0ms  STRT  compiling
    4ms  STRT  startup
    6ms  CON>    concurrent(2)
    7ms  EXEC    FetchCustomer
    8ms  DONE    FetchCustomer  +0.61ms
   19ms  CON<    concurrent(2) joined  +12.8ms

0 svc  4.0MB peak  0 gc  39.8ms total

Deterministic Cleanup

<?php

$scope = $app->createScope();
$scope->onDispose(fn() => $connection->close());

// Your task code...

$scope->dispose();  // Cleanup fires in reverse order

Examples

Docker CLI

A practical CLI app demonstrating CommandScope with typed arguments and options, ServiceBundle wiring, and invokable task classes.

php examples/04-apps/docker-cli/run.php ps -a
php examples/04-apps/docker-cli/run.php up nginx --name my-nginx
php examples/04-apps/docker-cli/run.php help up

See examples/README.md for full details.