算子市场

使用和管理 DataMate 算子

算子市场提供丰富的数据处理算子,支持自定义算子开发。

功能概述

算子市场模块提供:

  • 内置算子:丰富的内置数据处理算子
  • 算子发布:发布和分享自定义算子
  • 算子安装:安装第三方算子
  • 自定义开发:开发自定义算子

内置算子

数据清洗算子

算子功能输入输出
去重算子删除重复数据数据集去重数据
空值处理处理空值数据集填充数据
格式转换转换数据格式原格式新格式
类型转换转换数据类型原类型新类型

文本处理算子

算子功能
文本分词中文分词
去除停用词删除常见停用词
文本清洗清洗特殊字符
情感分析分析文本情感

图像处理算子

算子功能
格式转换转换图像格式
尺寸调整调整图像大小
质量检测检测图像质量
特征提取提取图像特征

数据增强算子

算子功能
文本增强同义词替换、回译
图像增强旋转、裁剪、颜色调整
数据合成基于模型的数据合成

快速开始

1. 浏览算子

步骤 1:进入算子市场

在左侧导航栏选择 算子市场

步骤 2:浏览算子

在算子市场页面,可以看到所有可用算子:

算子名称类别功能描述评分安装数操作
文本清洗算子文本处理清洗文本特殊字符⭐⭐⭐⭐⭐1.2K查看 安装
图像去重算子图像处理基于内容去重⭐⭐⭐⭐856查看 安装
数据合成算子数据增强使用 LLM 合成数据⭐⭐⭐⭐⭐2.3K查看 安装

步骤 3:查看算子详情

点击算子名称或 查看 按钮,可以看到:

  • 算子简介:功能和用途
  • 使用示例:如何使用算子
  • 输入输出:输入输出格式
  • 参数说明:可配置参数
  • 用户评价:其他用户的评价

2. 安装算子

安装内置算子

内置算子默认已安装,可以直接使用。

安装第三方算子

  1. 在算子详情页,点击 安装 按钮
  2. 等待安装完成
  3. 安装后可以在数据清洗等模块中使用

3. 使用算子

安装算子后,可以在以下场景使用:

数据清洗

  1. 创建数据清洗任务
  2. 在清洗流程中添加算子节点
  3. 配置算子参数
  4. 执行清洗任务

流水线编排

  1. 创建流水线
  2. 添加算子节点
  3. 配置算子参数
  4. 执行流水线

4. 开发自定义算子

创建算子

  1. 在算子市场页,点击 创建算子 按钮
  2. 填写算子信息:
    • 算子名称:例如 my_text_cleaner
    • 算子类别:选择类别
    • 功能描述:描述算子功能
    • 版本号:例如 1.0.0

编写算子代码

Python 算子示例

from typing import Dict, Any

class MyTextCleaner:
    """自定义文本清洗算子"""

    def __init__(self, config: Dict[str, Any]):
        """
        初始化算子

        Args:
            config: 算子配置
                - remove_special_chars: 是否去除特殊字符
                - to_lowercase: 是否转为小写
        """
        self.remove_special_chars = config.get('remove_special_chars', True)
        self.to_lowercase = config.get('to_lowercase', True)

    def process(self, data: Any) -> Any:
        """
        处理数据

        Args:
            data: 输入数据

        Returns:
            处理后的数据
        """
        if isinstance(data, str):
            result = data
            if self.to_lowercase:
                result = result.lower()
            if self.remove_special_chars:
                # 去除特殊字符
                import re
                result = re.sub(r'[^\w\s]', '', result)
            return result
        return data

    def get_config_schema(self) -> Dict[str, Any]:
        """
        获取配置模式

        Returns:
            配置模式定义
        """
        return {
            "type": "object",
            "properties": {
                "remove_special_chars": {
                    "type": "boolean",
                    "title": "去除特殊字符",
                    "default": True
                },
                "to_lowercase": {
                    "type": "boolean",
                    "title": "转为小写",
                    "default": True
                }
            }
        }

打包算子

  1. 创建算子目录结构:
my_text_cleaner/
├── __init__.py
├── operator.py       # 算子代码
├── config.json       # 配置文件
├── README.md         # 说明文档
└── requirements.txt  # 依赖列表
  1. 打包为 ZIP 文件

发布算子

  1. 在创建算子页,上传算子包
  2. 填写算子信息
  3. 点击 发布 按钮

发布选项

  • 私有算子:仅自己可见
  • 公开算子:所有人可见
  • 团队共享:团队成员可见

高级功能

算子版本管理

创建新版本

  1. 进入算子详情页
  2. 点击 版本 标签
  3. 点击 创建版本
  4. 上传新版本代码
  5. 填写版本说明

版本回退

如果新版本有问题,可以回退到旧版本。

算子测试

单元测试

为算子编写单元测试:

import unittest

class TestMyTextCleaner(unittest.TestCase):
    def setUp(self):
        config = {
            'remove_special_chars': True,
            'to_lowercase': True
        }
        self.cleaner = MyTextCleaner(config)

    def test_lowercase(self):
        input_text = "Hello World"
        output = self.cleaner.process(input_text)
        self.assertEqual(output, "hello world")

    def test_remove_special_chars(self):
        input_text = "Hello!!! World..."
        output = self.cleaner.process(input_text)
        self.assertEqual(output, "hello world")

在线测试

在算详情页,点击 测试 按钮:

  1. 输入测试数据
  2. 配置算子参数
  3. 执行测试
  4. 查看输出结果

算子权限管理

权限级别

级别说明
私有仅创建者可见和使用
团队团队成员可见和使用
公开所有用户可见和使用

权限设置

  1. 进入算子详情页
  2. 点击 设置 标签
  3. 修改权限级别
  4. 保存设置

最佳实践

1. 算子设计

好的算子设计:

  • 单一职责:每个算子只做一件事
  • 可配置:提供丰富的配置选项
  • 错误处理:完善的错误处理机制
  • 性能优化:考虑大数据量场景

2. 文档编写

完善的文档应该包含:

  • 功能说明:清晰描述算子功能
  • 使用示例:提供使用示例
  • 参数说明:详细说明每个参数
  • 注意事项:说明使用注意事项

3. 测试覆盖

充分的测试:

  • 单元测试:测试核心功能
  • 边界测试:测试边界情况
  • 性能测试:测试大数据量场景

常见问题

Q: 算子执行失败怎么办?

A: 排查步骤:

  1. 查看日志:查看详细错误信息
  2. 检查配置:确认参数配置正确
  3. 检查数据:确认输入数据格式正确
  4. 本地测试:在本地环境测试算子

Q: 如何优化算子性能?

A: 优化建议:

  1. 减少内存使用:使用生成器、迭代器
  2. 并行处理:使用多进程/多线程
  3. 批量处理:批量处理数据
  4. 缓存结果:缓存重复计算

Q: 如何分享算子?

A: 分享方式:

  1. 公开发布:发布为公开算子
  2. 团队共享:团队内共享
  3. 导出分享:导出算子包文件分享

相关文档