OpenManus 核心概念
深入理解 Agent、Tool、Memory、Message 等关键抽象的设计原理
本章概览
- Agent 状态机:理解 Agent 的生命周期
- Memory 系统:对话历史与上下文管理
- Message 模型:多角色消息的数据结构
- Tool 抽象:工具的定义与执行
- ToolCall 流程:从 LLM 响应到工具执行
1. Agent 状态机
1.1 状态定义
Agent 在执行过程中会经历不同的状态:
python
# app/schema.py
class AgentState(str, Enum):
"""Agent 执行状态"""
IDLE = "IDLE" # 空闲状态,可接受新任务
RUNNING = "RUNNING" # 运行中,正在执行任务
FINISHED = "FINISHED" # 完成状态,任务已结束
ERROR = "ERROR" # 错误状态,执行出错通俗比喻:这就像一个员工的工作状态:
- IDLE:坐在工位上等待任务
- RUNNING:正在处理工作
- FINISHED:完成了当前任务,准备汇报
- ERROR:遇到问题,需要帮助
1.2 状态转换图
mermaid
stateDiagram-v2
[*] --> IDLE: 初始化
IDLE --> RUNNING: run() 被调用
RUNNING --> RUNNING: step() 执行中
RUNNING --> FINISHED: 任务完成 / Terminate 工具
RUNNING --> ERROR: 发生异常
RUNNING --> IDLE: 达到最大步数
FINISHED --> IDLE: 重置
ERROR --> IDLE: 重置
IDLE --> [*]: 销毁1.3 状态上下文管理器
OpenManus 使用异步上下文管理器来安全地管理状态转换:
python
# app/agent/base.py
@asynccontextmanager
async def state_context(self, new_state: AgentState):
"""安全的状态转换上下文管理器"""
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state
self.state = new_state # 进入新状态
try:
yield
except Exception as e:
self.state = AgentState.ERROR # 异常时转为错误状态
raise e
finally:
self.state = previous_state # 恢复之前状态使用示例:
python
async def run(self, request: Optional[str] = None) -> str:
# 确保从 IDLE 状态开始
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run from state: {self.state}")
# 使用状态上下文管理器
async with self.state_context(AgentState.RUNNING):
while self.current_step < self.max_steps:
# ... 执行逻辑
if self.state == AgentState.FINISHED:
break2. Memory 系统
2.1 Memory 类设计
Memory 是 Agent 的 "记忆",存储对话历史:
python
# app/schema.py
class Memory(BaseModel):
"""Agent 记忆存储"""
messages: List[Message] = Field(default_factory=list)
max_messages: int = Field(default=100) # 最大消息数限制
def add_message(self, message: Message) -> None:
"""添加单条消息"""
self.messages.append(message)
# 超过限制时保留最近的消息
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]
def add_messages(self, messages: List[Message]) -> None:
"""批量添加消息"""
self.messages.extend(messages)
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]
def clear(self) -> None:
"""清空所有消息"""
self.messages.clear()
def get_recent_messages(self, n: int) -> List[Message]:
"""获取最近 n 条消息"""
return self.messages[-n:]
def to_dict_list(self) -> List[dict]:
"""转换为字典列表(用于 API 调用)"""
return [msg.to_dict() for msg in self.messages]2.2 Memory 的作用
通俗比喻:Memory 就像你和朋友的聊天记录。当你问 "刚才我说的那个网站是什么?",你需要翻看聊天记录才能回答。
┌─────────────────────────────────────────────┐
│ Memory │
├─────────────────────────────────────────────┤
│ Message 1: [user] 帮我搜索 AI 新闻 │
│ Message 2: [assistant] 好的,我来搜索... │
│ Message 3: [tool] WebSearch 结果... │
│ Message 4: [assistant] 找到了以下新闻... │
│ Message 5: [user] 第一条新闻详细说说 │
│ ... │
└─────────────────────────────────────────────┘
↓
LLM 可以看到完整上下文2.3 Memory 管理策略
OpenManus 采用 滑动窗口 策略管理内存:
python
# 超过限制时,保留最近的消息
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]这种策略的优缺点:
| 优点 | 缺点 |
|---|---|
| 简单高效 | 可能丢失早期重要信息 |
| 内存可控 | 长对话中断上下文 |
| 保持最新上下文 | 无法回顾历史 |
3. Message 模型
3.1 消息角色
OpenManus 定义了四种消息角色:
python
# app/schema.py
class Role(str, Enum):
"""消息角色"""
SYSTEM = "system" # 系统指令
USER = "user" # 用户输入
ASSISTANT = "assistant" # AI 响应
TOOL = "tool" # 工具执行结果角色交互示意:
┌─────────┐ ┌───────────┐ ┌──────────┐ ┌────────┐
│ SYSTEM │ │ USER │ │ASSISTANT │ │ TOOL │
│ 系统指令 │ │ 用户输入 │ │ AI响应 │ │工具结果 │
└────┬────┘ └─────┬─────┘ └────┬─────┘ └───┬────┘
│ │ │ │
│ "你是OpenManus,一个通用Agent" │ │
│ │ │ │
│ │ "帮我搜索新闻" │ │
│ │ │ │
│ │ │ 调用 WebSearch │
│ │ │──────────────▶│
│ │ │ │
│ │ │◀─────────────│
│ │ │ 返回搜索结果 │
│ │ │ │
│ │ "找到了这些新闻..." │ │
│ │◀──────────────│ │
▼ ▼ ▼ ▼3.2 Message 类设计
python
# app/schema.py
class Message(BaseModel):
"""对话消息"""
role: ROLE_TYPE # 角色
content: Optional[str] = None # 文本内容
tool_calls: Optional[List[ToolCall]] = None # 工具调用列表
name: Optional[str] = None # 工具名称(tool 角色)
tool_call_id: Optional[str] = None # 工具调用 ID
base64_image: Optional[str] = None # 图片(base64 编码)
@classmethod
def user_message(cls, content: str, base64_image: Optional[str] = None):
"""创建用户消息"""
return cls(role=Role.USER, content=content, base64_image=base64_image)
@classmethod
def system_message(cls, content: str):
"""创建系统消息"""
return cls(role=Role.SYSTEM, content=content)
@classmethod
def assistant_message(cls, content: Optional[str] = None):
"""创建助手消息"""
return cls(role=Role.ASSISTANT, content=content)
@classmethod
def tool_message(cls, content: str, name: str, tool_call_id: str):
"""创建工具结果消息"""
return cls(
role=Role.TOOL,
content=content,
name=name,
tool_call_id=tool_call_id,
)
@classmethod
def from_tool_calls(cls, tool_calls: List[Any], content: str = ""):
"""从工具调用创建助手消息"""
formatted_calls = [
{"id": call.id, "function": call.function.model_dump(), "type": "function"}
for call in tool_calls
]
return cls(role=Role.ASSISTANT, content=content, tool_calls=formatted_calls)3.3 消息流转示例
一个完整的任务执行过程中的消息序列:
python
# 1. 系统消息(初始化时添加)
Message.system_message(
"你是 OpenManus,一个能够使用各种工具完成任务的 AI 助手..."
)
# 2. 用户消息
Message.user_message("帮我查找今天的科技新闻")
# 3. 助手消息(包含工具调用)
Message.from_tool_calls(
content="好的,我来帮你搜索科技新闻。",
tool_calls=[
ToolCall(
id="call_123",
function=Function(name="web_search", arguments='{"query": "科技新闻 今天"}')
)
]
)
# 4. 工具结果消息
Message.tool_message(
content="找到以下新闻:1. AI 突破... 2. 量子计算...",
name="web_search",
tool_call_id="call_123"
)
# 5. 最终助手消息
Message.assistant_message("根据搜索结果,今天的科技新闻主要有...")4. Tool 抽象
4.1 工具的本质
通俗比喻:工具就像你手机里的 APP——每个 APP 有特定的功能,你告诉它要做什么,它返回结果。
┌─────────────────────────────────────────────────────┐
│ Tool 抽象 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 名称 │ │ python_execute │ │
│ │ (name) │ │ │ │
│ └─────────────┘ └─────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 描述 │ │ 执行 Python 代码 │ │
│ │(description)│ │ │ │
│ └─────────────┘ └─────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 参数 │ │ {code: "print('hi')"} │ │
│ │(parameters) │ │ │ │
│ └─────────────┘ └─────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 执行 │ │ async def execute() │ │
│ │ (execute) │ │ │ │
│ └─────────────┘ └─────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘4.2 参数定义(JSON Schema)
工具的参数使用 JSON Schema 定义,这让 LLM 能够理解如何调用工具:
python
# 以 PythonExecute 为例
class PythonExecute(BaseTool):
name: str = "python_execute"
description: str = "执行 Python 代码"
parameters: dict = {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "要执行的 Python 代码",
},
"timeout": {
"type": "integer",
"description": "执行超时时间(秒)",
"default": 5
}
},
"required": ["code"],
}JSON Schema 到 OpenAI Function Calling:
python
def to_param(self) -> Dict:
"""转换为 OpenAI function calling 格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}转换后的格式(发送给 OpenAI API):
json
{
"type": "function",
"function": {
"name": "python_execute",
"description": "执行 Python 代码",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "要执行的 Python 代码"
}
},
"required": ["code"]
}
}
}4.3 ToolResult:统一的返回格式
python
# app/tool/base.py
class ToolResult(BaseModel):
"""工具执行结果"""
output: Any = None # 正常输出
error: Optional[str] = None # 错误信息
base64_image: Optional[str] = None # 图片(截图等)
system: Optional[str] = None # 系统级消息
def __bool__(self):
"""判断结果是否有效"""
return any(getattr(self, field) for field in self.__fields__)
def __str__(self):
"""转为字符串"""
return f"Error: {self.error}" if self.error else str(self.output)
def __add__(self, other: "ToolResult"):
"""合并两个结果"""
return ToolResult(
output=combine_fields(self.output, other.output),
error=combine_fields(self.error, other.error),
# 图片不能合并,取其一
base64_image=self.base64_image or other.base64_image,
system=combine_fields(self.system, other.system),
)使用示例:
python
# 成功结果
result = ToolResult(output="代码执行成功,输出:Hello World")
# 错误结果
result = ToolResult(error="执行超时")
# 带图片的结果(如浏览器截图)
result = ToolResult(
output="页面加载完成",
base64_image="iVBORw0KGgoAAAANSUhEUg..."
)5. ToolCall 流程
5.1 ToolCall 数据结构
python
# app/schema.py
class Function(BaseModel):
"""函数定义"""
name: str # 函数名
arguments: str # 参数 JSON 字符串
class ToolCall(BaseModel):
"""工具调用"""
id: str # 唯一标识(由 LLM 生成)
type: str = "function" # 类型
function: Function # 函数信息5.2 完整调用流程
mermaid
sequenceDiagram
participant User as 用户
participant Agent as ToolCallAgent
participant LLM as LLM
participant TC as ToolCollection
participant Tool as 具体工具
User->>Agent: 输入请求
Agent->>Agent: update_memory("user", request)
loop ReAct 循环
Agent->>Agent: think()
Agent->>LLM: ask_tool(messages, tools)
LLM-->>Agent: response with tool_calls
alt 有工具调用
Agent->>Agent: 保存 tool_calls
Agent->>Agent: act()
loop 每个 ToolCall
Agent->>TC: execute(name, args)
TC->>Tool: __call__(**args)
Tool-->>TC: ToolResult
TC-->>Agent: ToolResult
Agent->>Agent: update_memory("tool", result)
end
else 无工具调用
Agent->>Agent: 检查是否完成
end
end
Agent-->>User: 返回结果5.3 代码级别的流程
Step 1: 思考阶段 (think)
python
# app/agent/toolcall.py
async def think(self) -> bool:
"""思考:决定下一步行动"""
# 添加引导提示
if self.next_step_prompt:
self.messages += [Message.user_message(self.next_step_prompt)]
# 调用 LLM,获取工具选择
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=[Message.system_message(self.system_prompt)],
tools=self.available_tools.to_params(), # 传入可用工具列表
tool_choice=self.tool_choices, # auto / required / none
)
# 解析响应
self.tool_calls = response.tool_calls or []
content = response.content or ""
# 日志记录
logger.info(f"✨ {self.name}'s thoughts: {content}")
logger.info(f"🛠️ Selected {len(self.tool_calls)} tools")
# 创建助手消息并保存到内存
assistant_msg = Message.from_tool_calls(
content=content,
tool_calls=self.tool_calls
)
self.memory.add_message(assistant_msg)
return bool(self.tool_calls)Step 2: 行动阶段 (act)
python
async def act(self) -> str:
"""执行工具调用"""
results = []
for command in self.tool_calls:
# 执行单个工具
result = await self.execute_tool(command)
# 截断过长的结果
if self.max_observe:
result = result[:self.max_observe]
# 添加工具结果到内存
tool_msg = Message.tool_message(
content=result,
tool_call_id=command.id,
name=command.function.name,
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)Step 3: 执行单个工具
python
async def execute_tool(self, command: ToolCall) -> str:
"""执行单个工具调用"""
name = command.function.name
# 检查工具是否存在
if name not in self.available_tools.tool_map:
return f"Error: Unknown tool '{name}'"
try:
# 解析参数
args = json.loads(command.function.arguments or "{}")
# 执行工具
result = await self.available_tools.execute(name=name, tool_input=args)
# 处理特殊工具(如 Terminate)
await self._handle_special_tool(name=name, result=result)
# 格式化输出
return f"Observed output of cmd `{name}`:\n{str(result)}"
except json.JSONDecodeError:
return f"Error: Invalid JSON arguments for {name}"
except Exception as e:
return f"Error: Tool '{name}' failed: {str(e)}"6. ToolChoice 策略
6.1 三种选择模式
python
# app/schema.py
class ToolChoice(str, Enum):
NONE = "none" # 不使用工具
AUTO = "auto" # LLM 自动决定是否使用工具
REQUIRED = "required" # 必须使用工具使用场景对比:
| 模式 | 场景 | LLM 行为 |
|---|---|---|
none | 纯对话,不需要工具 | 只返回文本,忽略工具 |
auto | 通用场景(默认) | 根据需要决定是否调用工具 |
required | 必须执行操作 | 一定会返回工具调用 |
6.2 不同模式的处理逻辑
python
async def think(self) -> bool:
# ... 调用 LLM 获取响应 ...
# 处理不同的 tool_choices 模式
if self.tool_choices == ToolChoice.NONE:
# 不应该有工具调用
if tool_calls:
logger.warning("Tools used when not available!")
if content:
self.memory.add_message(Message.assistant_message(content))
return True
return False
# ... 创建并保存助手消息 ...
if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
# REQUIRED 模式下没有工具调用,在 act() 中处理
return True
# AUTO 模式
if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
# 没有工具调用时,检查是否有内容
return bool(content)
return bool(self.tool_calls)7. 特殊工具处理
7.1 Terminate 工具
Terminate 是一个特殊工具,调用后会结束 Agent 的执行:
python
# app/tool/terminate.py
class Terminate(BaseTool):
name: str = "terminate"
description: str = "当任务完成时调用此工具终止交互"
parameters: dict = {
"type": "object",
"properties": {
"status": {
"type": "string",
"description": "最终状态消息",
},
},
"required": ["status"],
}
async def execute(self, status: str) -> ToolResult:
return ToolResult(output=f"Task completed: {status}")7.2 特殊工具处理机制
python
# app/agent/toolcall.py
async def _handle_special_tool(self, name: str, result: Any):
"""处理特殊工具"""
if not self._is_special_tool(name):
return
if self._should_finish_execution(name=name, result=result):
logger.info(f"🏁 Special tool '{name}' completed the task!")
self.state = AgentState.FINISHED # 设置完成状态
def _is_special_tool(self, name: str) -> bool:
"""检查是否为特殊工具"""
return name.lower() in [n.lower() for n in self.special_tool_names]8. 概念关系总结
8.1 核心概念关系图
┌─────────────────────────────────────────────────────────────────┐
│ Agent │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ State │ │ Memory │ │ LLM │ │ Tools │ │
│ │ 状态机 │ │ 记忆 │ │ 语言模型 │ │ 工具集 │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │ │
│ │ ▼ │ │ │
│ │ ┌───────────┐ │ │ │
│ │ │ Messages │ │ │ │
│ │ │ 消息列表 │◀───────┘ │ │
│ │ └─────┬─────┘ │ │
│ │ │ │ │
│ │ ▼ ▼ │
│ │ ┌───────────┐ ┌───────────┐ │
│ │ │ ToolCalls │──────────────│ToolResult │ │
│ │ │ 工具调用 │ │ 执行结果 │ │
│ │ └───────────┘ └───────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ ReAct Loop (思考-行动循环) │ │
│ │ IDLE ──▶ RUNNING ──▶ (think/act) ──▶ FINISHED │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘8.2 概念速查表
| 概念 | 作用 | 关键属性/方法 |
|---|---|---|
| AgentState | 管理执行状态 | IDLE, RUNNING, FINISHED, ERROR |
| Memory | 存储对话历史 | messages, add_message(), clear() |
| Message | 单条消息 | role, content, tool_calls |
| BaseTool | 工具基类 | name, description, execute() |
| ToolResult | 工具执行结果 | output, error, base64_image |
| ToolCall | 工具调用请求 | id, function.name, function.arguments |
| ToolChoice | 工具选择策略 | NONE, AUTO, REQUIRED |
| ToolCollection | 工具集合管理 | tool_map, execute(), add_tools() |
下一章:22.4 实现细节 - 深入源码,理解关键实现