掘金 人工智能 前天 08:10
使用TypeScript和AI SDK构建文档处理工作流
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

该项目利用TypeScript、NestJS框架以及AI SDK构建了一个强大的文档处理工作流。核心功能包括PDF文本提取、基于AI的文本清洗与段落拆分,以及最终将处理结果导出为结构化Markdown和TXT文件。通过定义清晰的提示词和模块化的服务,该系统能够高效处理条例类文档,确保输出内容的准确性和规范性,为后续的AI应用打下坚实基础。

🛠️ **项目架构与技术栈**:项目采用TypeScript、NestJS框架及AI SDK(如@ai-sdk/openai)构建,并依赖`pdf-parse`进行PDF内容提取。`package.json`和`tsconfig.json`文件详细定义了项目依赖和编译配置,确保了开发环境的标准化和高效性。

🧠 **AI驱动的文本处理**:核心在于`QwenService`,它通过精心设计的Prompt,利用阿里云的Qwen大模型对文本进行数据清洗(过滤页码、页眉页脚、乱码等)和子段拆分。该服务严格遵循输出JSON格式,支持章节与条目的合并,确保了文本数据的纯净与结构化。

📜 **条例类文档的精细化分块**:`ChunkService`专注于处理如法律法规等条例类文档。它通过`hierarchicalMerge`算法,识别文档的层级结构(章、节、条),将每一条法律条文独立切分,并携带章节信息。对于无明显层级结构的文档,则采用`simpleChunk`策略进行段落合并。

📄 **文件生成与管理**:`FileService`负责将AI处理后的JSON数据转化为易于阅读的Markdown文件(`partX.md`),并提供`integrateMdFiles`方法将所有Markdown文件合并为一个完整的TXT文件,使用`####`和`@@@`作为分隔符,便于后续分析和使用。同时,也提供了临时文件的清理功能。

首先,本次项目采用typescript+ai-sdk+nestjs框架搭建,我们给出项目的总体结构,大致如下图所示:

其中,首先创建package.json文件,文件内容为:

  "name": "workflow-chunking",  "version": "1.0.0",  "description": "Document chunking and processing workflow based on Dify iteration mechanism",  "scripts": {    "start": "ts-node src/main.ts",    "build": "tsc",    "test": "ts-node test.ts"  },  "dependencies": {    "@nestjs/common": "^10.3.0",    "@nestjs/core": "^10.3.0",    "@nestjs/platform-express": "^10.3.0",    "@nestjs/config": "^3.1.1",    "reflect-metadata": "^0.2.1",    "rxjs": "^7.8.1",    "ai": "^3.0.0",    "@ai-sdk/openai": "^0.0.66",    "pdf-parse": "^1.1.1",    "p-limit": "^5.0.0",    "axios": "^1.6.0"  },  "devDependencies": {    "@types/node": "^20.11.0",    "typescript": "^5.3.3",    "ts-node": "^10.9.2"  }}

对应的tsconfig.json文件如下所示:

  "compilerOptions": {    "module": "commonjs",    "target": "ES2021",    "lib": ["ES2021"],    "declaration": true,    "outDir": "./dist",    "rootDir": "./src",    "strict": true,    "esModuleInterop": true,    "skipLibCheck": true,    "forceConsistentCasingInFileNames": true,    "experimentalDecorators": true,    "emitDecoratorMetadata": true,    "resolveJsonModule": true  },  "include": ["src/**/*"],  "exclude": ["node_modules", "dist"]}

之后我们创建一个.env文件,存放大模型的api-key,本次数据清洗使用的大模型是qwen-max,因此设置对应的api,如下图所示:

之后我们输入:

pnpm install

下载json文件中所对应的包,之后构造我们的项目主要文件,首先是大模型服务qwen.service.ts文件,主要内容为给大模型的提示词以及模型相关配置,代码如下:

import { generateText } from 'ai';import { createOpenAI } from '@ai-sdk/openai';/** * 千问大模型服务 - 使用ai-sdk调用千问API */@Injectable()export class QwenService {  private readonly qwen;  private readonly prompt: string;  constructor() {    // 配置千问大模型    this.qwen = createOpenAI({      apiKey: process.env.DASHSCOPE_API_KEY || '',      baseURL: 'https://dashscope.aliyuncs.com/compatible-mode/v1',    });    // 从prompt.md加载的提示词    this.prompt = `你是一位专业的数据清洗与段落拆分工程师。任务:处理单条文本内容并生成JSON格式输出(只包含一个segment_01),将文本按“句子/子句”进行子段拆分,粒度适中(避免拆到词语级)。仅输出 text 与 tags,tags 内放子段字符串;若同时存在“章标题”和“条标题”,须将二者合并为一个首个子段(如“第二章保险合同 第十条”)。<文本内容>:{{context}}处理规则:一、数据清洗(重要)1. 过滤噪声内容:   - 页码标记:如"—1—"、"—2—"、"第1页"等   - 页眉页脚:重复出现的文档标题、作者信息等   - 空白行和无效数据   - 乱码字符   - 版权声明、广告信息2. 提取纯净的正文内容3. 保持有效文本的原文不变(不改写、不增删)二、子段拆分(替代关键词提取)1. 拆分单位:句子/子句级,不要细到词语2. 拆分依据:中文标点(。!?;:、)、换行和明显的语义停顿3. 粒度控制:单个子段建议约20–80个汉字;过短子段应与相邻合并;过长子段可在自然停顿处分割4. 保序:子段顺序必须与原文一致,覆盖全部正文且不重叠、不遗漏5. 不得改写文本;子段必须是原文的连续片段6. 若清洗后的正文整体较短(如不足20个汉字),则 tags 仅包含一个元素,等于 text 本身7. 标题合并:若检测到“章标题”(如“第二章保险合同”)与“条标题”(如“第十条”)均存在,则在 tags 中放置一个合并后的首个子段,格式为“{章标题} {条标题}”;正文内容另行拆分为后续子段,不重复包含标题文本。若仅存在其中之一,则该标题作为一个独立子段置于首位。三、输出字段1. segment_01.text:清洗后的完整正文(不改写)2. segment_01.tags:子段数组(按顺序,字符串数组),每个元素是一个子段原文输出格式(单条内容):示例:{    "segment_01": {        "text": "第二章保险合同\n第十条 保险合同是投保人与保险人约定保险权利义务关系的协议。",        "tags": [            "第二章保险合同 第十条",            "保险合同是投保人与保险人约定保险权利义务关系的协议。"        ]    }}【关键要求 - 必须遵守】1. 输入是单条内容,输出也必须是单条(只有segment_01)2. text字段:必须过滤掉页码、页眉页脚、乱码、空白行等噪声;只保留纯净正文;不要改写正文内容3. tags:覆盖正文全部内容;顺序一致;不重叠;每个子段是原文连续片段;若正文整体较短可仅包含一个等于 text 的元素;若存在“章+条”,首个子段必须为“{章标题} {条标题}”,正文另行拆分4. 只输出规定字段(text, tags),不要添加其它键5. 只输出纯JSON对象,不要任何额外说明文字6. 不要使用markdown代码块标记(如\`\`\`json)7. 不要输出"好的"、"以下是"、"根据要求"等话语8. 输出必须能被JSON.parse()直接解析现在开始处理,只输出JSON:`;  }  /**   * 从文本中提取JSON对象   */  private extractJSON(text: string): string {    // 移除可能的markdown代码块标记    text = text.replace(/```json\s*/g, '').replace(/```\s*/g, '');        // 尝试找到第一个{和最后一个}    const firstBrace = text.indexOf('{');    const lastBrace = text.lastIndexOf('}');        if (firstBrace === -1 || lastBrace === -1 || firstBrace >= lastBrace) {      throw new Error('无法在响应中找到有效的JSON对象');    }        // 提取JSON部分    const jsonStr = text.substring(firstBrace, lastBrace + 1);        // 验证是否为有效JSON    try {      JSON.parse(jsonStr);      return jsonStr;    } catch (error) {      console.error('提取的JSON无效:', jsonStr.substring(0, 200));      throw new Error('提取的JSON格式无效');    }  }  /**   * 调用千问大模型处理文本段落   */  async processSegment(context: string): Promise<string> {    try {      const { text } = await generateText({        model: this.qwen('qwen-max-latest'),        prompt: this.prompt.replace('{{context}}', context),        temperature: 0.7,        maxTokens: 4000,      });      // 提取并验证JSON      const cleanJSON = this.extractJSON(text);            return cleanJSON;    } catch (error) {      console.error('千问API调用失败:', error);      throw error;    }  }}

设置完提示词之后进入我们的主要分块代码chunk.service.ts文件,该代码文件主要进行条例类文档的分块,输入一个条例类pdf文件,例如法律法规,学生守则等,pdf中有着明确的第几章第几条的规范,该代码会将该pdf按照每一条进行切分,并在每一条的前面加入章节与条例信息,代码如下所示:

import * as fs from 'fs/promises';import pdfParse from 'pdf-parse';@Injectable()export class ChunkService {  // 文档的层次化模式匹配  private readonly BULLET_PATTERN = [    [      /^第[零一二三四五六七八九十百0-9]+(分?编|部分)/,      /^第[零一二三四五六七八九十百0-9]+章/,      /^第[零一二三四五六七八九十百0-9]+节/,      /^第[零一二三四五六七八九十百0-9]+条/,      /^[\((][零一二三四五六七八九十百]+[\))]/,    ],    [      /^第[0-9]+章/,      /^第[0-9]+节/,      /^[0-9]{1,2}[\. 、]/,      /^[0-9]{1,2}\.[0-9]{1,2}[^a-zA-Z/%~-]/,      /^[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2}/,      /^[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2}/,    ],    [      /^第[零一二三四五六七八九十百0-9]+章/,      /^第[零一二三四五六七八九十百0-9]+节/,      /^[零一二三四五六七八九十百]+[ 、]/,      /^[\((][零一二三四五六七八九十百]+[\))]/,      /^[\((][0-9]{1,2}[\))]/,    ],    [      /^PART (ONE|TWO|THREE|FOUR|FIVE|SIX|SEVEN|EIGHT|NINE|TEN)/,      /^Chapter (I+V?|VI*|XI|IX|X)/,      /^Section [0-9]+/,      /^Article [0-9]+/,    ],  ];  /**   * 判断是否不是有效的项目符号   */  private notBullet(line: string): boolean {    const patterns = [/^0$/, /^[0-9]+ +[0-9~个只-]/, /^[0-9]+\.{2,}/];    return patterns.some((p) => p.test(line));  }  /**   * 识别文档使用的项目符号类别   */  private bulletsCategory(sections: string[]): number {    const hits = new Array(this.BULLET_PATTERN.length).fill(0);    for (let i = 0; i < this.BULLET_PATTERN.length; i++) {      const patternGroup = this.BULLET_PATTERN[i];      for (const section of sections) {        for (const pattern of patternGroup) {          if (pattern.test(section) && !this.notBullet(section)) {            hits[i]++;            break;          }        }      }    }    let maxHits = 0;    let result = -1;    for (let i = 0; i < hits.length; i++) {      if (hits[i] > maxHits) {        result = i;        maxHits = hits[i];      }    }    return result;  }  /**   * 判断文本是否包含中文   */  private isChinese(text: string): boolean {    if (!text) return false;    const chineseCount = (text.match(/[\u4e00-\u9fff]/g) || []).length;    return chineseCount / text.length > 0.2;  }  /**   * 计算token数量   */  private numTokensFromString(text: string): number {    if (this.isChinese(text)) {      const chineseChars = (text.match(/[\u4e00-\u9fff]/g) || []).length;      const englishWords = (text.match(/[a-zA-Z]+/g) || []).length;      return chineseChars + englishWords;    } else {      return text.split(/\s+/).length;    }  }  /**   * 细粒度分块算法 - 每条法律条文单独成块,并携带章节信息   */  private hierarchicalMerge(    bulletType: number,    sections: string[],    depth: number = 5,  ): string[][] {    if (!sections.length || bulletType < 0) {      return this.simpleChunk(sections);    }    // 过滤空白和无效内容    const filteredSections = sections.filter(      (text) => text && text.split('@')[0].trim().length > 1 && !/^[0-9]+$/.test(text.split('@')[0].trim()),    );    // 为每个段落分配层级    const sectionLevels: number[] = new Array(filteredSections.length).fill(-1);        for (let i = 0; i < filteredSections.length; i++) {      const text = filteredSections[i];            for (let level = 0; level < this.BULLET_PATTERN[bulletType].length; level++) {        if (this.BULLET_PATTERN[bulletType][level].test(text.trim())) {          sectionLevels[i] = level;          break;        }      }    }    // 细粒度分块:每个"条"级别的内容单独成块,只携带章信息    const chunks: string[][] = [];    let currentChapter = '';        // 当前章标题    let currentArticleChunk: string[] = [];  // 当前条的内容        for (let i = 0; i < filteredSections.length; i++) {      const level = sectionLevels[i];      const text = filteredSections[i];            // 判断是否是"章"级别(可能带标题,如"第一章总则")      const chapterMatch = text.trim().match(/^(第[零一二三四五六七八九十百0-9]+(分?编|部分|章)[^\n]*)/);      const isChapter = !!chapterMatch;            // 判断是否是"节"级别(节单独作为一个chunk)      const sectionMatch = text.trim().match(/^(第[零一二三四五六七八九十百0-9]+节[^\n]*)/);      const isSection = !!sectionMatch;            // 判断是否是"条"级别      const isArticleLevel = level === 3 || /^第[零一二三四五六七八九十百0-9]+条/.test(text.trim());            if (isChapter) {        // 遇到新章,保存之前的条        if (currentArticleChunk.length > 0) {          const chunkContent = this.buildChunkWithChapter(currentChapter, currentArticleChunk);          chunks.push(chunkContent);          currentArticleChunk = [];        }                // 更新章标题        currentChapter = chapterMatch![1].trim();              } else if (isSection) {        // 遇到新节,保存之前的条,但不生成节标题的chunk        if (currentArticleChunk.length > 0) {          const chunkContent = this.buildChunkWithChapter(currentChapter, currentArticleChunk);          chunks.push(chunkContent);          currentArticleChunk = [];        }                // 节标题直接跳过,不生成单独的chunk              } else if (isArticleLevel) {        // 遇到新的条,保存之前的条        if (currentArticleChunk.length > 0) {          const chunkContent = this.buildChunkWithChapter(currentChapter, currentArticleChunk);          chunks.push(chunkContent);        }                // 开始新的条        currentArticleChunk = [text];              } else {        // 普通内容        if (currentArticleChunk.length > 0) {          currentArticleChunk.push(text);        }      }    }        // 添加最后一个chunk    if (currentArticleChunk.length > 0) {      const chunkContent = this.buildChunkWithChapter(currentChapter, currentArticleChunk);      chunks.push(chunkContent);    }    return chunks.filter((c) => c.length > 0);  }  /**   * 构建带章信息的chunk(不包含节)   */  private buildChunkWithChapter(chapter: string, content: string[]): string[] {    if (chapter) {      return [chapter, ...content];    }    return content;  }  /**   * 简单分块策略 - 用于没有明显层级结构的文档   * 按段落数量分块,每3-5个段落一块   */  private simpleChunk(sections: string[]): string[][] {    const chunks: string[][] = [];    let currentChunk: string[] = [];    const maxParagraphs = 5;  // 每个chunk最多5个段落    for (const section of sections) {      currentChunk.push(section);            if (currentChunk.length >= maxParagraphs) {        chunks.push([...currentChunk]);        currentChunk = [];      }    }        if (currentChunk.length > 0) {      chunks.push(currentChunk);    }    return chunks;  }  /**   * 移除目录部分   */  private removeContentsTable(sections: string[]): void {    let i = 0;    while (i < sections.length) {      const text = sections[i].trim();      if (!/^(contents|目录|目次|table of contents|致谢|acknowledge)$/i.test(text.replace(/[ \u3000]+/g, ''))) {        i++;        continue;      }      sections.splice(i, 1);      if (i >= sections.length) break;      let removedCount = 0;      while (i < sections.length && removedCount < 100) {        const line = sections[i].trim();        // 检测目录条目特征:包含页码、省略号、或特定格式        const isTableOfContentsLine =           line.length < 50 && (            /[\.。]{2,}/.test(line) ||  // 包含多个点(省略号)            /\d+\s*$/.test(line) ||      // 以数字结尾(页码)            /第[零一二三四五六七八九十百0-9]+[章节条]/g.test(line) // 章节标题          );                if (isTableOfContentsLine) {          sections.splice(i, 1);          removedCount++;        } else {          // 遇到正文内容,停止删除          break;        }                if (i >= sections.length) break;      }    }  }  /**   * 对纯文本进行法律文档切分   */  async chunkText(text: string): Promise<{ result: string[] }> {    // 按行分割    let sections = text      .split('\n')      .map((s) => s.trim())      .filter((s) => s);        if (!sections.length) {      return { result: [] };    }    // 移除目录    this.removeContentsTable(sections);    // 识别项目符号类型    const bulletType = this.bulletsCategory(sections);    // 层次化合并    const chunks = this.hierarchicalMerge(bulletType, sections, 5);    if (!chunks.length) {      return { result: [] };    }    // 转换为字符串列表    const resultSegments = chunks.map((chunk) => chunk.join('\n'));    return { result: resultSegments };  }  /**   * 从PDF文件提取文本   */  async extractTextFromPdf(filePath: string): Promise<string> {    const dataBuffer = await fs.readFile(filePath);    const data = await pdfParse(dataBuffer);        // 清洗PDF文本中的不必要换行    return this.cleanPdfText(data.text);  }  /**   * 清洗PDF文本,合并不必要的换行   */  private cleanPdfText(text: string): string {    // 将文本按行分割    const lines = text.split('\n');    const cleanedLines: string[] = [];        for (let i = 0; i < lines.length; i++) {      const currentLine = lines[i].trim();            // 跳过空行      if (!currentLine) {        cleanedLines.push('');        continue;      }            // 判断是否是标题行(章节条等)      const isTitleLine = /^(第[零一二三四五六七八九十百0-9]+(分?编|部分|章|节|条)|—\d+—)/.test(currentLine);            // 如果是标题行,单独成行      if (isTitleLine) {        cleanedLines.push(currentLine);      } else {        // 判断当前行是否应该与上一行合并        const lastLine = cleanedLines[cleanedLines.length - 1];        const lastLineIsTitleOrEmpty = !lastLine ||                                        lastLine.trim() === '' ||                                        /^(第[零一二三四五六七八九十百0-9]+(分?编|部分|章|节|条)|—\d+—)/.test(lastLine);                if (lastLineIsTitleOrEmpty) {          // 上一行是标题或空行,开始新行          cleanedLines.push(currentLine);        } else {          // 合并到上一行(不管上一行是否以句号结尾)          cleanedLines[cleanedLines.length - 1] = lastLine + currentLine;        }      }    }        return cleanedLines.join('\n');  }  /**   * 对文件进行文档切分   */  async chunkFile(filePath: string): Promise<{ result: string[] }> {    let text: string;    if (filePath.toLowerCase().endsWith('.pdf')) {      text = await this.extractTextFromPdf(filePath);    } else {      text = await fs.readFile(filePath, 'utf-8');    }    if (!text.trim()) {      return { result: [] };    }    const result = await this.chunkText(text);    console.log(`文档分割完成: 共生成 ${result.result.length} 个段落`);        return result;  }}

之后这些信息会传给大模型进行处理,转为我们需要的格式,将每一条信息的原文作为text,为了防止原文过长,将原文分为子段tag,tag的内容便是将原段落拆解后的内容,每个tag中都包含第几章第几条的信息。

之后创建文件生成与合并文件file.service.ts,该文件主要负责将大模型处理后的json格式文件写为md文件,并在不同text之间生成####分隔符,不同tag之间生成@@@分隔符,之后合并这些文件,生成一个完整的txt文件,该代码文件的主要内容如下:

import { Injectable } from '@nestjs/common';import * as fs from 'fs/promises';import * as path from 'path';/** * 文件服务 - 负责文件的生成和合并 */@Injectable()export class FileService {  private readonly outputDir = path.join(process.cwd(), 'output');  constructor() {    this.ensureOutputDir();  }  /**   * 确保输出目录存在   */  private async ensureOutputDir(): Promise<void> {    try {      await fs.mkdir(this.outputDir, { recursive: true });    } catch (error) {      console.error('创建输出目录失败:', error);    }  }  /**   * 生成单个MD文件   */  async generateMdFile(content: string, index: number): Promise<string> {    const filePath = path.join(this.outputDir, `part${index}.md`);    // 验证content是否为有效JSON    try {      JSON.parse(content);    } catch (error) {      console.error(`part${index}.md 的内容不是有效的JSON,尝试修复...`);      console.error('原始内容前200字符:', content.substring(0, 200));    }    // 移除管道符号(如generate.py中的处理)    const cleanContent = content.replace(/\|/g, '');    await fs.writeFile(filePath, cleanContent, 'utf-8');    return `文件 part${index}.md 生成完毕`;  }  /**   * 合并所有MD文件为TXT文件   */  async integrateMdFiles(outputFileName: string): Promise<string> {    const outputPath = path.join(this.outputDir, `${outputFileName}_combine.txt`);    // 获取所有part*.md文件    const files = await fs.readdir(this.outputDir);    const mdFiles = files      .filter((f) => f.toLowerCase().startsWith('part') && f.endsWith('.md'))      .sort((a, b) => {        const numA = parseInt(a.match(/\d+/)?.[0] || '0');        const numB = parseInt(b.match(/\d+/)?.[0] || '0');        return numA - numB;      });    const outputLines: string[] = [];    for (const file of mdFiles) {      const filePath = path.join(this.outputDir, file);      const content = await fs.readFile(filePath, 'utf-8');      try {        const data = JSON.parse(content);        // 遍历所有segment        for (const segment of Object.values(data)) {          const seg = segment as any;          // 写入text内容          if (seg.text) {            outputLines.push(seg.text);          }          // 写入 @@@tags(在同一行)          if (seg.tags && Array.isArray(seg.tags)) {            outputLines.push('@@@' + seg.tags.join('@@@'));          } else {            outputLines.push('@@@');          }          // 写入####分隔行          outputLines.push('####\n');        }      } catch (error) {        console.error(`解析文件 ${file} 失败:`, error);        console.error(`文件内容前500字符:\n${content.substring(0, 500)}`);        throw new Error(`文件 ${file} 的JSON格式无效,请检查大模型输出`);      }    }    await fs.writeFile(outputPath, outputLines.join('\n'), 'utf-8');    return '标注文件生成完毕!';  }  /**   * 清理输出目录中的临时文件   */  async cleanupTempFiles(): Promise<void> {    const files = await fs.readdir(this.outputDir);    const mdFiles = files.filter((f) => f.toLowerCase().startsWith('part') && f.endsWith('.md'));    for (const file of mdFiles) {      await fs.unlink(path.join(this.outputDir, file));    }  }}

之后便是迭代器的代码,为了并行的执行任务提升效率,创建iteration.service.ts文件,内容如下:

import pLimit from 'p-limit';/* * 支持并行处理和错误处理 */@Injectable()export class IterationService {  /**   * 并行处理迭代任务   * @param items 待处理的项目列表   * @param processor 处理函数   * @param parallelNums 并行数量,默认10   */  async runParallel<T, R>(    items: T[],    processor: (item: T, index: number) => Promise<R>,    parallelNums: number = 10,  ): Promise<R[]> {    if (!items || items.length === 0) {      return [];    }    // 使用p-limit控制并发数量    const limit = pLimit(parallelNums);    // 创建所有任务    const tasks = items.map((item, index) =>      limit(async () => {        try {          console.log(`开始处理第 ${index + 1}/${items.length} 个任务`);          const result = await processor(item, index);          console.log(`完成处理第 ${index + 1}/${items.length} 个任务`);          return result;        } catch (error) {          console.error(`处理第 ${index + 1} 个任务时出错:`, error);          throw error;        }      }),    );    // 等待所有任务完成    return Promise.all(tasks);  }  /**   * 串行处理迭代任务   * @param items 待处理的项目列表   * @param processor 处理函数   */  async runSequential<T, R>(items: T[], processor: (item: T, index: number) => Promise<R>): Promise<R[]> {    if (!items || items.length === 0) {      return [];    }    const results: R[] = [];    for (let index = 0; index < items.length; index++) {      try {        console.log(`开始处理第 ${index + 1}/${items.length} 个任务`);        const result = await processor(items[index], index);        results.push(result);        console.log(`完成处理第 ${index + 1}/${items.length} 个任务`);      } catch (error) {        console.error(`处理第 ${index + 1} 个任务时出错:`, error);        throw error;      }    }    return results;  }  /**   * 运行迭代任务(自动选择并行或串行)   * @param items 待处理的项目列表   * @param processor 处理函数   * @param isParallel 是否并行,默认true   * @param parallelNums 并行数量,默认10   */  async run<T, R>(    items: T[],    processor: (item: T, index: number) => Promise<R>,    isParallel: boolean = true,    parallelNums: number = 10,  ): Promise<R[]> {    if (isParallel) {      return this.runParallel(items, processor, parallelNums);    } else {      return this.runSequential(items, processor);    }  }}

之后便是形成一个工作流,代码分为工作流控制器与工作流服务,首先创建workflow.controller.ts文件,内容如下:

import { Controller, Post, Body } from '@nestjs/common';import { WorkflowService } from './workflow.service';/** * 工作流控制器 */@Controller('workflow')export class WorkflowController {  constructor(private readonly workflowService: WorkflowService) {}  /**   * 处理文档的API端点   */  @Post('process')  async processDocument(    @Body()    body: {      pdfPath: string;      outputFileName?: string;      parallelNums?: number;    },  ) {    const { pdfPath, outputFileName = 'output', parallelNums = 10 } = body;    try {      const result = await this.workflowService.processDocument(pdfPath, outputFileName, parallelNums);      return {        success: true,        message: result,      };    } catch (error: any) {      return {        success: false,        error: error.message,      };    }  }}

工作流服务workflow.service.ts文件内容如下:

import { ChunkService } from '../chunking/chunk.service';import { QwenService } from '../ai/qwen.service';import { IterationService } from '../iteration/iteration.service';import { FileService } from '../file/file.service';/** * 工作流服务 - 整合所有步骤的主要服务 */@Injectable()export class WorkflowService {  constructor(    private readonly chunkService: ChunkService,    private readonly qwenService: QwenService,    private readonly iterationService: IterationService,    private readonly fileService: FileService,  ) {}  /**   * 执行完整的文档处理工作流   * @param pdfPath PDF文件路径   * @param outputFileName 输出文件名   * @param parallelNums 并行数量,默认10   */  async processDocument(pdfPath: string, outputFileName: string = 'output', parallelNums: number = 10): Promise<string> {    console.log('=== 开始文档处理工作流 ===');    // 步骤1: 使用chunk.py算法分割文档    console.log('\n步骤1: 分割PDF文档...');    const chunkResult = await this.chunkService.chunkFile(pdfPath);    const segments = chunkResult.result;    if (!segments || segments.length === 0) {      throw new Error('文档分割失败,没有生成任何段落');    }    console.log(`文档分割完成,共生成 ${segments.length} 个段落\n`);    // 步骤2: 使用迭代并行模式处理每个段落    console.log(`步骤2: 使用大模型处理段落(并行度: ${parallelNums})...`);    await this.iterationService.runParallel(      segments,      async (segment: string, index: number) => {        // 2.1: 调用千问大模型处理        const llmResult = await this.qwenService.processSegment(segment);        // 2.2: 生成MD文件        await this.fileService.generateMdFile(llmResult, index);        return llmResult;      },      parallelNums,    );    console.log('所有段落处理完成\n');    // 步骤3: 合并所有MD文件为TXT文件    console.log('步骤3: 合并生成最终文件...');    const integrateResult = await this.fileService.integrateMdFiles(outputFileName);    console.log(integrateResult);    // 步骤4: 清理临时文件    console.log('\n步骤4: 清理临时文件...');    await this.fileService.cleanupTempFiles();    console.log('临时文件清理完成');    console.log('\n=== 文档处理工作流完成 ===');    return `处理完成!共处理 ${segments.length} 个段落,输出文件: ${outputFileName}_combine.txt`;  }}

主体服务已经搭建完毕,之后便是简单的app.module.ts文件与main.ts文件,app.module.ts文件主要内容如下:

import { ConfigModule } from '@nestjs/config';import { ChunkService } from './chunking/chunk.service';import { QwenService } from './ai/qwen.service';import { IterationService } from './iteration/iteration.service';import { FileService } from './file/file.service';import { WorkflowService } from './workflow/workflow.service';import { WorkflowController } from './workflow/workflow.controller';@Module({  imports: [    ConfigModule.forRoot({      isGlobal: true, // 使配置在整个应用中全局可用      envFilePath: '.env', // 指定.env文件路径    }),  ],  controllers: [WorkflowController],  providers: [ChunkService, QwenService, IterationService, FileService, WorkflowService],})export class AppModule {}

服务启动文件main.ts主要内容如下:

import { AppModule } from './app.module';async function bootstrap() {  const app = await NestFactory.create(AppModule);  // 启用CORS  app.enableCors();  const port = process.env.PORT || 3000;  await app.listen(port);  console.log(`应用程序运行在: http://localhost:${port}`);  console.log(`API端点: POST http://localhost:${port}/workflow/process`);  console.log(`请求体示例:{  "pdfPath": "/path/to/your/document.pdf",  "outputFileName": "output",  "parallelNums": 10}  `);}bootstrap();

至此,所有服务搭建完毕,可以启动main.ts进行测试,首先确保下载了ts-node或者其它ts环境输入,这里使用该指令进行测试:

ts-node src/main.ts

当输出以下信息时,证明启动成功,服务启动在本机的3000端口

接下来,我们使用一个post请求来进行测试,在根目录编辑一个测试脚本test.ts,内容如下:

import * as path from 'path';/** * 测试脚本 - 调用文档处理工作流API */const API_URL = 'http://localhost:3000/workflow/process';interface WorkflowRequest {  pdfPath: string;  outputFileName?: string;  parallelNums?: number;}interface WorkflowResponse {  success: boolean;  message?: string;  error?: string;}/** * 发送工作流处理请求 */async function testWorkflow(pdfPath: string, outputFileName: string = 'output', parallelNums: number = 10) {  console.log('=== 测试文档处理工作流 ===\n');  console.log(`PDF路径: ${pdfPath}`);  console.log(`输出文件名: ${outputFileName}`);  console.log(`并行数量: ${parallelNums}`);  console.log('\n发送请求...\n');  try {    const requestData: WorkflowRequest = {      pdfPath,      outputFileName,      parallelNums,    };    const response = await axios.post<WorkflowResponse>(API_URL, requestData, {      headers: {        'Content-Type': 'application/json',      },      timeout: 600000, // 10分钟超时    });    console.log('响应状态:', response.status);    console.log('响应数据:', JSON.stringify(response.data, null, 2));    if (response.data.success) {      console.log('\n处理成功!');      console.log(`${response.data.message}`);      console.log(`\n输出文件位置: output/${outputFileName}_combine.txt`);    } else {      console.log('\n处理失败!');      console.log(`错误: ${response.data.error}`);    }  } catch (error: any) {    console.error('\n请求失败!');    if (error.response) {      // 服务器响应了错误状态码      console.error('状态码:', error.response.status);      console.error('响应数据:', error.response.data);    } else if (error.request) {      // 请求已发送但没有收到响应      console.error('无法连接到服务器,请确保服务已启动 (npm start)');    } else {      // 其他错误      console.error('错误信息:', error.message);    }  }}/** * 主函数 */async function main() {  // 从命令行参数获取PDF路径,或使用默认路径  const args = process.argv.slice(2);    if (args.length === 0) {    console.log('使用方法:');    console.log('  npm run test [PDF路径] [输出文件名] [并行数量]');    console.log('\n示例:');    console.log('  npm run test ./sample.pdf');    console.log('  npm run test ./sample.pdf output 10');    console.log('  npm run test "C:\\Documents\\test.pdf" result 5');    console.log('\n或者使用默认测试路径:');    console.log('  npm run test\n');  }  const pdfPath = args[0] || './1.pdf';  const outputFileName = args[1] || 'output';  const parallelNums = args[2] ? parseInt(args[2]) : 10;  // 检查服务器是否运行  try {    await axios.get('http://localhost:3000', { timeout: 3000 });  } catch (error) {    console.error('警告: 无法连接到服务器 (http://localhost:3000)');    console.error('请先启动服务: npm start\n');  }  await testWorkflow(pdfPath, outputFileName, parallelNums);}// 运行主函数main().catch((error) => {  console.error('脚本执行失败:', error);  process.exit(1);});

启动该脚本,命令行会出现以下信息,证明大模型正在处理数据:

之后会出现output文件夹,逐渐产生md文件,最后会合并为txt文件,大致如下图所示:

生成的数据每个段落之间用####符号表示,每个段落的子段用@@@表示,在后续构建RAG索引时可以进行父子索引构建,以此平衡召回率与精确率。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

TypeScript NestJS AI SDK Document Processing PDF Extraction Text Cleaning Paragraph Segmentation Large Language Models Qwen Workflow Automation Markdown JSON
相关文章