Qdrant + LangGraph Agentic RAG 实战
使用 Qdrant 向量数据库和 LangGraph 构建生产级 Agentic RAG 系统
概述
本章将展示如何结合 Qdrant(高性能向量数据库)和 LangGraph 构建一个完整的 Agentic RAG 系统。Qdrant 提供了企业级的向量搜索能力,而 LangGraph 提供了智能体编排能力。
为什么选择 Qdrant?
| 特性 | 说明 |
|---|---|
| 高性能 | Rust 编写,支持十亿级向量毫秒级检索 |
| 丰富过滤 | 支持 payload 过滤,实现精细化检索 |
| 云端部署 | 提供托管云服务,简化运维 |
| 开源免费 | 核心功能开源,可自托管 |
前置准备
环境配置
bash
# 安装依赖
pip install qdrant-client langchain-qdrant langchain-openai langgraph
pip install beautifulsoup4 tiktoken
Qdrant 部署选项
选项 1:本地 Docker
bash
docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 qdrant/qdrant
选项 2:Qdrant Cloud
- 访问 cloud.qdrant.io 注册账号
- 创建免费 Cluster
- 获取 API Key 和 Cluster URL
python
# 配置连接
QDRANT_URL = "https://xxx-xxx.us-east-1-0.aws.cloud.qdrant.io:6333"
QDRANT_API_KEY = "your-api-key"
一、基础设置
1.1 初始化客户端和模型
python
import os
from qdrant_client import QdrantClient
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
# 环境变量
os.environ["OPENAI_API_KEY"] = "your-openai-key"
# 初始化 Qdrant 客户端
qdrant_client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
)
# 或本地连接
# qdrant_client = QdrantClient(host="localhost", port=6333)
# 初始化 LLM 和 Embeddings
llm = ChatOpenAI(model="gpt-4o", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")1.2 创建向量集合
python
from qdrant_client.models import Distance, VectorParams
# 创建集合(如果不存在)
collection_name = "agentic_rag_docs"
# 检查集合是否存在
collections = qdrant_client.get_collections().collections
collection_names = [c.name for c in collections]
if collection_name not in collection_names:
qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=1536, # text-embedding-3-small 的维度
distance=Distance.COSINE
)
)
print(f"Created collection: {collection_name}")二、文档索引
2.1 加载和处理文档
python
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 加载文档
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = []
for url in urls:
loader = WebBaseLoader(url)
loaded = loader.load()
# 添加元数据
for doc in loaded:
doc.metadata["source_url"] = url
doc.metadata["doc_type"] = "blog"
docs.extend(loaded)
print(f"Loaded {len(docs)} documents")
# 分割文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", "。", ".", " ", ""]
)
doc_splits = text_splitter.split_documents(docs)
print(f"Split into {len(doc_splits)} chunks")2.2 创建向量存储
python
# 使用 LangChain 的 Qdrant 集成
vectorstore = QdrantVectorStore.from_documents(
documents=doc_splits,
embedding=embeddings,
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
collection_name=collection_name,
force_recreate=False # 设为 True 会删除现有数据
)
print(f"Indexed {len(doc_splits)} documents to Qdrant")
# 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)三、构建 Agentic RAG 系统
3.1 定义状态
python
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_core.documents import Document
class AgentState(TypedDict):
"""Agentic RAG 状态定义"""
messages: Annotated[list, add_messages]
question: str
documents: list[Document]
doc_grades: list[dict] # 文档评分
generation: str
search_needed: bool
iteration: int # 迭代次数3.2 创建检索工具
python
from langchain_core.tools import tool
@tool
def retrieve_documents(query: str) -> list[Document]:
    """Retrieve relevant documents from the Qdrant vector store.

    Args:
        query: The search query string.

    Returns:
        A list of relevant documents.
    """
    return retriever.invoke(query)
@tool
def search_with_filter(query: str, doc_type: str = None) -> list[Document]:
"""带过滤条件的检索。
Args:
query: 搜索查询
doc_type: 文档类型过滤(如 "blog", "api", "tutorial")
Returns:
过滤后的相关文档
"""
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 构建过滤器
filter_condition = None
if doc_type:
filter_condition = Filter(
must=[
FieldCondition(
key="metadata.doc_type",
match=MatchValue(value=doc_type)
)
]
)
# 执行带过滤的检索
results = qdrant_client.search(
collection_name=collection_name,
query_vector=embeddings.embed_query(query),
limit=4,
query_filter=filter_condition
)
# 转换为 Document 格式
docs = []
for result in results:
doc = Document(
page_content=result.payload.get("page_content", ""),
metadata=result.payload.get("metadata", {})
)
docs.append(doc)
return docs3.3 定义节点函数
python
def retrieve(state: AgentState) -> AgentState:
    """Retrieval node: fetch documents from Qdrant for the current question."""
    print("\n📚 RETRIEVE NODE")
    question = state["question"]

    docs = retrieve_documents.invoke(question)
    print(f" Retrieved {len(docs)} documents")
    for i, doc in enumerate(docs):
        print(f" [{i+1}] {doc.page_content[:100]}...")

    # Bump the loop counter so the router can cap rewrite retries.
    return {
        **state,
        "documents": docs,
        "iteration": state.get("iteration", 0) + 1,
    }
def grade_documents(state: AgentState) -> AgentState:
    """Grading node: score each retrieved document's relevance to the question.

    Asks the LLM for a 1-5 relevance score per document, keeps documents
    scoring >= 3, and sets `search_needed` when fewer than two survive.

    Fixes: `import json` hoisted out of the per-document loop; the bare
    `except:` narrowed to JSON parse failures; `grade["score"]` accesses
    replaced with `.get()` so a JSON reply missing "score" cannot crash.
    """
    import json

    print("\n⚖️ GRADE DOCUMENTS NODE")
    question = state["question"]
    documents = state["documents"]
    grade_prompt = """你是一个文档相关性评估专家。
给定用户问题和一个文档片段,评估该文档对回答问题的价值。
问题:{question}
文档:{document}
请用 JSON 格式回复:
{{"relevant": true/false, "score": 1-5, "reason": "简短理由"}}
其中 score 含义:
5 = 完全相关,直接回答问题
4 = 高度相关,包含关键信息
3 = 部分相关,有用但不充分
2 = 略有关联,信息有限
1 = 不相关
"""
    doc_grades = []
    filtered_docs = []
    for doc in documents:
        response = llm.invoke(
            grade_prompt.format(
                question=question,
                document=doc.page_content[:500],  # cap context sent to the grader
            )
        )
        # Parse the LLM's JSON verdict; on failure keep the document rather
        # than silently dropping possibly useful context.
        try:
            grade = json.loads(response.content)
        except (json.JSONDecodeError, TypeError):
            grade = {"relevant": True, "score": 3, "reason": "解析失败,默认保留"}
        doc_grades.append({
            "content_preview": doc.page_content[:100],
            **grade,
        })
        # Keep documents graded at least "partially relevant" (score >= 3).
        score = grade.get("score", 0)
        if score >= 3:
            filtered_docs.append(doc)
            print(f" ✅ Score {score}: {doc.page_content[:50]}...")
        else:
            print(f" ❌ Score {score}: {doc.page_content[:50]}...")

    # Fewer than two relevant documents → trigger rewrite/supplemental search.
    search_needed = len(filtered_docs) < 2
    if search_needed:
        print(" ⚠️ 相关文档不足,需要补充搜索")
    return {
        **state,
        "documents": filtered_docs,
        "doc_grades": doc_grades,
        "search_needed": search_needed,
    }
def rewrite_query(state: AgentState) -> AgentState:
    """Query-rewrite node: ask the LLM for a better-phrased search query."""
    print("\n✏️ REWRITE QUERY NODE")
    question = state["question"]
    rewrite_prompt = """你是一个搜索查询优化专家。
原始问题检索效果不佳,请重写查询以获得更好的结果。
原始问题:{question}
要求:
1. 保持问题核心意图
2. 使用更精确的关键词
3. 可以拆分为多个子查询
请直接返回优化后的查询(不需要解释)。"""

    reply = llm.invoke(rewrite_prompt.format(question=question))
    rewritten = reply.content.strip()
    print(f" 原始: {question}")
    print(f" 重写: {rewritten}")

    # Replace the question so the next retrieve pass uses the improved query.
    return {**state, "question": rewritten}
def web_search(state: AgentState) -> AgentState:
    """Web-search node: DuckDuckGo fallback when local retrieval is thin.

    The combined search output is appended as one Document so the generate
    node can cite it alongside vector-store hits.

    Fix: copy the documents list before appending — the original appended to
    the list object held in the previous graph state, mutating it in place.
    """
    print("\n🌐 WEB SEARCH NODE")
    question = state["question"]
    documents = list(state.get("documents", []))

    # DuckDuckGo search via the LangChain community tool.
    from langchain_community.tools import DuckDuckGoSearchResults
    search = DuckDuckGoSearchResults(max_results=3)
    results = search.invoke(question)

    # Wrap the raw result string as a Document tagged with its provenance.
    web_doc = Document(
        page_content=results,
        metadata={"source": "web_search", "query": question}
    )
    documents.append(web_doc)
    print(f" Added web search results")
    return {
        **state,
        "documents": documents
    }
def generate(state: AgentState) -> AgentState:
"""生成节点:基于检索内容生成答案"""
print("\n💡 GENERATE NODE")
question = state["question"]
documents = state["documents"]
# 构建上下文
context = "\n\n---\n\n".join([
f"来源: {doc.metadata.get('source_url', 'unknown')}\n内容: {doc.page_content}"
for doc in documents
])
generate_prompt = """你是一个知识助手。基于以下检索到的上下文回答用户问题。
上下文信息:
{context}
用户问题:{question}
要求:
1. 准确回答问题,引用上下文中的具体信息
2. 如果上下文信息不足以回答,明确说明
3. 使用清晰的结构组织答案
4. 如有必要,提供相关建议或后续步骤"""
response = llm.invoke(
generate_prompt.format(
context=context,
question=question
)
)
print(f" Generated answer ({len(response.content)} chars)")
return {
**state,
"generation": response.content
}3.4 定义路由逻辑
python
def route_after_grade(state: AgentState) -> Literal["rewrite", "generate"]:
    """Post-grading router.

    Returns "rewrite" while relevant documents are insufficient and the
    retry budget (3 iterations) is not yet exhausted.
    """
    under_budget = state.get("iteration", 0) < 3
    if state.get("search_needed", False) and under_budget:
        return "rewrite"
    return "generate"
3.5 构建工作流图
python
from langgraph.graph import StateGraph, START, END
# 创建状态图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade", grade_documents)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("web_search", web_search)
workflow.add_node("generate", generate)
# 定义边
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges(
"grade",
route_after_grade,
{
"rewrite": "rewrite",
"generate": "generate"
}
)
workflow.add_edge("rewrite", "web_search")
workflow.add_edge("web_search", "retrieve") # 循环回检索
workflow.add_edge("generate", END)
# 编译
app = workflow.compile()
# 可视化
print(app.get_graph().draw_ascii())四、高级功能
4.1 混合检索
结合向量搜索和关键词搜索:
python
from qdrant_client.models import SearchParams, MatchText
def hybrid_retrieve(state: AgentState) -> AgentState:
"""混合检索:向量 + 关键词"""
question = state["question"]
# 向量检索
vector_results = qdrant_client.search(
collection_name=collection_name,
query_vector=embeddings.embed_query(question),
limit=3,
search_params=SearchParams(hnsw_ef=128, exact=False)
)
# 关键词检索(全文搜索)
keyword_results = qdrant_client.scroll(
collection_name=collection_name,
scroll_filter=Filter(
should=[
FieldCondition(
key="page_content",
match=MatchText(text=question)
)
]
),
limit=3
)[0]
# 合并并去重
seen_ids = set()
documents = []
for result in vector_results:
if result.id not in seen_ids:
seen_ids.add(result.id)
documents.append(Document(
page_content=result.payload["page_content"],
metadata=result.payload.get("metadata", {})
))
for result in keyword_results:
if result.id not in seen_ids:
seen_ids.add(result.id)
documents.append(Document(
page_content=result.payload["page_content"],
metadata=result.payload.get("metadata", {})
))
return {
**state,
"documents": documents
}4.2 多集合检索
python
def multi_collection_retrieve(state: AgentState) -> AgentState:
"""从多个集合检索并整合"""
question = state["question"]
collections = ["docs_en", "docs_zh", "api_reference"]
all_documents = []
for coll_name in collections:
try:
results = qdrant_client.search(
collection_name=coll_name,
query_vector=embeddings.embed_query(question),
limit=2
)
for r in results:
doc = Document(
page_content=r.payload["page_content"],
metadata={
**r.payload.get("metadata", {}),
"collection": coll_name,
"score": r.score
}
)
all_documents.append(doc)
except Exception as e:
print(f" Warning: Failed to search {coll_name}: {e}")
# 按分数排序
all_documents.sort(
key=lambda x: x.metadata.get("score", 0),
reverse=True
)
return {
**state,
"documents": all_documents[:5] # 取 top 5
}4.3 带记忆的对话
python
from langgraph.checkpoint.memory import MemorySaver
# 添加记忆
memory = MemorySaver()
app_with_memory = workflow.compile(checkpointer=memory)
# 使用时指定 thread_id
config = {"configurable": {"thread_id": "user-123"}}
# 第一轮对话
result1 = app_with_memory.invoke(
{"question": "什么是 ReAct 模式?", "messages": []},
config=config
)
# 第二轮对话(会记住上下文)
result2 = app_with_memory.invoke(
{"question": "它有什么优缺点?", "messages": []},
config=config
)五、生产部署建议
5.1 性能优化
python
# 1. Use batched operations
def batch_index_documents(documents: list[Document], batch_size: int = 100):
    """Index documents into Qdrant in batches.

    Fix: each batch is embedded with a single `embed_documents` call —
    the original embedded one document at a time via `embed_query`,
    costing one embedding API round-trip per document.

    Args:
        documents: Documents to index.
        batch_size: Number of documents per upsert request.
    """
    from qdrant_client.models import PointStruct
    import uuid

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        # One embedding call for the whole batch.
        vectors = embeddings.embed_documents([doc.page_content for doc in batch])
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={
                    "page_content": doc.page_content,
                    "metadata": doc.metadata,
                },
            )
            for doc, vector in zip(batch, vectors)
        ]
        qdrant_client.upsert(
            collection_name=collection_name,
            points=points,
        )
        print(f"Indexed batch {i//batch_size + 1}")
# 2. 使用索引优化
qdrant_client.update_collection(
collection_name=collection_name,
optimizer_config={
"indexing_threshold": 20000, # 超过此数量才建索引
},
hnsw_config={
"m": 16,
"ef_construct": 100,
}
)5.2 错误处理
python
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
def safe_retrieve(question: str) -> list[Document]:
"""带重试的安全检索"""
try:
docs = retriever.invoke(question)
return docs
except Exception as e:
print(f"Retrieval error: {e}")
raise5.3 监控指标
python
import time
from dataclasses import dataclass
@dataclass
class RAGMetrics:
    """Timing and document-count metrics for a single RAG run."""

    retrieval_time: float    # seconds spent retrieving
    grade_time: float        # seconds spent grading documents
    generation_time: float   # seconds spent generating the answer
    total_time: float        # end-to-end wall time in seconds
    doc_count: int           # documents retrieved
    relevant_doc_count: int  # documents that passed grading
    iteration_count: int     # retrieve/rewrite loop iterations
def retrieve_with_metrics(state: AgentState) -> tuple[AgentState, dict]:
"""带监控的检索"""
start = time.time()
result = retrieve(state)
elapsed = time.time() - start
metrics = {
"retrieval_time": elapsed,
"doc_count": len(result["documents"])
}
return result, metrics六、完整示例
python
# Run the complete Agentic RAG system
def run_agentic_rag(question: str) -> str:
    """Execute the compiled graph for one question and return the answer."""
    seed_state = {
        "question": question,
        "messages": [],
        "documents": [],
        "doc_grades": [],
        "generation": "",
        "search_needed": False,
        "iteration": 0,
    }
    final_state = app.invoke(seed_state)
    return final_state["generation"]
# Smoke test
if __name__ == "__main__":
    questions = [
        "什么是 LLM Agent 的 ReAct 模式?",
        "如何防范 LLM 的提示注入攻击?",
        "Prompt Engineering 有哪些最佳实践?"
    ]
    for q in questions:
        banner = '=' * 60
        print(f"\n{banner}")
        print(f"问题: {q}")
        print(banner)
        answer = run_agentic_rag(q)
        print(f"\n答案:\n{answer}")
思考题
- 如何设计 Qdrant 的 payload 结构以支持更复杂的过滤场景?
- 在高并发场景下,如何优化 Qdrant 的查询性能?
- 如何实现跨语言的文档检索和生成?