hyperf / dag
Dag runner for hyperf
Fund package maintenance!
Open Collective
hyperf.wiki/#/zh-cn/donate
Requires
- php: >=8.1
- hyperf/support: ~3.1.0
- hyperf/utils: ~3.1.0
This package is auto-updated.
Last update: 2025-01-21 02:26:11 UTC
README
hyperf/dag
是一个轻量级有向无环图 (Directed Acyclic Graph) 任务编排库。
场景
假设我们有一系列任务需要执行。
- 如果他们之间存在依赖关系,则可以将他们顺序执行。
- 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
- 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。
我们可以将第三种复杂的场景抽象成 DAG
来解决。
示例
假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A完成后才能完成B、C、D,B完成后才能完成H、E、F...)
通过 hyperf/dag
可以使用如下方式构建 DAG
并执行。
<?php $dag = new \Hyperf\Dag\Dag(); $a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";}); $b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";}); $c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";}); $d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";}); $e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";}); $f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";}); $g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";}); $h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";}); $i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";}); $dag->addVertex($a) ->addVertex($b) ->addVertex($c) ->addVertex($d) ->addVertex($e) ->addVertex($f) ->addVertex($g) ->addVertex($h) ->addVertex($i) ->addEdge($a, $b) ->addEdge($a, $c) ->addEdge($a, $d) ->addEdge($b, $h) ->addEdge($b, $e) ->addEdge($b, $f) ->addEdge($c, $f) ->addEdge($c, $g) ->addEdge($d, $g) ->addEdge($h, $i) ->addEdge($e, $i) ->addEdge($f, $i) ->addEdge($g, $i); // 需要在协程环境下执行 $dag->run();
输出:
// 1s 后 A // 2s 后 D C B // 3s 后 G F E H // 4s 后 I
DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。
访问前步结果
每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。DAG
执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。
<?php $dag = new \Hyperf\Dag\Dag(); $a = \Hyperf\Dag\Vertex::make(function() {return 1;}); $b = \Hyperf\Dag\Vertex::make(function($results) use ($a) { return $results[$a->key] + 1; }); $results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run(); assert($results[$a->key] === 1); assert($results[$b->key] === 2);
定义一个任务
在上述文档中,我们使用了闭包来定义一个任务。格式如下。
// Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。 \Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
除了使用闭包函数定义任务外,还可以使用实现了 \Hyperf\Dag\Runner
接口的类来定义,并通过 Vertex::of
将其转化为一个顶点。
class MyJob implements \Hyperf\Dag\Runner { public function run($results = []) { return 'hello'; } } \Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
\Hyperf\Dag\Dag
本身也实现了 \Hyperf\Dag\Runner
接口,所以可以嵌套使用。
<?php // 命名空间已省略 $a = Vertex::make(function () { return 1;}); $b = Vertex::make(function () { return 2;}); $c = Vertex::make(function () { return 3;}); $nestedDag = new Dag(); $nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b); $d = Vertex::of($nestedDag); $superDag = new Dag(); $superDag->addVertex($c)->addVertex($d)->addEdge($c, $d); $superDag->run();
控制并发数
\Hyperf\Dag\Dag
类提供了 setConcurrency(int n)
方法控制最大并发数。默认为10。