Zilliz 10月09日 22:29
RAG-Anything简化多模态PDF处理
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

RAG-Anything项目通过整合开源Milvus向量数据库,简化了传统RAG系统处理图文并茂PDF的复杂性。它采用“1+3+N”架构,包含一个核心引擎和三大模态处理器,支持文本、图像、表格和公式等多模态内容分析。项目实现了并行处理机制,大幅提升了文档处理速度,并通过分层存储优化检索效率。文章还提供了一个最小可运行示例,展示了如何基于RAG-Anything框架实现支持文本和图像处理的多模态问答系统。

📚 RAG-Anything项目通过整合开源Milvus向量数据库,简化了传统RAG系统处理图文并茂PDF的复杂性,无需依赖PyPDF2、OpenCV等工具。

🖼️ 它采用“1+3+N”架构,包含一个基于LightRAG的知识图谱构建引擎和三大模态处理器,分别处理图像、表格和公式,实现多模态内容分析。

⚡ 项目实现了并行处理机制,大幅提升了文档处理速度,并通过分层存储优化检索效率,为不同类型查询提供最适合的检索策略。

📊 文章提供了一个最小可运行示例,展示了如何基于RAG-Anything框架实现支持文本和图像处理的多模态问答系统,包括环境准备、配置、模型调用和Milvus集成等步骤。

原创 尹珉 2025-10-09 17:59 上海

PyPDF2、OpenCV、Camelot、Tesseract全都不需要了

AI落地主流场景之一是知识库,而做知识库,必定少不了PDF文件。

传统RAG要想精准读取这些图文并茂的PDF,就需要集成PyPDF2、OpenCV、Camelot、Tesseract等多个工具,系统庞杂且低效。此外,不同 PDF 各有侧重:报告重图表、财报重表格、论文重公式,如何精准调用这些工具同样难度不低。

香港大学数据科学学院刚刚开源的RAG-Anything项目,结合开源的Milvus向量数据库,让我们逐渐看到了解决这个问题的曙光。

通过将所有功能封装在一个框架内,并引入VLM增强查询机制,RAG-Anything可以在一个框架内实现分析文本、自动理解图像、表格等多模态内容,提供更全面的答案。

下面,我们将从架构以及实验案例出发,解释为什么它能处理文本、图像、表格与公式,并且在性能上站得住脚。

01 

1+3+N技术架构深度解析

RAG-Anything的核心架构可以用"1+3+N"来概括:

1个核心引擎基于LightRAG的知识图谱构建引擎,负责实体关系抽取和向量化存储。这个引擎的特别之处在于,它不仅处理文本实体,还能理解图像中的对象、表格中的数据关系。

3大模态处理器

N种解析器:支持MinerU和Docling两大解析引擎,可以根据文档类型自动选择最优解析策略。

在"1+3+N"技术架构的基础上,RAG-Anything实现了处理机制的性能突破。把传统传统RAG系统的串行处理方式(先解析文本,再处理图像,最后处理表格)升级成为了并行处理

    # 核心配置展示了并行处理的设计思路

    config = RAGAnythingConfig(

        working_dir="./rag_storage",

        parser="mineru",

        parse_method="auto",  # 自动选择最优解析策略

        enable_image_processing=True,

        enable_table_processing=True

        enable_equation_processing=True,

        max_workers=8  # 支持多线程并行处理

    )


    这种并行架构带来显著效果:处理大型技术文档的速度有了明显提升。测试表明,随着CPU核心数增加,系统处理能力几乎呈线性提升,大幅缩短了文档处理时间。

    除此之外,RAG-Anything还采用了分层存储与检索优化

    这种分层设计的优势在于,系统可以为不同类型的查询使用最适合的检索策略,而非简单地采用一刀切的向量相似度搜索。

    原理与架构已经讲清楚了。下面我们用一个最小可运行的示例,在 5 分钟内把文本与图像检索问答跑通。

    02 

    五分钟快速开始(最小可运行示例)

    2.1实验目标:

    本实验展示如何基于RAG-Anything框架集成Milvus向量数据库以及阿里通义大模型,实现支持文本和图像处理的多模态问答系统。(展示核心代码实现部分,并非完整代码)

    2.2为什么选择Milvus?

    Milvus的核心优势在于其存算分离的云原生架构,带来极致的弹性伸缩能力和成本效益。通过读写分离流批一体设计,它在保证高并发性能的同时,还能实现插入即可查的实时性能。此外,无单点故障的设计确保了企业级的高可用与高可靠性

    2.3运行步骤

    环境准备

      - python -m venv .venv && source .venv/bin/activate  # Windows 使用 .venvScriptsactivate

      - pip install -r requirements-min.txt

      cp .env.example .env 并填写 DASHSCOPE_API_KEY


        python minimal_[main.py](<http://main.py>)


        2.4目录结构

          .

          ├─ requirements-min.txt

          ├─ .env.example

          ├─ [config.py](<http://config.py>)

          ├─ milvus_[store.py](<http://store.py>)

          ├─ [adapters.py](<http://adapters.py>)

          ├─ minimal_[main.py](<http://main.py>)

          └─ sample

             ├─ docs

             │  └─ faq_milvus.txt

             └─ images

                └─ milvus_arch.png


          2.4.1项目依赖

            raganything

            lightrag

            pymilvus[lite]>=2.3.0

            aiohttp>=3.8.0

            orjson>=3.8.0

            python-dotenv>=1.0.0

            Pillow>=9.0.0

            numpy>=1.21.0,<2.0.0

            rich>=12.0.0


            2.4.2环境变量

              # 阿里云 DashScope

              DASHSCOPE_API_KEY=your_api_key_here

              # 端点如官方变更,请按需替换

              ALIYUN_LLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions

              ALIYUN_VLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions

              ALIYUN_EMBED_URL=https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding

              # 模型名(统一处配置)

              LLM_TEXT_MODEL=qwen-max

              LLM_VLM_MODEL=qwen-vl-max

              EMBED_MODEL=tongyi-embedding-vision-plus

              # Milvus Lite

              MILVUS_URI=milvus_lite.db

              MILVUS_COLLECTION=rag_multimodal_collection

              EMBED_DIM=1152


              2.4.3config配置

                import os

                from dotenv import load_dotenv

                load_dotenv()

                DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY""")

                LLM_TEXT_MODEL = os.getenv("LLM_TEXT_MODEL""qwen-max")

                LLM_VLM_MODEL = os.getenv("LLM_VLM_MODEL""qwen-vl-max")

                EMBED_MODEL = os.getenv("EMBED_MODEL""tongyi-embedding-vision-plus")

                ALIYUN_LLM_URL = os.getenv("ALIYUN_LLM_URL")

                ALIYUN_VLM_URL = os.getenv("ALIYUN_VLM_URL")

                ALIYUN_EMBED_URL = os.getenv("ALIYUN_EMBED_URL")

                MILVUS_URI = os.getenv("MILVUS_URI""milvus_lite.db")

                MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION""rag_multimodal_collection")

                EMBED_DIM = int(os.getenv("EMBED_DIM""1152"))

                # 基础运行参数

                TIMEOUT = 60

                MAX_RETRIES = 2


                2.4.4模型调用

                  import os

                  import base64

                  import aiohttp

                  import asyncio

                  from typing import ListDictAnyOptional

                  from config import (

                      DASHSCOPE_API_KEY, LLM_TEXT_MODEL, LLM_VLM_MODEL, EMBED_MODEL,

                      ALIYUN_LLM_URL, ALIYUN_VLM_URL, ALIYUN_EMBED_URL, EMBED_DIM, TIMEOUT

                  )

                  HEADERS = {

                      "Authorization"f"Bearer {DASHSCOPE_API_KEY}",

                      "Content-Type""application/json",

                  }

                  class AliyunLLMAdapter:

                      def __init__(self):

                          self.text_url = ALIYUN_LLM_URL

                          self.vlm_url = ALIYUN_VLM_URL

                          self.text_model = LLM_TEXT_MODEL

                          self.vlm_model = LLM_VLM_MODEL

                      async def chat(self, prompt: str) -> str:

                          payload = {

                              "model": self.text_model,

                              "input": {"messages": [{"role""user""content": prompt}]},

                              "parameters": {"max_tokens"1024"temperature"0.5},

                          }

                          async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:

                              async with [s.post](<http://s.post>)(self.text_url, json=payload, headers=HEADERS) as r:

                                  r.raise_for_status()

                                  data = await r.json()

                                  return data["output"]["choices"][0]["message"]["content"]

                      async def chat_vlm_with_image(self, prompt: str, image_path: str) -> str:

                          with open(image_path, "rb"as f:

                              image_b64 = base64.b64encode([f.read](<http://f.read>)()).decode("utf-8")

                          payload = {

                              "model": self.vlm_model,

                              "input": {"messages": [{"role""user""content": [

                                  {"text": prompt},

                                  {"image"f"data:image/png;base64,{image_b64}"}

                              ]}]},

                              "parameters": {"max_tokens"1024"temperature"0.2},

                          }

                          async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:

                              async with [s.post](<http://s.post>)(self.vlm_url, json=payload, headers=HEADERS) as r:

                                  r.raise_for_status()

                                  data = await r.json()

                                  return data["output"]["choices"][0]["message"]["content"]

                  class AliyunEmbeddingAdapter:

                      def __init__(self):

                          self.url = ALIYUN_EMBED_URL

                          self.model = EMBED_MODEL

                          self.dim = EMBED_DIM

                      async def embed_text(self, text: str) -> List[float]:

                          payload = {

                              "model": self.model,

                              "input": {"texts": [text]},

                              "parameters": {"text_type""query""dimensions": self.dim},

                          }

                          async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:

                              async with [s.post](<http://s.post>)(self.url, json=payload, headers=HEADERS) as r:

                                  r.raise_for_status()

                                  data = await r.json()

                                  return data["output"]["embeddings"][0]["embedding"]


                  2.4.5milvus-lite集成

                    import json

                    import time

                    from typing import ListDictAnyOptional

                    from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility

                    from config import MILVUS_URI, MILVUS_COLLECTION, EMBED_DIM

                    class MilvusVectorStore:

                        def __init__(self, uri: str = MILVUS_URI, collection_name: str = MILVUS_COLLECTION, dim: int = EMBED_DIM):

                            self.uri = uri

                            self.collection_name = collection_name

                            self.dim = dim

                            self.collection: Optional[Collection] = None

                            self._connect_and_prepare()

                        def _connect_and_prepare(self):

                            connections.connect("default", uri=self.uri)

                            if utility.has_collection(self.collection_name):

                                self.collection = Collection(self.collection_name)

                            else:

                                fields = [

                                    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True),

                                    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.dim),

                                    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),

                                    FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=32),

                                    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),

                                    FieldSchema(name="ts", dtype=[DataType.INT](<http://DataType.INT>)64),

                                ]

                                schema = CollectionSchema(fields, "Minimal multimodal collection")

                                self.collection = Collection(self.collection_name, schema)

                                self.collection.create_index("vector", {

                                    "metric_type""COSINE",

                                    "index_type""IVF_FLAT",

                                    "params": {"nlist"1024}

                                })

                            self.collection.load()

                        def upsert(self, ids: List[str], vectors: List[List[float]], contents: List[str],

                                   content_types: List[str], sources: List[str]) -> None:

                            data = [

                                ids,

                                vectors,

                                contents,

                                content_types,

                                sources,

                                [int(time.time() * 1000)] * len(ids)

                            ]

                            self.collection.upsert(data)

                            self.collection.flush()

                        def search(self, query_vectors: List[List[float]], top_k: int = 5, content_type: Optional[str] = None):

                            expr = f'content_type == "{content_type}"' if content_type else None

                            params = {"metric_type""COSINE""params": {"nprobe"16}}

                            results = [self.collection.search](<http://self.collection.search>)(

                                data=query_vectors,

                                anns_field="vector",

                                param=params,

                                limit=top_k,

                                expr=expr,

                                output_fields=["id""content""content_type""source""ts"]

                            )

                            out = []

                            for hits in results:

                                out.append([{

                                    "id": h.entity.get("id"),

                                    "content": h.entity.get("content"),

                                    "content_type": h.entity.get("content_type"),

                                    "source": h.entity.get("source"),

                                    "score": h.score

                                } for h in hits])

                            return out


                    2.4.6主入口

                      """

                      最小可运行示例:

                      - 将一段文本 FAQ 写入 LightRAG(仅作文本语境)

                      - 将一张图片描述向量写入 Milvus(图像检索语境)

                      - 执行两条查询:文本问答、图像问答

                      """

                      import asyncio

                      import uuid

                      from pathlib import Path

                      from rich import print

                      from lightrag import LightRAG, QueryParam

                      from lightrag.utils import EmbeddingFunc

                      from adapters import AliyunLLMAdapter, AliyunEmbeddingAdapter

                      from milvus_store import MilvusVectorStore

                      from config import EMBED_DIM

                      SAMPLE_DOC = Path("sample/docs/faq_milvus.txt")

                      SAMPLE_IMG = Path("sample/images/milvus_arch.png")

                      async def main():

                          # 1) 初始化组件

                          llm = AliyunLLMAdapter()

                          emb = AliyunEmbeddingAdapter()

                          store = MilvusVectorStore()

                          # 2) 初始化 LightRAG(仅文本检索)

                          async def llm_complete(prompt: str, max_tokens: int = 1024) -> str:

                              return await [llm.chat](<http://llm.chat>)(prompt)

                          async def embed_func(text: str) -> list:

                              return await emb.embed_text(text)

                          rag = LightRAG(

                              working_dir="rag_workdir_min",

                              llm_model_func=llm_complete,

                              embedding_func=EmbeddingFunc(

                                  embedding_dim=EMBED_DIM,

                                  max_token_size=8192,

                                  func=embed_func

                              ),

                          )

                          # 3) 数据插入:文本

                          if SAMPLE_DOC.exists():

                              text = SAMPLE_[DOC.read](<http://DOC.read>)_text(encoding="utf-8")

                              await rag.ainsert(text)

                              print("[green]已插入文本 FAQ 到 LightRAG[/green]")

                          else:

                              print("[yellow]未找到 sample/docs/faq_milvus.txt[/yellow]")

                          # 4) 数据插入:图像(描述存 Milvus)

                          if SAMPLE_IMG.exists():

                              # 用 VLM 生成该图片的简要描述,作为图像语义内容

                              desc = await [llm.chat](<http://llm.chat>)_vlm_with_image("请简要描述图中的 Milvus 架构要点。"str(SAMPLE_IMG))

                              vec = await emb.embed_text(desc)  # 采用文本嵌入统一维度,便于最小示例复用

                              store.upsert(

                                  ids=[str(uuid.uuid4())],

                                  vectors=[vec],

                                  contents=[desc],

                                  content_types=["image"],

                                  sources=[str(SAMPLE_IMG)]

                              )

                              print("[green]已插入图像描述到 Milvus(content_type=image)[/green]")

                          else:

                              print("[yellow]未找到 sample/images/milvus_arch.png[/yellow]")

                          # 5) 查询:文本问答(从 LightRAG)

                          q1 = "Milvus 是否支持同时插入与搜索?请给出简短回答。"

                          ans1 = await rag.aquery(q1, param=QueryParam(mode="hybrid"))

                          print("\\n[bold]文本问答[/bold]")

                          print(ans1)

                          # 6) 查询:图像相关(从 Milvus)

                          q2 = "Milvus 架构的关键组件有哪些?"

                          q2_vec = await emb.embed_text(q2)

                          img_hits = [store.search](<http://store.search>)([q2_vec], top_k=3, content_type="image")

                          print("\\n[bold]图像检索(返回图像语义描述)[/bold]")

                          print(img_hits[0if img_hits else [])

                      if __name__ == "__main__":

                          [asyncio.run](<http://asyncio.run>)(main())


                      2.4.7测试数据集(含milvus架构图)

                        Milvus 的成本是多少?

                        Milvus 是一个 100% 免费的开源项目。

                        在使用 Milvus 进行生产或发布时,请遵守Apache License 2.0

                        Milvus 背后的公司 Zilliz 还为那些不想构建和维护自己的分布式实例的用户提供完全托管的云版平台。Zilliz Cloud可自动维护数据的可靠性,并允许用户只为其使用付费。

                        Milvus 支持非 x86 架构吗?

                        Milvus不能在非x86平台上安装或运行。

                        您的 CPU 必须支持以下指令集之一才能运行 Milvus:SSE4.2AVXAVX2AVX512。这些都是 x86 专用 SIMD 指令集。

                        Milvus 在哪里存储数据?

                        Milvus 处理两种类型的数据:插入数据和元数据。

                        插入数据(包括向量数据、标量数据和特定于 Collections 的 Schema)以增量日志的形式存储在持久存储中。Milvus 支持多种对象存储后端,包括MinIO、AWS S3、谷歌云存储(GCS)、Azure Blob 存储、阿里云 OSS 和腾讯云对象存储(COS)。

                        元数据在 Milvus 内部生成。每个 Milvus 模块都有自己的元数据,这些元数据存储在 etcd 中。

                        为什么 etcd 中没有向量数据?

                        etcd 存储 Milvus 模块元数据;MinIO 存储实体。

                        Milvus 支持同时插入和搜索数据吗?

                        是的。插入操作和查询操作由两个相互独立的模块处理。从客户端的角度来看,当插入的数据进入消息队列时,插入操作符就完成了。但是,插入的数据在加载到查询节点之前是不可查询的。对于具有增量数据的增长数据段,Milvus 会自动建立临时索引,以确保高效的搜索性能,即使数据段大小未达到索引建立阈值(计算公式为dataCoord.segment.maxSize ×dataCoord.segment.sealProportion )。你可以通过Milvus 配置文件中的配置参数queryNode.segcore.interimIndex.enableIndex 来控制这种行为--将其设置为true 可启用临时索引(默认),而将其设置为false 则会禁用临时索引。

                        能否在 Milvus 中插入主键重复的向量?

                        可以。Milvus 不检查向量主键是否重复。

                        当插入主键重复的向量时,Milvus 是否将其视为更新操作?

                        Milvus目前不支持更新操作,也不检查实体主键是否重复。你有责任确保实体主键是唯一的,如果不是唯一的,Milvus 可能包含多个主键重复的实体。

                        如果出现这种情况,查询时将返回哪个数据副本仍是未知行为。这一限制将在今后的版本中修复。

                        自定义实体主键的最大长度是多少?

                        实体主键必须是非负 64 位整数。

                        每次插入操作可添加的最大数据量是多少?

                        插入操作的大小不得超过 1,024 MB。这是 gRPC 规定的限制。

                        在特定分区中搜索时,Collection 的大小会影响查询性能吗?

                        不会。如果指定了搜索的分区,Milvus 只搜索指定的分区。


                        2.4.8文本回答样例答案

                        2.4.9图像检索回答样例答案

                        到这里,我们已经能在本地跑通最小示例并理解其工作原理。

                        03 

                        技术展望:多模态RAG的未来发展方向

                        分析RAG-Anything项目后,我们不难发现,多模态RAG技术正处于关键转折点。

                        三个关键技术趋势如下:

                        第一,模态覆盖扩大。RAG-Anything当前支持文本、图像、表格和公式,未来,建立包含视频、音频和3D模型在内的统一多模态RAG框架是大势所趋。

                        第二,实时处理能力提升。目前多模态RAG主要针对静态文档,但流数据处理需求增长要求支持实时文档更新和增量索引功能。

                        第三,边缘计算将普及。随着MilvusLite等轻量方案发展,多模态RAG将延伸至移动设备和IoT领域,创造新应用可能。

                        作者介绍

                        Zilliz黄金写手:尹珉

                        推荐阅读

                        Embedding无敌,是做文档RAG最大的幻觉(含LangExtract+Milvus教程)

                        Word2Vec、 BERT、BGE-M3、LLM2Vec,embedding模型选型指南

                        全面测评LangChain vs LangGraph:谁是agent落地最优解

                        企业级向量数据库选型,Milvus 和Zilliz Cloud哪个更合适?

                        阅读原文

                        跳转微信打开

                        Fish AI Reader

                        Fish AI Reader

                        AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

                        FishAI

                        FishAI

                        鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

                        联系邮箱 441953276@qq.com

                        相关标签

                        RAG-Anything 多模态问答 知识库 PDF处理 Milvus LightRAG
                        相关文章