Skip to content

Thinking in LangGraph

基于 LangGraph 官方文档:如何用图思维构建智能体和工作流


概述

本章介绍如何使用 LangGraph 构建智能体(Agents)和工作流(Workflows)。理解"工作流"与"智能体"的区别是掌握 LangGraph 的关键。

工作流 vs 智能体

引用 Anthropic 的《Building Effective Agents》博客:

工作流(Workflows) 是 LLM 和工具通过 预定义代码路径 进行编排的系统。

智能体(Agents) 是 LLM 动态指导 自身流程和工具使用的系统,保持对任务完成方式的控制。

工作流与智能体的区别图:工作流与智能体的核心区别


核心五步法

使用 LangGraph 构建系统的五个关键步骤:

步骤 1:将流程映射为离散步骤

识别独立的操作,绘制连接关系。示例节点:

  • 读取邮件 → 分类意图 → 文档搜索 → Bug 追踪 → 草拟回复 → 人工审核 → 发送回复

步骤 2:确定步骤类型

将每个节点按类型分类:

类型说明示例
LLM 步骤理解、分析、生成、推理分类意图、生成回复
数据步骤外部信息检索文档搜索、数据库查询
动作步骤外部系统操作发送邮件、创建工单
用户输入步骤人工介入审核确认、修改反馈

步骤 3:设计状态(State)

状态是所有节点可访问的共享内存。 核心原则:

存储原始数据,按需格式化提示词。

python
from typing_extensions import TypedDict

class EmailAgentState(TypedDict):
    email_content: str                      # 原始邮件内容
    sender_email: str                       # 发件人
    classification: EmailClassification | None  # 分类结果
    search_results: list[str] | None        # 搜索结果
    draft_response: str | None              # 草稿回复

步骤 4:构建节点

节点是接受状态并返回更新的 Python 函数:

python
def classify_email(state: EmailAgentState):
    """分类邮件意图"""
    result = llm.invoke(f"分类这封邮件的意图:{state['email_content']}")
    return {"classification": result}

步骤 5:连接节点

使用最小必要的边连接节点。路由逻辑通过 Command 对象在节点内部实现。


环境准备

bash
pip install langchain_core langchain-anthropic langgraph
python
import os
import getpass
from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("ANTHROPIC_API_KEY")

llm = ChatAnthropic(model="claude-3-5-sonnet-latest")

构建基石:增强型 LLM

LLM 通过以下增强能力支持构建工作流和智能体:

增强型 LLM图:增强型 LLM 的核心能力

结构化输出

python
from pydantic import BaseModel, Field

class SearchQuery(BaseModel):
    search_query: str = Field(None, description="优化后的搜索查询")
    justification: str = Field(None, description="查询相关性解释")

# 为 LLM 添加结构化输出能力
structured_llm = llm.with_structured_output(SearchQuery)

# 调用
output = structured_llm.invoke("钙化积分与高胆固醇有什么关系?")

工具调用

python
def multiply(a: int, b: int) -> int:
    return a * b

# 为 LLM 绑定工具
llm_with_tools = llm.bind_tools([multiply])

# 调用触发工具
msg = llm_with_tools.invoke("2 乘以 3 等于多少?")
print(msg.tool_calls)

工作流模式一:提示链(Prompt Chaining)

每个 LLM 调用处理前一个调用的输出。

适用场景:任务可以清晰分解为固定子任务。主要目标是用延迟换取更高准确性。

提示链图:提示链工作流

实现示例

python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str

def generate_joke(state: State):
    """第一步:生成初始笑话"""
    msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
    return {"joke": msg.content}

def check_punchline(state: State):
    """质量检查:是否有笑点"""
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"

def improve_joke(state: State):
    """第二步:改进笑话"""
    msg = llm.invoke(f"用双关语让这个笑话更有趣:{state['joke']}")
    return {"improved_joke": msg.content}

def polish_joke(state: State):
    """第三步:最终润色"""
    msg = llm.invoke(f"给这个笑话加个反转:{state['improved_joke']}")
    return {"final_joke": msg.content}

# 构建工作流
workflow = StateGraph(State)
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke", check_punchline,
    {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

chain = workflow.compile()

工作流模式二:并行化(Parallelization)

多个 LLM 同时处理任务,结果聚合。

适用场景:子任务可以并行执行以提高速度,或需要多个视角获得更高置信度。

并行化图:并行化工作流

实现示例

python
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str

def call_llm_1(state: State):
    msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
    return {"joke": msg.content}

def call_llm_2(state: State):
    msg = llm.invoke(f"写一个关于 {state['topic']} 的故事")
    return {"story": msg.content}

def call_llm_3(state: State):
    msg = llm.invoke(f"写一首关于 {state['topic']} 的诗")
    return {"poem": msg.content}

def aggregator(state: State):
    combined = f"关于 {state['topic']} 的故事、笑话和诗!\n\n"
    combined += f"故事:\n{state['story']}\n\n"
    combined += f"笑话:\n{state['joke']}\n\n"
    combined += f"诗:\n{state['poem']}"
    return {"combined_output": combined}

# 构建并行工作流
parallel_builder = StateGraph(State)
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

# 从 START 并行连接到三个节点
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")

# 三个节点汇聚到聚合器
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)

parallel_workflow = parallel_builder.compile()

工作流模式三:路由(Routing)

根据输入分类,将其导向专门的后续任务。

适用场景:复杂任务有不同类别需要分别处理,且分类可以准确完成。

路由图:路由工作流

实现示例

python
from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage

class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="路由到的下一步"
    )

router = llm.with_structured_output(Route)

class State(TypedDict):
    input: str
    decision: str
    output: str

def llm_call_router(state: State):
    """路由决策"""
    decision = router.invoke([
        SystemMessage(content="根据用户请求路由到 story、joke 或 poem"),
        HumanMessage(content=state["input"]),
    ])
    return {"decision": decision.step}

def route_decision(state: State):
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"

# 构建路由工作流
router_builder = StateGraph(State)
router_builder.add_node("llm_call_router", llm_call_router)
router_builder.add_node("llm_call_1", llm_call_1)  # 写故事
router_builder.add_node("llm_call_2", llm_call_2)  # 写笑话
router_builder.add_node("llm_call_3", llm_call_3)  # 写诗

router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router", route_decision,
    {"llm_call_1": "llm_call_1", "llm_call_2": "llm_call_2", "llm_call_3": "llm_call_3"}
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

router_workflow = router_builder.compile()

工作流模式四:编排者-工作者(Orchestrator-Workers)

中央编排者动态分配任务给专业工作者。

适用场景:任务复杂度不可预测,无法预先分解为固定子任务。

编排者-工作者图:编排者-工作者模式

实现示例

python
from langgraph.types import Send

class Section(BaseModel):
    name: str = Field(description="报告章节名称")
    description: str = Field(description="章节内容描述")

class Sections(BaseModel):
    sections: list[Section] = Field(description="报告章节列表")

planner = llm.with_structured_output(Sections)

class State(TypedDict):
    topic: str
    sections: list[Section]
    completed_sections: Annotated[list[str], operator.add]
    final_report: str

class WorkerState(TypedDict):
    section: Section

def orchestrator(state: State):
    """编排者:规划报告结构"""
    report_sections = planner.invoke([
        SystemMessage(content="根据主题生成报告大纲"),
        HumanMessage(content=f"主题:{state['topic']}")
    ])
    return {"sections": report_sections.sections}

def llm_call(state: WorkerState):
    """工作者:撰写单个章节"""
    section = state["section"]
    result = llm.invoke([
        SystemMessage(content="撰写报告章节"),
        HumanMessage(content=f"章节:{section.name}\n描述:{section.description}")
    ])
    return {"completed_sections": [result.content]}

def assign_workers(state: State):
    """分配任务给工作者"""
    return [Send("worker", {"section": s}) for s in state["sections"]]

def synthesizer(state: State):
    """合成最终报告"""
    completed = "\n\n".join(state["completed_sections"])
    return {"final_report": completed}

# 构建工作流
orchestrator_worker = StateGraph(State)
orchestrator_worker.add_node("orchestrator", orchestrator)
orchestrator_worker.add_node("worker", llm_call)
orchestrator_worker.add_node("synthesizer", synthesizer)

orchestrator_worker.add_edge(START, "orchestrator")
orchestrator_worker.add_conditional_edges("orchestrator", assign_workers, ["worker"])
orchestrator_worker.add_edge("worker", "synthesizer")
orchestrator_worker.add_edge("synthesizer", END)

工作流模式五:评估者-优化者(Evaluator-Optimizer)

通过评估-反馈-改进循环持续优化输出。

适用场景:有明确的评估标准,且迭代改进可以带来可衡量的价值。

评估者-优化者图:评估者-优化者模式

实现示例

python
class Feedback(BaseModel):
    grade: Literal["pass", "fail"] = Field(description="评估结果")
    feedback: str = Field(description="改进建议")

evaluator = llm.with_structured_output(Feedback)

class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str

def llm_call_generator(state: State):
    """生成笑话"""
    if state.get("feedback"):
        msg = llm.invoke(
            f"根据反馈改进笑话。\n反馈:{state['feedback']}\n当前笑话:{state['joke']}"
        )
    else:
        msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
    return {"joke": msg.content}

def llm_call_evaluator(state: State):
    """评估笑话"""
    grade = evaluator.invoke([
        SystemMessage(content="评估这个笑话是否有趣,给出 pass 或 fail"),
        HumanMessage(content=state["joke"])
    ])
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}

def route_joke(state: State):
    if state["funny_or_not"] == "pass":
        return "Accepted"
    return "Rejected"

# 构建评估优化工作流
optimizer_builder = StateGraph(State)
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator", route_joke,
    {"Accepted": END, "Rejected": "llm_call_generator"}
)

optimizer = optimizer_builder.compile()

智能体模式(Agent)

从工作流到智能体的跨越:让 LLM 自主决定使用哪些工具。

核心区别:智能体中,LLM 在循环中运行,动态决定调用哪些工具,直到完成任务。

智能体图:智能体模式

使用 create_react_agent

python
from langgraph.prebuilt import create_react_agent

def multiply(a: int, b: int) -> int:
    """两数相乘"""
    return a * b

def add(a: int, b: int) -> int:
    """两数相加"""
    return a + b

def divide(a: int, b: int) -> float:
    """两数相除"""
    return a / b

tools = [add, multiply, divide]

# 创建 ReAct 智能体
agent = create_react_agent(llm, tools)

# 调用
result = agent.invoke({
    "messages": [("human", "先把 3 和 4 相加,然后乘以 2")]
})

错误处理策略

不同错误类型的处理方式:

错误类型处理策略
瞬态错误自动重试
LLM 可恢复错误存储错误,循环回重试
用户可修复错误使用 interrupt() 暂停
意外错误冒泡到上层调试

重试策略

python
from langgraph.pregel import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)

人工介入

python
from langgraph.types import interrupt

def human_review(state: State):
    """需要人工审核"""
    result = interrupt("请审核此内容...")
    return {"approved": result}

关键洞察

  1. 分解为离散步骤 - 支持流式输出、持久化和调试
  2. 状态存储原始数据 - 节点按需格式化
  3. 节点是简单函数 - 接收状态,返回更新
  4. 错误纳入工作流逻辑 - 不只是异常处理
  5. 人工输入是一等公民 - 通过 interrupt() 支持
  6. 图结构自然涌现 - 通过节点路由逻辑

思考题

  1. 什么时候应该选择工作流而不是智能体?
  2. 如何在节点粒度和检查点频率之间取得平衡?
  3. 如何设计状态以支持多轮对话的上下文保持?

参考资源

基于 MIT 许可证发布。内容版权归作者所有。