流水线编排
使用 DataMate 进行可视化流程编排
流水线编排模块提供拖拽式可视化界面,帮助您设计和管理复杂的数据处理流程。
功能概述
流水线编排模块提供:
- 可视化设计器:拖拽式流程设计
- 丰富的节点类型:数据处理、条件判断、循环等
- 流程执行:自动执行和监控流程
- 模板管理:保存和复用流程模板
- 版本管理:流程版本控制
节点类型
数据节点
| 节点 | 功能 | 配置 |
|---|---|---|
| 输入数据集 | 从数据集读取数据 | 选择数据集 |
| 输出数据集 | 写入数据到数据集 | 选择数据集 |
| 数据采集 | 执行数据采集任务 | 选择采集任务 |
| 数据清洗 | 执行数据清洗任务 | 选择清洗任务 |
| 数据标注 | 执行数据标注任务 | 选择标注任务 |
| 数据合成 | 执行数据合成任务 | 选择合成任务 |
| 数据评估 | 执行数据评估任务 | 选择评估任务 |
逻辑节点
| 节点 | 功能 | 配置 |
|---|---|---|
| 条件分支 | 根据条件执行不同分支 | 条件表达式 |
| 循环 | 重复执行节点 | 循环次数/条件 |
| 并行 | 并行执行多个分支 | 分支数量 |
| 等待 | 等待指定时间 | 等待时长 |
| 跳转 | 跳转到指定节点 | 目标节点 |
算子节点
| 节点 | 功能 | 配置 |
|---|---|---|
| 清洗算子 | 执行单个清洗算子 | 算子类型、参数 |
| 合成算子 | 执行数据合成 | 指令模板、参数 |
| 自定义算子 | 执行自定义算子 | 算子文件、参数 |
集成节点
| 节点 | 功能 | 配置 |
|---|---|---|
| HTTP 请求 | 发送 HTTP 请求 | URL、方法、参数 |
| API 调用 | 调用 DataMate API | API 端点、参数 |
| 消息队列 | 发送/接收消息 | 队列配置 |
| Webhook | 触发 Webhook | Webhook URL |
快速开始
1. 创建流水线
步骤 1:进入流水线编排页面
在左侧导航栏选择 流水线编排。
步骤 2:创建流水线
点击右上角 创建流水线 按钮。
步骤 3:填写基本信息
- 流水线名称:例如
data_processing_pipeline - 流水线描述:描述流水线用途(可选)
- 标签:添加标签(可选)
步骤 4:进入流程设计器
点击 创建 后,自动进入流程设计器。
2. 设计流程
步骤 1:添加节点
- 从左侧节点库拖拽节点到画布
- 或双击节点库中的节点
示例流程:
输入数据集 → 数据清洗 → 条件分支
├── 满足条件 → 数据标注 → 输出数据集
└── 不满足条件 → 数据合成 → 输出数据集
步骤 2:配置节点
点击节点,在右侧配置面板填写参数:
输入数据集节点:
- 选择数据集:
raw_data - 数据过滤:(可选)
数据清洗节点:
- 清洗任务:选择已创建的清洗任务
- 失败策略:跳过 / 终止
条件分支节点:
- 条件表达式:
{quality_score} > 0.8
步骤 3:连接节点
- 从源节点的输出端口拖拽到目标节点的输入端口
- 或选择两个节点后点击连接按钮
步骤 4:保存流程
点击右上角 保存 按钮保存流程。
3. 执行流水线
步骤 1:进入执行页面
在流水线列表页,点击流水线名称进入详情。
步骤 2:执行流水线
点击 立即执行 按钮。
步骤 3:监控执行
在执行监控页面可以看到:
- 执行状态:运行中 / 已完成 / 失败
- 执行进度:当前节点、总进度
- 节点状态:每个节点的执行状态
- 执行日志:详细的执行日志
示例执行状态:
[✓] 输入数据集 - 已完成 (1000 条记录)
[✓] 数据清洗 - 已完成 (950 条记录)
[→] 条件分支 - 运行中
[→] 分支 1 (满足条件) - 等待中
[→] 分支 2 (不满足条件) - 运行中
[✓] 数据合成 - 已完成 (500 条记录)
[→] 输出数据集 - 运行中
4. 管理流水线
查看流水线列表
在流水线编排页面,可以看到所有流水线:
| 流水线名称 | 状态 | 最后执行时间 | 执行次数 | 操作 |
|---|---|---|---|---|
| 数据处理流程 | 已启用 | 2024-01-15 | 15 | 执行 编辑 |
| 模型训练流程 | 已禁用 | 2024-01-14 | 8 | 执行 编辑 |
编辑流水线
点击 编辑 按钮可以修改流程设计。
启用/禁用流水线
点击 启用/禁用 按钮切换流水线状态。
删除流水线
点击 删除 按钮删除流水线(需确认)。
高级功能
流程模板
保存为模板
- 设计好流程后
- 点击 保存为模板
- 输入模板名称和描述
- 点击保存
使用模板
- 创建流水线时,点击 使用模板
- 从模板列表选择模板
- 系统加载模板到设计器
参数化流程
流程中可以使用参数,实现动态配置:
定义参数
在流水线配置中定义参数:
{
"parameters": [
{
"name": "input_dataset",
"type": "dataset",
"required": true,
"description": "输入数据集"
},
{
"name": "threshold",
"type": "number",
"default": 0.8,
"description": "质量阈值"
}
]
}
使用参数
在节点配置中使用参数:
条件表达式:{quality_score} > ${threshold}
定时执行
配置定时执行流水线:
- 在流水线详情页,点击 定时执行
- 添加定时规则:
- Cron 表达式:
0 0 2 * * ?(每天凌晨 2 点) - 执行参数:填写参数值
- Cron 表达式:
- 启用定时任务
事件触发
配置事件触发:
- 在流水线详情页,点击 事件触发
- 添加触发事件:
- 数据上传完成
- 任务执行完成
- Webhook 触发
- 配置触发条件
错误处理
节点级错误处理
为每个节点配置失败策略:
- 跳过:跳过失败节点,继续执行
- 重试:自动重试(可配置重试次数)
- 终止:终止整个流程
流程级错误处理
配置全局错误处理:
- 失败通知:发送通知(邮件、Webhook)
- 回滚操作:执行回滚流程
- 记录日志:详细记录错误信息
版本管理
创建版本
在流水线编辑页,点击 保存版本:
- 输入版本号:v1.0.0
- 输入版本说明
- 点击保存
查看版本历史
在流水线详情页,点击 版本历史:
| 版本 | 创建时间 | 创建者 | 说明 | 操作 |
|---|---|---|---|---|
| v1.0.0 | 2024-01-15 | admin | 初始版本 | 查看 恢复 |
| v1.1.0 | 2024-01-16 | admin | 添加数据清洗 | 查看 恢复 |
恢复版本
点击版本的 恢复 按钮,将流程恢复到该版本。
最佳实践
1. 流程设计
推荐的设计原则:
- 模块化:将复杂流程拆分为多个子流程
- 可重用:使用模板和参数化
- 可维护:添加注释和说明
- 可测试:先测试单个节点再组合
2. 性能优化
优化流程性能:
- 并行执行:使用并行节点提高效率
- 减少数据传输:尽量在数据所在位置处理
- 批量处理:使用批量操作代替循环
- 缓存中间结果:避免重复计算
3. 错误处理
完善的错误处理:
- 设置超时:为每个节点设置合理的超时时间
- 配置重试:对可能失败的操作配置重试
- 记录日志:详细记录执行日志
- 通知机制:关键节点失败时发送通知
4. 监控和调试
有效的监控调试:
- 实时监控:监控流程执行状态
- 查看日志:查看详细的执行日志
- 断点调试:在关键节点设置断点
- 数据预览:预览中间结果
常见问题
Q: 流程执行失败怎么办?
A: 排查步骤:
- 查看执行日志:获取详细错误信息
- 检查节点配置:确认节点参数正确
- 检查数据:确认输入数据格式正确
- 单独测试:单独执行问题节点
- 检查依赖:确认依赖服务正常
Q: 如何优化流程性能?
A: 优化建议:
- 并行化:使用并行节点并行执行
- 批量操作:合并多个小操作
- 减少数据传输:尽量在本地处理
- 缓存结果:缓存中间结果避免重复计算
- 优化节点顺序:先过滤数据再处理
Q: 如何处理复杂的条件逻辑?
A: 处理复杂逻辑:
- 使用多个条件节点:拆分复杂条件
- 使用表达式:编写复杂的条件表达式
- 自定义脚本:使用自定义算子处理复杂逻辑
- 子流程:将复杂逻辑封装为子流程
Q: 流程可以嵌套吗?
A: 支持流程嵌套:
- 子流程节点:调用其他流程作为子流程
- 参数传递:在父子流程间传递参数
- 返回值:子流程可以返回结果给父流程
Q: 如何调试流程?
A: 调试方法:
- 单步执行:逐步执行流程观察中间结果
- 数据预览:预览每个节点的输入输出
- 日志查看:查看详细的执行日志
- 断点设置:在关键节点设置断点
- 模拟执行:使用测试数据模拟执行
相关文档
意见反馈
这个页面对您有帮助吗?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.