用LangGraph开发RAG研究多代理
在本文中,我们展示了一个使用LangGraph开发的RAG研究多代理工具。该工具旨在解决需要多个来源和迭代步骤才能得出最终答案的复杂问题。

在本文中,我们展示了一个使用LangGraph开发的RAG研究多代理工具。该工具旨在解决需要多个来源和迭代步骤才能得出最终答案的复杂问题。它采用混合搜索和Cohere重排序步骤来检索文档,并且还包含一个自我纠正机制,包括幻觉检查过程,以提高响应的可靠性,使其成为企业应用的理想选择。
GitHub仓库在这里。
1、朴素RAG vs. 主动式RAG
出于项目的需要,朴素的RAG方法不足以满足以下原因:
- 无法理解复杂查询:不能将复杂查询分解为多个可管理的小步骤,而是对查询进行单级处理,而不是分析每个步骤并得出统一的结论。
- 缺乏幻觉或错误处理:朴素的RAG管道缺乏响应验证步骤和处理幻觉的机制,因此无法通过生成新的响应来纠正错误。
- 缺乏动态工具使用:朴素的RAG系统不允许根据工作流程条件调用外部API、使用工具或与数据库交互。
因此,实现了一个多代理RAG研究系统来解决所有这些问题。基于代理的框架允许:
- 路由和使用工具:一个路由代理可以分类用户的查询并将流程导向适当的节点或工具。这使得可以根据上下文做出决策,例如确定是否需要对文档进行全面总结,是否需要更多详细信息,或者问题是否超出了范围。
- 规划子步骤:复杂的查询通常需要被分解成更小、更易管理的步骤。从一个查询开始,可以生成一系列步骤以达到结论,同时探索查询的不同方面。例如,如果查询需要比较文档中的两个不同部分,基于代理的方法将允许识别这种比较需求,分别检索两个来源,并将它们合并到最终响应中的比较分析中。
- 反思与错误修正:除了简单的响应生成外,基于代理的方法还可以添加验证步骤来解决潜在的幻觉、错误或未能准确回答用户查询的响应。这还使人类参与自动化的循环成为可能,将人类输入整合到自动化过程中。这样的功能使得基于代理的RAG系统成为企业应用中更强大和可靠的解决方案,其中可靠性是首要考虑因素。
- 共享全局状态:一个AgentWorkflow共享全局状态,简化了跨多个步骤的状态管理。这个共享状态对于保持多代理过程不同阶段之间的一致性至关重要。
2、项目概述

图步骤:
- 分析和路由查询(自适应RAG):用户的查询被分类并路由到适当的节点。然后,系统可以继续执行下一步(“研究计划生成”),请求更多信息,或者如果查询超出范围,则立即响应。
- 研究计划生成:系统生成逐步的研究计划,根据请求的复杂程度,可能是一步或多步。然后返回一个具体的步骤列表,以解决用户的问题。
- 研究子图:对于研究计划生成中定义的每个步骤,都会调用一个子图。具体来说,子图开始通过LLM生成两个查询。接下来,系统使用集成检索器(使用相似度搜索、BM25和MMR)检索与这些生成的查询相关的文档。然后,重排序步骤应用基于Cohere的上下文压缩,最终得到所有步骤中排名前k的相关文档及其相关分数。
- 生成步骤:基于相关文档,工具通过LLM生成答案。
- 幻觉检查(具有人类参与的自我纠正RAG):有一个反思步骤,系统分析生成的答案,以确定其是否由提供的上下文支持,并解决了所有方面。如果检查失败,图工作流将中断,提示用户生成修订后的答案或结束过程。
为了创建向量存储,采用了基于段落的分块方法,使用Docling和LangChain,并且向量数据库使用ChromaDB构建。
3、构建向量数据库
3.1 文档解析
对于结构复杂的PDF,包括布局复杂的表格,选择用于解析的工具至关重要。许多库在处理页面布局复杂或表格结构的PDF时缺乏精度。
为了解决这个问题,使用了开源库Docling。它能够简单高效地解析文档,允许导出到所需的格式。它可以读取并导出多种常用文档格式,包括PDF、DOCX、PPTX、XLSX、图像、HTML、AsciiDoc和Markdown。Docling提供了对PDF文档的全面理解,包括表格结构、阅读顺序和页面布局。此外,它还支持扫描PDF的OCR。

然后将PDF中的文本转换为Markdown格式,这是后续基于段落结构的分块所必需的。
from docling.document_converter import DocumentConverter
logger.info("开始文档处理。")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()
提取的文本将具有类似于下图的结构。如图所示,PDF和表格解析已提取保留原始格式的文本。

基于标题和使用MarkdownHeaderTextSplitter
,输出文本随后被拆分成块,结果是一个包含332个Document
对象(LangChain Document)的列表。
from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "一级标题"),
("##", "二级标题")
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
docs_list = markdown_splitter.split_text(markdown_document)
docs_list
# 输出示例
[Document(metadata={'二级标题': '首席可持续发展官和高级副总裁关于学习和可持续性的信件'}, page_content="...."),
...]
# len(docs_list):
332
3.2 向量存储构建
我们构建了一个向量数据库来存储句子作为向量嵌入并通过该数据库进行搜索。在这种情况下,我们使用Chroma并在本地目录‘db_vector
’中存储持久化数据库。
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embd = OpenAIEmbeddings()
vectorstore_from_documents = Chroma.from_documents(
documents=docs_list,
collection_name="rag-chroma-google-v1",
embedding=embd,
persist_directory='db_vector'
)
4、主图构建
实现的系统包括两个图:
- 一个研究者图作为一个子图,负责生成不同的查询,这些查询将用于从向量数据库中检索和重新排序前k篇文档。
- 主图,包含主要工作流程,如分析用户的查询、生成完成任务所需步骤、生成响应以及通过人类参与的循环机制检查幻觉。
主图结构

LangGraph的一个核心概念是状态。每次图执行都会创建一个状态,该状态在图中的各个节点执行时传递,并且每个状态在图中各节点执行时传递。节点通过其返回值在执行后更新其内部状态。
让我们从构建图状态开始这个项目。为此,我们定义了两个类:
- Router(路由器): 包含将用户查询分类为以下类别之一的结果:“更多详情”、“环境”或“一般”。
- GradeHallucination(分级幻觉): 包含一个二进制分数,指示响应中是否存在幻觉。
from pydantic import BaseModel, Field
class Router(TypedDict):
"""对用户查询进行分类"""
logic: str
type: Literal["more-info", "environmental", "general"]
from pydantic import BaseModel, Field
class GradeHallucinations(BaseModel):
"""生成答案中是否存在幻觉的二进制分数"""
binary_score: str = Field(
description="答案是否基于事实,'1' 或 '0'"
)
定义的图状态如下:
- InputState(输入状态): 包括用户和代理之间交换的消息列表。
- AgentState(代理状态): 包含
Router
对用户查询的分类、研究计划中要执行的步骤列表、代理可以引用的检索到的文档列表以及Gradehallucination
的二进制分数。
from dataclasses import dataclass, field
from typing import Annotated, Literal, TypedDict
from langchain_core.documents import Document
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class InputState:
"""表示代理的输入状态。
此类定义了输入状态的结构,包括用户和代理之间的消息交换。它提供了一个受限版本的状态,对外界提供的接口比内部维护的更窄。
"""
messages: Annotated[list[AnyMessage], add_messages]
"""跟踪代理的主要执行状态。
通常会积累一种模式的人工/AI/人工/AI 消息。
返回:
一个新的消息列表,其中 `right` 中的消息合并到 `left` 中。如果 `right` 中的消息与 `left` 中的消息具有相同的ID,则 `right` 中的消息将替换 `left` 中的消息。"""
# 主代理状态
@dataclass(kw_only=True)
class AgentState(InputState):
"""检索图/代理的状态。"""
router: Router = field(default_factory=lambda: Router(type="general", logic=""))
"""路由器对用户查询的分类。"""
steps: list[str] = field(default_factory=list)
"""研究计划中的步骤列表。"""
documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
"""由检索器填充。这是代理可以引用的文档列表。"""
hallucination: GradeHallucinations = field(default_factory=lambda: GradeHallucinations(binary_score="0"))
4.1 分析并路由查询
函数 analyze_and_route_query
返回并更新 AgentState
状态的 router
变量。函数 route_query
根据之前的查询分类确定下一步。
具体来说,这一步使用包含以下值之一的 Router
对象更新状态:"more-info"
、"environmental"
或 "general"
。根据这些信息,工作流将被路由到适当的节点("create_research_plan"
、"ask_for_more_info"
或 "respond_to_general_query"
)。
async def analyze_and_route_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:
"""分析用户的查询并确定适当的路由。
此函数使用语言模型对用户的查询进行分类,并决定如何在对话流程中路由它。
参数:
state (AgentState): 代理的当前状态,包括对话历史记录。
config (RunnableConfig): 用于查询分析的模型配置。
返回:
dict[str, Router]: 包含 'router' 键的字典,键值为分类结果(分类类型和逻辑)。
"""
model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": ROUTER_SYSTEM_PROMPT}
] + state.messages
logging.info("---ANALYZE AND ROUTE QUERY---")
response = cast(
Router, await model.with_structured_output(Router).ainvoke(messages)
)
return {"router": response}
def route_query(
state: AgentState,
) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
"""根据查询分类确定下一步。
参数:
state (AgentState): 代理的当前状态,包括路由器的分类。
返回:
Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]: 要采取的下一步。
引发:
ValueError: 如果遇到未知的路由器类型。
"""
_type = state.router["type"]
if _type == "environmental":
return "create_research_plan"
elif _type == "more-info":
return "ask_for_more_info"
elif _type == "general":
return "respond_to_general_query"
else:
raise ValueError(f"未知路由器类型 {_type}")
对于问题 “检索都柏林2019年的数据中心PUE效率值” 的输出示例:
{
"logic":"这是一个关于2019年都柏林数据中心环境效率的具体问题,与环境报告相关。",
"type":"environmental"
}
超出范围/需要更多信息
然后我们定义了函数 ask_for_more_info
和 respond_to_general_query
,它们直接通过调用LLM生成对用户的响应:第一个将在路由器确定需要从用户获取更多信息时执行,而第二个则生成与我们的主题无关的一般查询的响应。在这种情况下,需要将生成的响应连接到消息列表中,更新状态中的 messages
变量。
async def ask_for_more_info(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""生成请求用户提供更多信息的响应。
当路由器确定需要从用户获取更多信息时,调用此节点。
参数:
state (AgentState): 代理的当前状态,包括对话历史记录和路由器逻辑。
config (RunnableConfig): 用于响应的模型配置。
返回:
dict[str, list[str]]: 包含生成响应的字典,键名为 'messages'。
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = MORE_INFO_SYSTEM_PROMPT.format(
logic=state.router["logic"]
)
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
async def respond_to_general_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""生成与环境无关的一般查询的响应。
当路由器将查询分类为一般问题时,调用此节点。
参数:
state (AgentState): 代理的当前状态,包括对话历史记录和路由器逻辑。
config (RunnableConfig): 用于响应的模型配置。
返回:
dict[str, list[str]]: 包含生成响应的字典,键名为 'messages'。
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = GENERAL_SYSTEM_PROMPT.format(
logic=state.router["logic"]
)
logging.info("---RESPONSE GENERATION---")
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
对于问题 “阿尔塔穆拉的天气怎么样?” 的输出示例:
{
"logic":"阿尔塔穆拉的天气怎么样?",
"type":"general"
}
# ---RESPONSE GENERATION---
"感谢您的提问,但我无法提供有关天气的信息。我的重点是环境报告。如果您有任何与此主题相关的问题,请告诉我,我将很乐意帮助!"
4.2 创建研究计划
如果查询分类返回值为 "environmental"
,用户的请求与文档相关联,工作流将到达 create_research_plan
节点,该函数为回答与环境相关的查询创建逐步研究计划。
async def create_research_plan(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""为回答与环境相关的查询创建逐步研究计划。
参数:
state (AgentState): 代理的当前状态,包括对话历史记录。
config (RunnableConfig): 用于生成计划的模型配置。
返回:
dict[str, list[str]]: 包含研究步骤列表的字典,键名为 'steps'。
"""
class Plan(TypedDict):
"""生成研究计划。"""
steps: list[str]
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": RESEARCH_PLAN_系统提示
] + state.messages
logging.info("---PLAN GENERATION---")
response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
return {"steps": response["steps"], "documents": "delete"}
输出示例以回答问题“检索都柏林2019年的数据中心PUE效率值”:
{
"steps":
["查找2019年在都柏林的数据中心的PUE(能源使用效率)效率值。"
]
}
在这个例子中,用户的请求只需要一步就可以获取信息。
4.3 进行研究
此函数将研究计划的第一步作为参数,并使用它来进行研究。在这个过程中,该函数调用名为researcher_graph
的子图,该子图返回我们要在下一节中探索的文档列表。最后,我们通过移除刚刚执行的步骤来更新状态中的steps
变量。
async def conduct_research(state: AgentState) -> dict[str, Any]:
"""执行研究计划的第一步。
此函数将研究计划的第一步作为参数,并使用它来进行研究。
参数:
state (AgentState): 代理的当前状态,包括研究计划的步骤。
返回:
dict[str, list[str]]: 包含'research_results'和'steps'键的字典,分别包含检索到的文档和剩余的研究步骤。
行为:
- 使用研究计划的第一步调用researcher_graph。
- 更新状态,添加检索到的文档并移除已完成的步骤。
"""
result = await researcher_graph.ainvoke({"question": state.steps[0]}) #直接调用子图
docs = result["documents"]
step = state.steps[0]
logging.info(f"\n共检索到{len(docs)}份文档用于执行步骤:{step}.")
return {"documents": result["documents"], "steps": state.steps[1:]}
4.4 构建研究员子图

如上图所示,该图由一个查询生成步骤组成,该步骤从主图传递的步骤开始,以及一个检索相关片段的步骤。与主图一样,让我们继续定义子图中使用的状态QueryState
(retrieve_documents
节点的私有状态)和ResearcherState
(研究员图的状态)。
"""研究员子图的状态。
此模块定义了研究员子图中使用的状态结构。
"""
from dataclasses import dataclass, field
from typing import Annotated
from langchain_core.documents import Document
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class QueryState:
"""研究员图中retrieve_documents节点的私有状态。"""
query: str
@dataclass(kw_only=True)
class ResearcherState:
"""研究员图/代理的状态。"""
question: str
"""研究计划中的一个步骤,由检索代理生成。"""
queries: list[str] = field(default_factory=list)
"""研究员根据问题生成的一系列搜索查询。"""
documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
"""由检索器填充。这是代理可以引用的文档列表。"""
生成查询
此步骤基于问题(研究计划中的一个步骤)生成搜索查询。此函数使用语言模型生成多样化的搜索查询,以帮助回答问题。
async def generate_queries(
state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""基于问题(研究计划中的一个步骤)生成搜索查询。
此函数使用语言模型生成多样化的搜索查询,以帮助回答问题。
参数:
state (ResearcherState): 研究员的当前状态,包括用户的问题。
config (RunnableConfig): 用于生成查询的模型配置。
返回:
dict[str, list[str]]: 包含'queries'键的字典,其中包含生成的搜索查询列表。
"""
class Response(TypedDict):
queries: list[str]
logger.info("---GENERATE QUERIES---")
model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
messages = [
{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},
{"role": "human", "content": state.question},
]
response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))
queries = response["queries"]
queries.append(state.question)
logger.info(f"查询:{queries}")
return {"queries": response["queries"]}
输出示例以回答问题“检索都柏林2019年的数据中心PUE效率值”:
{
"queries":[
"查找2019年在都柏林的数据中心的PUE(能源使用效率)效率值。",
"都柏林2019年数据中心PUE效率值",
"都柏林2019年数据中心能源使用效率统计"
]
}
一旦生成了查询,我们可以使用之前定义的持久化数据库设置向量存储。
def _setup_vectorstore() -> Chroma:
"""
设置并返回Chroma向量存储实例。
"""
embeddings = OpenAIEmbeddings()
return Chroma(
collection_name=VECTORSTORE_COLLECTION,
embedding_function=embeddings,
persist_directory=VECTORSTORE_DIRECTORY
)
在RAG系统中,最重要的部分是文档检索过程。为此,给予了显著的关注,具体来说,选择了一种集成检索器作为混合搜索和Cohere用于重排序。
混合搜索是“关键词风格”搜索和“向量风格”搜索的结合。它具有执行关键词搜索的优势,同时也具有从嵌入和向量搜索中获得的语义搜索的优势。集成检索器是一种旨在通过组合多个单独检索器的优点来增强信息检索性能的检索算法。这种方法称为“集成检索”,使用一种称为“逆秩融合”的方法对不同检索器的结果进行重新排名和合并,从而提供比任何单一检索器更准确和相关的结果。
# 创建基本检索器
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})
ensemble_retriever = EnsembleRetriever(
retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],
weights=ENSEMBLE_WEIGHTS,
)
重排序是一种可以用来提高RAG管道性能的技术。这是一种非常强大的方法,可以显著提升搜索系统的性能。简而言之,重排序接受一个查询和一个响应,并输出它们之间的相关性分数。这样,可以使用任何搜索系统来检索可能包含查询答案的一组文档,然后使用重排序端点对这些文档进行排序。
但是:为什么我们需要重排序步骤?
为了应对准确性方面的挑战,采用了两阶段检索作为提高搜索质量的一种手段。在这些两阶段系统中,第一阶段模型(集成检索器)从较大的数据集中检索一组候选文档。然后,第二阶段模型(重排序器)用于对第一阶段模型检索的文档进行重排序。此外,重排序模型,例如Cohere重排序模型,是一种在给定查询和文档对时会输出相似性分数的模型。这个分数可以用来重新排列最相关的文档。在重排序方法中,Cohere重排序模型因其能够显著提高搜索准确性而脱颖而出。该模型不同于传统的嵌入模型,它采用深度学习直接评估每个文档与查询之间的匹配度。Cohere重排序模型通过对查询和文档进行联合处理来输出相关性分数,这导致了一个更加细致的文档选择过程。 (完整参考链接)*
在这种情况下,检索到的文档被重排序,并返回前两个最相关的文档。
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
from langchain_community.llms import Cohere
# 设置Cohere重排序
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
# 构建压缩检索器
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=ensemble_retriever,
)
compression_retriever.invoke(
"检索都柏林2019年的数据中心PUE效率值"
)
输出示例以回答问题“检索都柏林2019年的数据中心PUE效率值”:
[Document(metadata={'Header 2': 'Endnotes', 'relevance_score': 0.27009502}, page_content="- 1 这个计算基于..."),
Document(metadata={'Header 2': '数据中心电网区域CFE', 'relevance_score': 0.20593424}, page_content="2023 \n| 国家...")]
检索和重排序文档函数
async def retrieve_and_rerank_documents(
state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:
"""Retrieve documents based on a given query.
This function uses a retriever to fetch relevant documents for a given query.
Args:
state (QueryState): The current state containing the query string.
config (RunnableConfig): Configuration with the retriever used to fetch documents.
Returns:
dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents.
"""
logger.info("---RETRIEVING DOCUMENTS---")
logger.info(f"Query for the retrieval process: {state.query}")
response = compression_retriever.invoke(state.query)
return {"documents": response}
构建子图
builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(retrieve_and_rerank_documents)
builder.add_edge(START, "generate_queries")
builder.add_conditional_edges(
"generate_queries",
retrieve_in_parallel, # type: ignore
path_map=["retrieve_and_rerank_documents"],
)
builder.add_edge("retrieve_and_rerank_documents", END)
researcher_graph = builder.compile()
4.5 检查结束
使用 conditional_edge,我们构建一个循环,其结束条件由 check_finished 返回的值决定。此函数检查 create_research_plan 节点创建的步骤列表中是否没有其他步骤需要处理。完成所有步骤后,流程将继续进行到 respond 节点。
def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""Determine if the research process is complete or if more research is needed.
This function checks if there are any remaining steps in the research plan:
- If there are, route back to the `conduct_research` node
- Otherwise, route to the `respond` node
Args:
state (AgentState): The current state of the agent, including the remaining research steps.
Returns:
Literal["respond", "conduct_research"]: The next step to take based on whether research is complete.
"""
if len(state.steps or []) > 0:
return "conduct_research"
else:
return "respond"
4.6 响应
根据进行的研究生成对用户查询的最终响应。此功能使用对话历史记录和研究代理检索到的文档制定综合答案。
async def respond(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""Generate a final response to the user's query based on the conducted research.
This function formulates a comprehensive answer using the conversation history and the documents retrieved by the researcher.
Args:
state (AgentState): The current state of the agent, including retrieved documents and conversation history.
config (RunnableConfig): Configuration with the model used to respond.
Returns:
dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response.
"""
print("--- RESPONSE GENERATION STEP ---")
model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
context = format_docs(state.documents)
prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)
messages = [{"role": "system", "content": prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
4.7 检查幻觉
此步骤检查上一步中 LLM 生成的响应是否由基于检索到的文档的事实集支持,并给出二进制分数。
async def check_hallucinations(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:
"""Analyze the user's query and checks if the response is supported by the set of facts based on the document retrieved,
providing a binary score result.
This function uses a language model to analyze the user's query and gives a binary score result.
Args:
state (AgentState): The current state of the agent, including conversation history.
config (RunnableConfig): Configuration with the model used for query analysis.
Returns:
dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic).
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = CHECK_HALLUCINATIONS.format(
documents=state.documents,
generation=state.messages[-1]
)
messages = [
{"role": "system", "content": system_prompt}
] + state.messages
logging.info("---CHECK HALLUCINATIONS---")
response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))
return {"hallucination": response}
4.8 人工批准(人机交互)
如果 LLM 的响应不受事实集的支持,则很可能包含幻觉。在这种情况下,图表被中断,用户可以控制下一步:仅重试最后一个生成步骤而不重新启动整个工作流或结束流程。此人机交互步骤可确保用户控制,同时避免意外循环或不良操作。
LangGraph 中的中断函数通过在特定节点暂停图表、向人类呈现信息以及使用他们的输入恢复图表来启用人机交互工作流。此功能对于批准、编辑或收集其他输入等任务非常有用。中断函数与 Command 对象结合使用,以使用人类提供的值恢复图表。
def human_approval(
state: AgentState,
):
_binary_score = state.hallucination.binary_score
if _binary_score == "1":
return "END"
else:
retry_generation = interrupt(
{
"question": "Is this correct?",
"llm_output": state.messages[-1]
})
if retry_generation == "y":
print("voglio continuare")
return "respond"
else:
return "END"
async def retrieve_and_rerank_documents(
state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:
"""检索并重排序文档。
参数:
state (QueryState): 当前查询状态。
config (RunnableConfig): 配置项。
返回:
dict[str, list[Document]]: 包含'documents'键的字典,其中包含检索并重排序后的文档列表。
"""
# 使用向量存储检索文档
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
docs = retriever.invoke(state.query)
# 重排序文档
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
compressed_docs = compressor.invoke(docs)
return {"documents": compressed_docs}
``````markdown
eConfig
) -> dict[str, list[Document]]:
"""根据给定的查询检索文档。
此函数使用检索器为给定的查询获取相关文档。
参数:
state (QueryState): 包含查询字符串的当前状态。
config (RunnableConfig): 配置使用的检索器以获取文档。
返回:
dict[str, list[Document]]: 一个字典,其中包含检索到的文档列表的 'documents' 键。
"""
logger.info("---RETRIEVING DOCUMENTS---")
logger.info(f"检索过程中的查询:{state.query}")
response = compression_retriever.invoke(state.query)
return {"documents": response}
5、结果
在以下测试中,使用了谷歌关于环境可持续性的年度报告,该报告可自由获取 此处。
5.1 实时测试
作为第一个测试,执行了以下查询以从不同表格中提取不同的值,结合多步骤方法的能力并利用Docling库的解析功能。
复杂问题:“检索新加坡第2个设施在 2019年 和 2022年 的数据中心PUE效率值。同时检索亚太地区2023年的区域平均CFE。”


完整结果是正确的,并且幻觉检查已成功通过。
聊天机器人生成的步骤:
- “查找新加坡第2个设施在2019年和2022年的PUE效率值。”,
- “查找亚太地区在2023年的区域平均CFE。”
生成的文本:*“- 新加坡第2个设施在2019年的电力使用效率(PUE)不可用,因为当年的数据未提供。但是,2022年的PUE为1.21。
- 2023年亚太地区的区域平均无碳能源(CFE)为12%。”*
完整输出:
Enter your query (type '-q' to quit):
> Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023
2025-01-10 20:39:53,381 - INFO - ---ANALYZE AND ROUTE QUERY---
2025-01-10 20:39:53,381 - INFO - MESSAGES: [HumanMessage(content='Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 ', additional_kwargs={}, response_metadata={}, id='351a00e9-ecda-49e2-b069-19196348a82a')]
{"logic":"Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023","type":"environmental"}2025-01-10 20:39:55,586 - INFO - ---PLAN GENERATION---
{"steps":["Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.","Find the regional average CFE for the Asia Pacific region in 2023."]}2025-01-10 20:39:57,323 - INFO - ---GENERATE QUERIES---
{"queries":["PUE efficiency values Singapore 2nd facility 2019","PUE efficiency values Singapore 2nd facility 2022"]}2025-01-10 20:39:58,285 - INFO - Queries: ['PUE efficiency values Singapore 2nd facility 2019', 'PUE efficiency values Singapore 2nd facility 2022', 'Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.']
2025-01-10 20:39:58,288 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:58,288 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2019
2025-01-10 20:39:59,568 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:59,568 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2022
2025-01-10 20:40:00,891 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:00,891 - INFO - Query for the retrieval process: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.
2025-01-10 20:40:01,820 - INFO -
4 documents retrieved in total for the step: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022..
2025-01-10 20:40:01,825 - INFO - ---GENERATE QUERIES---
{"queries":["Asia Pacific regional average CFE 2023","CFE statistics Asia Pacific 2023"]}2025-01-10 20:40:02,778 - INFO - Queries: ['Asia Pacific regional average CFE 2023', 'CFE statistics Asia Pacific 2023', 'Find the regional average CFE for the Asia Pacific region in 2023.']
2025-01-10 20:40:02,780 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:02,780 - INFO - Query for the retrieval process: Asia Pacific regional average CFE 2023
2025-01-10 20:40:03,757 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:03,757 - INFO - Query for the retrieval process: CFE statistics Asia Pacific 2023
2025-01-10 20:40:04,885 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:04,885 - INFO - Query for the retrieval process: Find the regional average CFE for the Asia Pacific region in 2023.
2025-01-10 20:40:06,526 - INFO -
4 documents retrieved in total for the step: Find the regional average CFE for the Asia Pacific region in 2023..
2025-01-10 20:40:06,530 - INFO - --- RESPONSE GENERATION STEP ---
- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21 [e048d08a-4ef6-77b5-20d3-352dcec590b7].
- The regional average Carbon-Free Energy (CFE) in the Asia Pacific for 2023 is 12% [9c489d2f-f16f-572b-abed-ee1d5d0ed379].2025-01-10 20:40:14,918 - INFO - ---CHECK HALLUCINATIONS---
{"binary_score":"1"}>
现在让我们在ChatGPT上试一下。将PDF文件上传到Web应用程序后,进行了相同的查询。
如图所示,ChatGPT返回的值是不正确的,并且模型表现出幻觉。在这种情况下,幻觉检查步骤将允许响应重新生成(自我反思RAG)。

6、结束语
尽管性能有所提高,但实现代理式RAG并非没有挑战:
- 延迟:代理交互的增加复杂性通常会导致更长的响应时间。在速度和准确性之间取得平衡是一个关键挑战。
- 评估和可观测性:随着代理式RAG系统变得更加复杂,持续评估和可观测性变得必要。
总之,代理式RAG标志着AI领域的重大突破。通过将大型语言模型的能力与自主推理和信息检索相结合,代理式RAG引入了一种新的智能和灵活性标准。随着AI的不断发展,代理式RAG将在各个行业中发挥基础性作用,改变我们使用技术的方式。
原文链接:Building RAG Research Multi-Agent with LangGraph
汇智网翻译整理,转载请标明出处
