Skip to content

DataProcessCore 使用指南

📋 概述

DataProcessCore 是一个统一的文件处理核心类,支持多种文件格式的自动检测和处理,提供灵活的分块策略和多种输入源支持。

⭐ 主要功能

1. 核心处理方法:file_process()

函数签名:

python
def file_process(self, 
                file_path_or_url: Optional[str] = None, 
                file_data: Optional[bytes] = None, 
                chunking_strategy: str = "basic", 
                destination: str = "local", 
                filename: Optional[str] = None, 
                **params) -> List[Dict]

参数说明:

参数名类型必需描述可选值
file_path_or_urlstr否*本地文件路径或远程URL任何有效的文件路径或URL
file_databytes否*文件的字节数据(用于内存处理)任何有效的字节数据
chunking_strategystr分块策略"basic", "by_title", "none"
destinationstr目标类型,指示文件来源"local", "minio", "url"
filenamestr否**文件名任何有效的文件名
**paramsdict额外的处理参数见下方参数详情

*注:file_path_or_urlfile_data 必须提供其中一个 **注:使用 file_data 时,filename 为必需参数

分块策略 (chunking_strategy) 详解:

策略值描述适用场景输出特点
"basic"基础分块策略大多数文档处理场景根据内容长度自动分块
"by_title"按标题分块结构化文档(如技术文档、报告)以标题为界限进行分块
"none"不分块短文档或需要完整内容的场景返回单个包含全部内容的块

目标类型 (destination) 详解:

目标值描述使用场景要求
"local"本地文件处理本地存储的文件提供有效的本地文件路径
"minio"MinIO存储处理云存储中的文件需要数据库依赖
"url"远程URL处理网络资源需要数据库依赖

额外参数 (**params) 详解:

参数名类型默认值描述适用处理器
max_charactersint1500每个块的最大字符数Generic
new_after_n_charsint1200达到此字符数后开始新块Generic
strategystr"fast"处理策略Generic
skip_infer_table_typeslist[]跳过推断的表格类型Generic
task_idstr""任务标识符Generic

返回值格式:

返回 List[Dict],每个字典包含以下字段:

通用字段:

字段名类型描述示例
contentstr文本内容"这是文档的第一段..."
path_or_urlstr文件路径或URL"/path/to/file.pdf"
filenamestr文件名"document.pdf"

Excel文件额外字段:

字段名类型描述示例
metadatadict元数据信息{"chunk_index": 0, "file_type": "xlsx"}

Generic文件额外字段:

字段名类型描述示例
languagestr语言标识(可选)"en"
metadatadict元数据信息(可选){"chunk_index": 0}

📁 支持的文件类型

Excel文件

  • .xlsx - Excel 2007及更高版本
  • .xls - Excel 97-2003版本

通用文件

  • .txt - 纯文本文件
  • .pdf - PDF文档
  • .docx - Word 2007及更高版本
  • .doc - Word 97-2003版本
  • .html, .htm - HTML文档
  • .md - Markdown文件
  • .rtf - 富文本格式
  • .odt - OpenDocument文本
  • .pptx - PowerPoint 2007及更高版本
  • .ppt - PowerPoint 97-2003版本

💡 使用示例

示例1:处理本地文本文件

python
from nexent.data_process import DataProcessCore

core = DataProcessCore()

# 基础处理
result = core.file_process(
    file_path_or_url="/path/to/document.txt",
    destination="local",
    chunking_strategy="basic"
)

print(f"处理得到 {len(result)} 个块")
for i, chunk in enumerate(result):
    print(f"块 {i}: {chunk['content'][:100]}...")

示例2:处理Excel文件

python
# 处理Excel文件
result = core.file_process(
    file_path_or_url="/path/to/spreadsheet.xlsx",
    destination="local",
    chunking_strategy="none"  # Excel通常不需要分块
)

for chunk in result:
    print(f"文件: {chunk['filename']}")
    print(f"内容: {chunk['content']}")
    print(f"元数据: {chunk['metadata']}")

示例3:处理内存中的文件

python
# 读取文件到内存
with open("/path/to/document.pdf", "rb") as f:
    file_bytes = f.read()

# 处理内存中的文件
result = core.file_process(
    file_data=file_bytes,
    filename="document.pdf",
    chunking_strategy="by_title",
    max_characters=2000  # 自定义参数
)

示例4:处理远程文件(需要数据库依赖)

python
# 处理MinIO中的文件
result = core.file_process(
    file_path_or_url="minio://bucket/path/to/file.docx",
    destination="minio",
    filename="file.docx",
    chunking_strategy="basic"
)

🛠️ 辅助方法

1. 获取支持的文件类型

python
core = DataProcessCore()
supported_types = core.get_supported_file_types()
print("Excel文件:", supported_types["excel"])
print("通用文件:", supported_types["generic"])

2. 验证文件类型

python
is_supported = core.validate_file_type("document.pdf")
print(f"PDF文件是否支持: {is_supported}")

3. 获取处理器信息

python
info = core.get_processor_info("spreadsheet.xlsx")
print(f"处理器类型: {info['processor_type']}")
print(f"文件扩展名: {info['file_extension']}")
print(f"是否支持: {info['is_supported']}")

4. 获取支持的策略和目标类型

python
strategies = core.get_supported_strategies()
destinations = core.get_supported_destinations()
print(f"支持的分块策略: {strategies}")
print(f"支持的目标类型: {destinations}")

⚠️ 错误处理

常见异常

异常类型触发条件解决方案
ValueError参数无效(如同时提供file_path_or_url和file_data)检查参数组合
FileNotFoundError本地文件不存在或远程文件无法获取验证文件路径
ImportError处理远程文件时缺少数据库依赖安装相关依赖

错误处理示例

python
try:
    result = core.file_process(
        file_path_or_url="/nonexistent/file.txt",
        destination="local"
    )
except FileNotFoundError as e:
    print(f"文件未找到: {e}")
except ValueError as e:
    print(f"参数错误: {e}")
except Exception as e:
    print(f"处理失败: {e}")

🚀 性能优化建议

  1. 选择合适的分块策略

    • 小文件使用 "none"
    • 大文件使用 "basic"
    • 结构化文档使用 "by_title"
  2. 调整分块参数

    • 根据下游处理需求调整 max_characters
    • 平衡处理速度和内存使用
  3. 文件类型优化

    • Excel文件通常不需要分块
    • PDF文件建议使用较大的 max_characters
  4. 批量处理

    • 复用 DataProcessCore 实例
    • 避免重复初始化

🔄 数据流架构

Nexent 系统中的数据处理遵循以下流程模式:

1. 用户请求流程

用户输入 → 前端验证 → API调用 → 后端路由 → 业务服务 → 数据访问 → 数据库

2. AI Agent执行流程

用户消息 → Agent创建 → 工具调用 → 模型推理 → 流式响应 → 结果保存

3. 知识库文件处理流程

文件上传 → 临时存储 → 数据处理 → 向量化 → 知识库存储 → 索引更新

详细步骤

  1. 文件上传: 前端接收用户上传的文件
  2. 临时存储: 文件存储到临时位置或MinIO
  3. 数据处理: 使用 DataProcessCore 进行格式转换和分块
  4. 向量化: 通过嵌入模型生成向量表示
  5. 知识库存储: 将处理后的内容存储到Elasticsearch
  6. 索引更新: 更新搜索索引以支持检索

4. 实时文件处理流程

文件上传 → 临时存储 → 数据处理 → Agent处理 → 实时回答

详细步骤

  1. 文件上传: 用户在对话中上传文件
  2. 临时存储: 文件临时保存用于处理
  3. 数据处理: 实时提取文件内容和结构
  4. Agent处理: AI智能体分析文件内容
  5. 实时回答: 基于文件内容提供即时回复

数据处理优化策略

  • 异步处理: 大文件处理使用异步任务队列
  • 批量操作: 多文件处理时使用批量优化
  • 缓存机制: 重复文件的处理结果缓存
  • 流式处理: 大文件的内存流式处理