LangGraph Agentic RAG 实战教程
使用 LangGraph 构建智能检索增强生成系统的完整指南
概述
本章将通过实际代码示例,展示如何使用 LangGraph 构建一个完整的 Agentic RAG 系统。与传统 RAG 的线性流程不同,Agentic RAG 引入了智能决策、自我纠正和动态路由能力。
你将学到
- LangGraph 的核心概念:State、Node、Edge
- 如何构建带有决策能力的 RAG 工作流
- 实现文档评分和查询重写机制
- 集成外部搜索作为后备方案
前置知识
在开始之前,请确保你已经了解:
- Python 基础编程
- RAG 的基本概念(参见 13.2 章节)
- LLM API 的基本使用
环境准备
bash
# 安装必要依赖
pip install langgraph langchain langchain-openai langchain-community
pip install chromadb tiktoken beautifulsoup4

python
# 设置环境变量
import os
os.environ["OPENAI_API_KEY"] = "your-api-key"

一、LangGraph 核心概念
1.1 什么是 LangGraph?
LangGraph 是 LangChain 团队开发的图结构智能体编排框架,它的核心特点是:
- 状态图(StateGraph):使用图结构定义工作流
- 节点(Node):执行具体任务的函数
- 边(Edge):定义节点间的流转逻辑
- 条件边(Conditional Edge):支持动态路由
1.2 与传统 LangChain 的区别
| 特性 | LangChain | LangGraph |
|---|---|---|
| 执行流程 | 线性链式 | 图结构,支持循环 |
| 状态管理 | 隐式传递 | 显式状态对象 |
| 控制流 | 固定顺序 | 条件分支、循环 |
| 适用场景 | 简单任务 | 复杂智能体 |
1.3 核心组件
python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
# 1. 定义状态结构
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 对话历史
documents: list # 检索到的文档
question: str # 用户问题
generation: str # 生成的答案
# 2. 创建图
graph_builder = StateGraph(AgentState)
# 3. 添加节点
graph_builder.add_node("retrieve", retrieve_documents)
graph_builder.add_node("grade", grade_documents)
graph_builder.add_node("generate", generate_answer)
# 4. 添加边
graph_builder.add_edge(START, "retrieve")
graph_builder.add_conditional_edges("grade", route_decision)
graph_builder.add_edge("generate", END)
# 5. 编译图
graph = graph_builder.compile()二、构建 Agentic RAG 系统
2.1 系统架构
用户查询
│
↓
┌─────────────────────────────────────┐
│ 检索节点(Retrieve) │
│ 从向量数据库检索相关文档 │
└─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 评分节点(Grade) │
│ 评估文档与查询的相关性 │
└─────────────────────────────────────┘
│
├─→ 相关 → 生成答案
│
└─→ 不相关 → 重写查询 → 网络搜索 → 生成答案

2.2 完整实现代码
步骤 1:初始化组件
python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 加载并处理文档
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
]
docs = []
for url in urls:
loader = WebBaseLoader(url)
docs.extend(loader.load())
# 分割文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
doc_splits = text_splitter.split_documents(docs)
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=doc_splits,
embedding=OpenAIEmbeddings(),
collection_name="agentic-rag-demo"
)
# 创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})步骤 2:定义状态和节点
python
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
# 定义状态
# Shared state passed between the graph nodes.
class GraphState(TypedDict):
    """State flowing through the Agentic RAG graph."""
    question: str            # user question (may be replaced by rewrite_query)
    generation: str          # final LLM answer
    documents: list          # retrieved Document objects
    web_search_needed: bool  # set by grade_documents when too few docs are relevant
# 节点 1:检索文档
# Node 1: retrieve documents.
def retrieve(state: GraphState) -> GraphState:
    """Retrieve relevant documents from the vector store for the current question."""
    print("---RETRIEVE---")
    question = state["question"]
    # Run the similarity search using the retriever built in step 1.
    documents = retriever.invoke(question)
    return {
        **state,
        "documents": documents
    }
# 节点 2:评估文档相关性
# Node 2: grade document relevance.
def grade_documents(state: GraphState) -> GraphState:
    """Grade each retrieved document for relevance and flag web search if needed."""
    print("---GRADE DOCUMENTS---")
    question = state["question"]
    documents = state["documents"]
    # Grading prompt: asks the LLM for a bare yes/no verdict per document.
    grade_prompt = """你是一个文档相关性评估专家。
给定一个用户问题和一个文档,判断该文档是否包含回答问题所需的相关信息。
问题:{question}
文档:{document}
请只回答 "yes" 或 "no"。"""
    filtered_docs = []
    web_search_needed = False
    for doc in documents:
        response = llm.invoke(
            grade_prompt.format(
                question=question,
                document=doc.page_content
            )
        )
        grade = response.content.strip().lower()
        if grade == "yes":
            filtered_docs.append(doc)
        else:
            print(" - 文档不相关,已过滤")
    # Fall back to web search when fewer than 2 relevant documents remain.
    if len(filtered_docs) < 2:
        web_search_needed = True
        print(" - 相关文档不足,将进行网络搜索")
    return {
        **state,
        "documents": filtered_docs,
        "web_search_needed": web_search_needed
    }
# 节点 3:重写查询
# Node 3: rewrite the query.
def rewrite_query(state: GraphState) -> GraphState:
    """Rewrite the question to get better (web) search results."""
    print("---REWRITE QUERY---")
    question = state["question"]
    rewrite_prompt = """你是一个查询优化专家。
给定原始问题,重写它以获得更好的搜索结果。
保持问题的核心意图,但使其更具体、更易于搜索。
原始问题:{question}
优化后的问题:"""
    response = llm.invoke(rewrite_prompt.format(question=question))
    rewritten = response.content.strip()
    print(f" 原始查询: {question}")
    print(f" 重写查询: {rewritten}")
    # The rewritten question replaces the original for all downstream nodes.
    return {
        **state,
        "question": rewritten
    }
# 节点 4:网络搜索
# Node 4: web search.
def web_search(state: GraphState) -> GraphState:
    """Supplement the retrieved context with web search results."""
    print("---WEB SEARCH---")
    question = state["question"]
    # Copy the list so we do not mutate the object held by the previous state.
    documents = list(state.get("documents", []))
    # Tavily, Brave Search, etc. could be integrated here;
    # this example uses DuckDuckGo.
    from langchain_community.tools import DuckDuckGoSearchResults
    search = DuckDuckGoSearchResults()
    search_results = search.invoke(question)
    # Wrap the raw search results string in a Document.
    from langchain_core.documents import Document
    web_doc = Document(
        page_content=search_results,
        metadata={"source": "web_search"}
    )
    documents.append(web_doc)
    return {
        **state,
        "documents": documents
    }
# 节点 5:生成答案
def generate(state: GraphState) -> GraphState:
"""基于检索到的文档生成答案"""
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
# 构建上下文
context = "\n\n".join([doc.page_content for doc in documents])
generate_prompt = """你是一个知识助手。基于以下上下文回答用户的问题。
上下文:
{context}
问题:{question}
请提供准确、有帮助的回答。如果上下文中没有相关信息,请诚实地说明。"""
response = llm.invoke(
generate_prompt.format(
context=context,
question=question
)
)
return {
**state,
"generation": response.content
}步骤 3:定义路由逻辑
python
def decide_to_search(state: "GraphState") -> Literal["rewrite", "generate"]:
    """Route after grading: rewrite + web search when local docs were insufficient,
    otherwise go straight to answer generation."""
    if state.get("web_search_needed", False):
        return "rewrite"
    return "generate"

# 步骤 4:构建图
python
from langgraph.graph import StateGraph, START, END
# 创建图
workflow = StateGraph(GraphState)
# 添加节点
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("web_search", web_search)
workflow.add_node("generate", generate)
# 定义流程
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_search,
{
"rewrite": "rewrite",
"generate": "generate"
}
)
workflow.add_edge("rewrite", "web_search")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)
# 编译图
app = workflow.compile()步骤 5:可视化工作流
python
# 生成图的可视化
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))步骤 6:运行系统
python
# 测试查询
inputs = {
"question": "什么是 ReAct 模式?它如何在 AI Agent 中应用?"
}
# 执行工作流
result = app.invoke(inputs)
print("\n" + "="*50)
print("最终答案:")
print(result["generation"])三、进阶功能:自反射 RAG
3.1 CRAG(纠正式 RAG)
CRAG 在标准 RAG 基础上增加了文档质量评估和纠正机制:
python
def crag_grade(state: GraphState) -> GraphState:
"""CRAG 风格的文档评估"""
question = state["question"]
documents = state["documents"]
crag_prompt = """评估以下文档对回答问题的价值。
问题:{question}
文档:{document}
请评分(1-5)并解释:
- 5分:完全相关,可直接回答问题
- 4分:高度相关,包含关键信息
- 3分:部分相关,需要补充
- 2分:略有关联,信息有限
- 1分:不相关
格式:
分数: X
理由: ...
"""
graded_docs = []
for doc in documents:
response = llm.invoke(
crag_prompt.format(
question=question,
document=doc.page_content
)
)
# 解析分数
content = response.content
score = int(content.split("分数:")[1].split("\n")[0].strip())
if score >= 3:
graded_docs.append({
"document": doc,
"score": score,
"action": "use" if score >= 4 else "supplement"
})
return {
**state,
"graded_documents": graded_docs
}3.2 Self-RAG(自反射 RAG)
Self-RAG 使用特殊令牌来控制检索和生成流程:
python
class SelfRAGState(TypedDict):
    """State for the Self-RAG flow, including reflection-token style scores."""
    question: str
    documents: list
    generation: str
    retrieve_decision: str  # "yes" or "no"
    relevance_scores: list  # ISREL: per-document relevance
    support_scores: list    # ISSUP: is the answer supported by the docs
    utility_score: float    # ISUSE: overall usefulness of the answer
def self_rag_decide_retrieve(state: SelfRAGState) -> SelfRAGState:
    """Decide whether external retrieval is needed to answer this question."""
    question = state["question"]
    decide_prompt = """判断以下问题是否需要检索外部信息才能准确回答。
问题:{question}
对于事实性问题、最新信息、专业知识,回答 "yes"。
对于观点性问题、常识性问题、简单计算,回答 "no"。
只回答 "yes" 或 "no"。"""
    response = llm.invoke(decide_prompt.format(question=question))
    decision = response.content.strip().lower()
    return {
        **state,
        "retrieve_decision": decision
    }
def self_rag_critique(state: SelfRAGState) -> SelfRAGState:
"""自我批评生成的答案"""
question = state["question"]
documents = state["documents"]
generation = state["generation"]
critique_prompt = """评估生成的答案质量:
问题:{question}
检索文档:{documents}
生成答案:{generation}
请评估:
1. 支持度(答案是否被文档支持):1-5分
2. 有用性(答案是否有帮助):1-5分
3. 需要改进的地方
格式:
支持度: X
有用性: X
改进建议: ...
"""
response = llm.invoke(
critique_prompt.format(
question=question,
documents=str([d.page_content[:200] for d in documents]),
generation=generation
)
)
# 解析评分
content = response.content
support = int(content.split("支持度:")[1].split("\n")[0].strip())
utility = int(content.split("有用性:")[1].split("\n")[0].strip())
return {
**state,
"support_scores": [support],
"utility_score": utility
}四、性能优化技巧
4.1 并行检索
python
import asyncio
from langchain_core.runnables import RunnableParallel
async def parallel_retrieve(state: GraphState) -> GraphState:
"""并行从多个数据源检索"""
question = state["question"]
# 定义并行任务
retrieval_tasks = RunnableParallel(
vector_docs=retriever,
keyword_docs=keyword_retriever, # BM25 等
)
results = await retrieval_tasks.ainvoke(question)
# 合并结果
all_docs = results["vector_docs"] + results["keyword_docs"]
# 去重
seen = set()
unique_docs = []
for doc in all_docs:
if doc.page_content not in seen:
seen.add(doc.page_content)
unique_docs.append(doc)
return {
**state,
"documents": unique_docs
}4.2 缓存机制
python
from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_retrieve(question_hash: str):
    """Cache retrieval results, keyed by a hash of the question (stub)."""
    # Actual retrieval logic goes here.
    pass

def retrieve_with_cache(state: "GraphState") -> "GraphState":
    """Retrieve documents, serving repeated questions from the LRU cache."""
    question = state["question"]
    # NOTE(review): lru_cache could key on the question string directly;
    # hashing here only bounds the key size.
    question_hash = hashlib.md5(question.encode()).hexdigest()
    documents = cached_retrieve(question_hash)
    return {
        **state,
        "documents": documents
    }

# 4.3 流式输出
python
async def stream_generate(state: GraphState):
"""流式生成答案"""
question = state["question"]
documents = state["documents"]
context = "\n\n".join([doc.page_content for doc in documents])
prompt = f"基于以下上下文回答问题:\n{context}\n\n问题:{question}"
async for chunk in llm.astream(prompt):
yield chunk.content五、调试与监控
5.1 使用 LangSmith 追踪
python
import os

# Enable LangSmith tracing for the whole pipeline.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "agentic-rag-demo"
# All LangGraph calls will now be traced automatically.

# 5.2 添加日志节点
python
def log_state(state: GraphState, node_name: str) -> None:
"""记录状态变化"""
print(f"\n[{node_name}] 状态快照:")
print(f" 问题: {state.get('question', 'N/A')[:50]}...")
print(f" 文档数: {len(state.get('documents', []))}")
print(f" 需要搜索: {state.get('web_search_needed', False)}")
if state.get('generation'):
print(f" 答案长度: {len(state['generation'])} 字符")六、完整示例:多数据源 Agentic RAG
python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal
class MultiSourceState(TypedDict):
    """State for the multi-source routing example."""
    question: str
    source_type: str  # "technical", "general", "recent"
    documents: list
    generation: str

def classify_question(state: MultiSourceState) -> MultiSourceState:
    """Classify the question so it can be routed to the right data source."""
    question = state["question"]
    classify_prompt = """将以下问题分类:
问题:{question}
类别:
- technical:技术文档、API、代码相关
- general:一般知识、概念解释
- recent:需要最新信息、新闻事件
只回答类别名称。"""
    response = llm.invoke(classify_prompt.format(question=question))
    source_type = response.content.strip().lower()
    return {
        **state,
        "source_type": source_type
    }

def route_by_type(state: MultiSourceState) -> Literal["tech_retrieve", "general_retrieve", "web_search"]:
    """Map the classified source type to the matching retrieval node."""
    source_type = state["source_type"]
    if source_type == "technical":
        return "tech_retrieve"
    elif source_type == "general":
        return "general_retrieve"
    else:
        # unknown or "recent" -> fall back to live web search
        return "web_search"
# Build the multi-source retrieval graph.
multi_source_graph = StateGraph(MultiSourceState)
multi_source_graph.add_node("classify", classify_question)
# NOTE(review): retrieve_from_tech_db / retrieve_from_general_db are not
# defined in this tutorial — implement them analogously to retrieve().
multi_source_graph.add_node("tech_retrieve", retrieve_from_tech_db)
multi_source_graph.add_node("general_retrieve", retrieve_from_general_db)
multi_source_graph.add_node("web_search", web_search)
multi_source_graph.add_node("generate", generate)

multi_source_graph.add_edge(START, "classify")
multi_source_graph.add_conditional_edges(
    "classify",
    route_by_type,
    {
        "tech_retrieve": "tech_retrieve",
        "general_retrieve": "general_retrieve",
        "web_search": "web_search"
    }
)
multi_source_graph.add_edge("tech_retrieve", "generate")
multi_source_graph.add_edge("general_retrieve", "generate")
multi_source_graph.add_edge("web_search", "generate")
multi_source_graph.add_edge("generate", END)

app = multi_source_graph.compile()

思考题
- 如何设计评分阈值以平衡召回率和准确率?
- 在什么情况下应该选择 CRAG 而不是 Self-RAG?
- 如何处理 LangGraph 中的超时和重试逻辑?