hyperf/dag

Dag runner for hyperf

v3.1.42 2024-09-25 02:54 UTC

README

hyperf/dag 是一个轻量级有向无环图 (Directed Acyclic Graph) 任务编排库。

场景

假设我们有一系列任务需要执行。

  • 如果他们之间存在依赖关系,则可以将他们顺序执行。
  • 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
  • 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。

我们可以将第三种复杂的场景抽象成 DAG 来解决。

示例

假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A完成后才能完成B、C、D,B完成后才能完成H、E、F...)

通过 hyperf/dag 可以使用如下方式构建 DAG 并执行。

<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
$dag->addVertex($a)
    ->addVertex($b)
    ->addVertex($c)
    ->addVertex($d)
    ->addVertex($e)
    ->addVertex($f)
    ->addVertex($g)
    ->addVertex($h)
    ->addVertex($i)
    ->addEdge($a, $b)
    ->addEdge($a, $c)
    ->addEdge($a, $d)
    ->addEdge($b, $h)
    ->addEdge($b, $e)
    ->addEdge($b, $f)
    ->addEdge($c, $f)
    ->addEdge($c, $g)
    ->addEdge($d, $g)
    ->addEdge($h, $i)
    ->addEdge($e, $i)
    ->addEdge($f, $i)
    ->addEdge($g, $i);
    
// 需要在协程环境下执行
$dag->run(); 

输出:

// 1s 后
A
// 2s 后
D
C
B
// 3s 后
G
F
E
H
// 4s 后
I

DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。

访问前步结果

每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。DAG 执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。

<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
    return $results[$a->key] + 1;
});
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
assert($results[$a->key] === 1);
assert($results[$b->key] === 2);

定义一个任务

在上述文档中,我们使用了闭包来定义一个任务。格式如下。

// Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");

除了使用闭包函数定义任务外,还可以使用实现了 \Hyperf\Dag\Runner 接口的类来定义,并通过 Vertex::of 将其转化为一个顶点。

class MyJob implements \Hyperf\Dag\Runner {
    public function run($results = []) {
        return 'hello';
    }
}

\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");

\Hyperf\Dag\Dag 本身也实现了 \Hyperf\Dag\Runner 接口,所以可以嵌套使用。

<?php
// 命名空间已省略
$a = Vertex::make(function () { return 1;});
$b = Vertex::make(function () { return 2;});
$c = Vertex::make(function () { return 3;});

$nestedDag = new Dag();
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
$d = Vertex::of($nestedDag);

$superDag = new Dag();
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
$superDag->run();

控制并发数

\Hyperf\Dag\Dag 类提供了 setConcurrency(int n) 方法控制最大并发数。默认为10。