流水线编排

使用 DataMate 进行可视化流程编排

流水线编排模块提供拖拽式可视化界面,帮助您设计和管理复杂的数据处理流程。

功能概述

流水线编排模块提供:

  • 可视化设计器:拖拽式流程设计
  • 丰富的节点类型:数据处理、条件判断、循环等
  • 流程执行:自动执行和监控流程
  • 模板管理:保存和复用流程模板
  • 版本管理:流程版本控制

节点类型

数据节点

节点功能配置
输入数据集从数据集读取数据选择数据集
输出数据集写入数据到数据集选择数据集
数据采集执行数据采集任务选择采集任务
数据清洗执行数据清洗任务选择清洗任务
数据标注执行数据标注任务选择标注任务
数据合成执行数据合成任务选择合成任务
数据评估执行数据评估任务选择评估任务

逻辑节点

节点功能配置
条件分支根据条件执行不同分支条件表达式
循环重复执行节点循环次数/条件
并行并行执行多个分支分支数量
等待等待指定时间等待时长
跳转跳转到指定节点目标节点

算子节点

节点功能配置
清洗算子执行单个清洗算子算子类型、参数
合成算子执行数据合成指令模板、参数
自定义算子执行自定义算子算子文件、参数

集成节点

节点功能配置
HTTP 请求发送 HTTP 请求URL、方法、参数
API 调用调用 DataMate APIAPI 端点、参数
消息队列发送/接收消息队列配置
Webhook触发 WebhookWebhook URL

快速开始

1. 创建流水线

步骤 1:进入流水线编排页面

在左侧导航栏选择 流水线编排

步骤 2:创建流水线

点击右上角 创建流水线 按钮。

步骤 3:填写基本信息

  • 流水线名称:例如 data_processing_pipeline
  • 流水线描述:描述流水线用途(可选)
  • 标签:添加标签(可选)

步骤 4:进入流程设计器

点击 创建 后,自动进入流程设计器。

2. 设计流程

步骤 1:添加节点

  1. 从左侧节点库拖拽节点到画布
  2. 或双击节点库中的节点

示例流程:

输入数据集 → 数据清洗 → 条件分支
                              ├── 满足条件 → 数据标注 → 输出数据集
                              └── 不满足条件 → 数据合成 → 输出数据集

步骤 2:配置节点

点击节点,在右侧配置面板填写参数:

输入数据集节点

  • 选择数据集:raw_data
  • 数据过滤:(可选)

数据清洗节点

  • 清洗任务:选择已创建的清洗任务
  • 失败策略:跳过 / 终止

条件分支节点

  • 条件表达式:{quality_score} > 0.8

步骤 3:连接节点

  1. 从源节点的输出端口拖拽到目标节点的输入端口
  2. 或选择两个节点后点击连接按钮

步骤 4:保存流程

点击右上角 保存 按钮保存流程。

3. 执行流水线

步骤 1:进入执行页面

在流水线列表页,点击流水线名称进入详情。

步骤 2:执行流水线

点击 立即执行 按钮。

步骤 3:监控执行

在执行监控页面可以看到:

  • 执行状态:运行中 / 已完成 / 失败
  • 执行进度:当前节点、总进度
  • 节点状态:每个节点的执行状态
  • 执行日志:详细的执行日志

示例执行状态:

[✓] 输入数据集 - 已完成 (1000 条记录)
[✓] 数据清洗 - 已完成 (950 条记录)
[→] 条件分支 - 运行中
  [→] 分支 1 (满足条件) - 等待中
  [→] 分支 2 (不满足条件) - 运行中
    [✓] 数据合成 - 已完成 (500 条记录)
    [→] 输出数据集 - 运行中

4. 管理流水线

查看流水线列表

在流水线编排页面,可以看到所有流水线:

流水线名称状态最后执行时间执行次数操作
数据处理流程已启用2024-01-1515执行 编辑
模型训练流程已禁用2024-01-148执行 编辑

编辑流水线

点击 编辑 按钮可以修改流程设计。

启用/禁用流水线

点击 启用/禁用 按钮切换流水线状态。

删除流水线

点击 删除 按钮删除流水线(需确认)。

高级功能

流程模板

保存为模板

  1. 设计好流程后
  2. 点击 保存为模板
  3. 输入模板名称和描述
  4. 点击保存

使用模板

  1. 创建流水线时,点击 使用模板
  2. 从模板列表选择模板
  3. 系统加载模板到设计器

参数化流程

流程中可以使用参数,实现动态配置:

定义参数

在流水线配置中定义参数:

{
  "parameters": [
    {
      "name": "input_dataset",
      "type": "dataset",
      "required": true,
      "description": "输入数据集"
    },
    {
      "name": "threshold",
      "type": "number",
      "default": 0.8,
      "description": "质量阈值"
    }
  ]
}

使用参数

在节点配置中使用参数:

条件表达式:{quality_score} > ${threshold}

定时执行

配置定时执行流水线:

  1. 在流水线详情页,点击 定时执行
  2. 添加定时规则:
    • Cron 表达式:0 0 2 * * ?(每天凌晨 2 点)
    • 执行参数:填写参数值
  3. 启用定时任务

事件触发

配置事件触发:

  1. 在流水线详情页,点击 事件触发
  2. 添加触发事件:
    • 数据上传完成
    • 任务执行完成
    • Webhook 触发
  3. 配置触发条件

错误处理

节点级错误处理

为每个节点配置失败策略:

  • 跳过:跳过失败节点,继续执行
  • 重试:自动重试(可配置重试次数)
  • 终止:终止整个流程

流程级错误处理

配置全局错误处理:

  • 失败通知:发送通知(邮件、Webhook)
  • 回滚操作:执行回滚流程
  • 记录日志:详细记录错误信息

版本管理

创建版本

在流水线编辑页,点击 保存版本

  • 输入版本号:v1.0.0
  • 输入版本说明
  • 点击保存

查看版本历史

在流水线详情页,点击 版本历史

版本创建时间创建者说明操作
v1.0.02024-01-15admin初始版本查看 恢复
v1.1.02024-01-16admin添加数据清洗查看 恢复

恢复版本

点击版本的 恢复 按钮,将流程恢复到该版本。

最佳实践

1. 流程设计

推荐的设计原则:

  • 模块化:将复杂流程拆分为多个子流程
  • 可重用:使用模板和参数化
  • 可维护:添加注释和说明
  • 可测试:先测试单个节点再组合

2. 性能优化

优化流程性能:

  • 并行执行:使用并行节点提高效率
  • 减少数据传输:尽量在数据所在位置处理
  • 批量处理:使用批量操作代替循环
  • 缓存中间结果:避免重复计算

3. 错误处理

完善的错误处理:

  • 设置超时:为每个节点设置合理的超时时间
  • 配置重试:对可能失败的操作配置重试
  • 记录日志:详细记录执行日志
  • 通知机制:关键节点失败时发送通知

4. 监控和调试

有效的监控调试:

  • 实时监控:监控流程执行状态
  • 查看日志:查看详细的执行日志
  • 断点调试:在关键节点设置断点
  • 数据预览:预览中间结果

常见问题

Q: 流程执行失败怎么办?

A: 排查步骤:

  1. 查看执行日志:获取详细错误信息
  2. 检查节点配置:确认节点参数正确
  3. 检查数据:确认输入数据格式正确
  4. 单独测试:单独执行问题节点
  5. 检查依赖:确认依赖服务正常

Q: 如何优化流程性能?

A: 优化建议:

  1. 并行化:使用并行节点并行执行
  2. 批量操作:合并多个小操作
  3. 减少数据传输:尽量在本地处理
  4. 缓存结果:缓存中间结果避免重复计算
  5. 优化节点顺序:先过滤数据再处理

Q: 如何处理复杂的条件逻辑?

A: 处理复杂逻辑:

  1. 使用多个条件节点:拆分复杂条件
  2. 使用表达式:编写复杂的条件表达式
  3. 自定义脚本:使用自定义算子处理复杂逻辑
  4. 子流程:将复杂逻辑封装为子流程

Q: 流程可以嵌套吗?

A: 支持流程嵌套:

  1. 子流程节点:调用其他流程作为子流程
  2. 参数传递:在父子流程间传递参数
  3. 返回值:子流程可以返回结果给父流程

Q: 如何调试流程?

A: 调试方法:

  1. 单步执行:逐步执行流程观察中间结果
  2. 数据预览:预览每个节点的输入输出
  3. 日志查看:查看详细的执行日志
  4. 断点设置:在关键节点设置断点
  5. 模拟执行:使用测试数据模拟执行

相关文档


最后修改 February 6, 2026: :tada: add full featured docs (bf83ee1)