rabbit/data-pipeline

1.0.0 2020-06-19 07:02 UTC

This package is auto-updated.

Last update: 2023-01-15 17:27:19 UTC


README

  • 基于插件的流程处理组件

组件配置

return [
        'scheduler' => [
        '{}' => Scheduler::class,
        '()' => [
            [
                '{}' => YamlParser::class,
                '()' => [
                    App::getAlias('@yaml')
                ]
            ]
        ],
        'senders' => arrdef([
            'worker' => definition(WorkerSender::class),
            'process' => definition(ProcessSender::class)
        ]),
        'cron' => definition('cron')
    ]
];

任务配置

lock: task lock time,default null
cron: crontab or int,default null.int is later sec, -1 is only run once

通用配置

type: Plugin Class String
start: true|false
output: Plugin Name
errHandler: callable
  • type插件完整类名
  • start是否开始插件,默认false
  • output输出到下一个插件,支持Http输出到远程插件,worker多进程任务,格式
    1. PluginName String|(http|worker):address:PluginName String,默认不等待
    2. ['PluginName String|(http|worker):address:PluginName String':true|false|int]
    • PS:true启用协程等待,不设超时;int启用协程等待,值为超时时间;false不等待
  • errHandler错误处理函数

内置插件

Sources

Common

TransForms

Sinks

插件开发

  • 继承AbstractPlugin
  • 实现run函数
  • 通过init$this->config中获取配置参数初始化配置属性
  • 插件间传值通过Message类,开启协程传递需要clone Message,输出值赋值到克隆的Message->data属性
  • PS:配置属性禁止在运行过程中修改,插件为单例模式,协程环境下会造成属性污染。可使用局部变量替代,例如$name=$this->name,使用$name