Python 基础入门:从零到一掌握 Python
本章专为 LangGraph 和 AI 开发新手设计,通过代码实例快速掌握 Python 核心概念。我们采用"先看代码,后学原理"的方式,让你在实践中理解 Python。
📋 本章目标
- ✅ 掌握 Python 3.13 核心语法
- ✅ 理解面向对象编程思想
- ✅ 学会使用 Jupyter Notebook 进行交互式开发
- ✅ 掌握机器学习/AI 核心库(NumPy、Pandas、Scikit-learn、PyTorch、可视化)
- ✅ 掌握 Git/GitHub 基础操作
- ✅ 为学习 LangGraph 打下坚实基础
1. 环境准备:选择你的开发工具
🎯 推荐环境
选项 1:Jupyter Lab(推荐新手)
# 安装 Jupyter Lab
pip install jupyterlab
# 启动
jupyter lab
选项 2:VS Code / Cursor + Jupyter 插件
# 安装 Python 扩展和 Jupyter 扩展
# 在 VS Code 中搜索并安装:
# 1. Python (by Microsoft)
# 2. Jupyter (by Microsoft)
✨ 验证环境
创建一个新的 Notebook,运行以下代码:
import sys
print(f"Python 版本: {sys.version}")
print(f"你好,LangGraph 世界!")
运行结果:
Python 版本: 3.13.0 (main, Oct 7 2024, 05:02:14) [Clang 15.0.0]
你好,LangGraph 世界!
2. Python 核心概念:变量与数据类型
2.1 先看代码:变量的魔法
# 代码示例:变量就像带标签的盒子
name = "Alice"
age = 25
height = 1.68
is_student = True
print(f"姓名: {name}")
print(f"年龄: {age}")
print(f"身高: {height}m")
print(f"是学生: {is_student}")
print(f"类型: {type(name)}, {type(age)}, {type(height)}, {type(is_student)}")
运行结果:
姓名: Alice
年龄: 25
身高: 1.68m
是学生: True
类型: <class 'str'>, <class 'int'>, <class 'float'>, <class 'bool'>
📖 概念解释
变量:存储数据的容器,无需声明类型(动态类型)
基础数据类型:
str
- 字符串(文本)int
- 整数float
- 浮点数(小数)bool
- 布尔值(True/False)
💡 LangGraph 应用场景
在 LangGraph 中,你会经常用到这些类型:
str
- 存储用户消息、AI 回复int
- 计数器、步骤编号bool
- 判断条件、控制流程
3. 数据结构:组织你的数据
3.1 列表(List):有序的数据集合
# 代码示例:管理对话历史
messages = ["Hello", "How are you?", "I'm fine"]
# 添加新消息
messages.append("What's your name?")
# 访问第一条消息
print(f"第一条消息: {messages[0]}")
# 访问最后一条消息
print(f"最后一条消息: {messages[-1]}")
# 遍历所有消息
print("\n完整对话:")
for i, msg in enumerate(messages, 1):
print(f" {i}. {msg}")
# 切片操作
print(f"\n前两条消息: {messages[:2]}")
运行结果:
第一条消息: Hello
最后一条消息: What's your name?
完整对话:
1. Hello
2. How are you?
3. I'm fine
4. What's your name?
前两条消息: ['Hello', 'How are you?']
3.2 字典(Dict):键值对的数据存储
# 代码示例:存储 Agent 状态
agent_state = {
"user_id": "user_123",
"conversation_id": "conv_456",
"turn_count": 3,
"context": {
"topic": "weather",
"location": "Beijing"
}
}
# 访问数据
print(f"用户ID: {agent_state['user_id']}")
print(f"对话轮次: {agent_state['turn_count']}")
print(f"话题: {agent_state['context']['topic']}")
# 更新数据
agent_state['turn_count'] += 1
print(f"\n更新后的轮次: {agent_state['turn_count']}")
# 获取所有键
print(f"\n所有字段: {list(agent_state.keys())}")
运行结果:
用户ID: user_123
对话轮次: 3
话题: weather
更新后的轮次: 4
所有字段: ['user_id', 'conversation_id', 'turn_count', 'context']
3.3 元组(Tuple):不可变的序列
# 代码示例:定义消息格式
message = ("user", "Hello, how can I help?")
role, content = message # 解包
print(f"角色: {role}")
print(f"内容: {content}")
# 元组不可修改(会报错)
try:
message[0] = "assistant"
except TypeError as e:
print(f"\n❌ 错误: {e}")
运行结果:
角色: user
内容: Hello, how can I help?
❌ 错误: 'tuple' object does not support item assignment
3.4 集合(Set):唯一元素的集合
# 代码示例:去重用户提到的话题
topics = {"weather", "sports", "weather", "music", "sports"}
print(f"唯一话题: {topics}")
# 集合运算
user_interests = {"music", "sports", "reading"}
available_topics = {"music", "weather", "news"}
common = user_interests & available_topics # 交集
print(f"共同话题: {common}")
运行结果:
唯一话题: {'music', 'weather', 'sports'}
共同话题: {'music'}
📖 数据结构对比
类型 | 有序 | 可变 | 允许重复 | 使用场景 |
---|---|---|---|---|
List | ✅ | ✅ | ✅ | 对话历史、消息列表 |
Dict | ❌ | ✅ | Key不重复 | 状态管理、配置 |
Tuple | ✅ | ❌ | ✅ | 固定格式数据 |
Set | ❌ | ✅ | ❌ | 去重、集合运算 |
4. 控制流:让程序做决策
4.1 条件判断(if-elif-else)
# 代码示例:根据用户情绪选择回复策略
sentiment_score = 0.8 # 1.0 最积极,-1.0 最消极
if sentiment_score > 0.6:
response_style = "热情友好"
emoji = "😊"
elif sentiment_score > 0:
response_style = "中性客观"
emoji = "🙂"
else:
response_style = "温和安慰"
emoji = "🤗"
print(f"情绪分数: {sentiment_score}")
print(f"回复风格: {response_style} {emoji}")
# 简洁的三元运算符
mood = "积极" if sentiment_score > 0 else "消极"
print(f"情绪: {mood}")
运行结果:
情绪分数: 0.8
回复风格: 热情友好 😊
情绪: 积极
4.2 循环(for):遍历数据
# 代码示例:批量处理消息
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"}
]
print("对话历史:")
for i, msg in enumerate(messages):
prefix = "👤" if msg["role"] == "user" else "🤖"
print(f"{i+1}. {prefix} {msg['content']}")
# 列表推导式(高级技巧)
user_messages = [msg["content"] for msg in messages if msg["role"] == "user"]
print(f"\n用户消息: {user_messages}")
运行结果:
对话历史:
1. 👤 Hello
2. 🤖 Hi there!
3. 👤 How are you?
用户消息: ['Hello', 'How are you?']
4.3 循环(while):条件循环
# 代码示例:限制重试次数
max_retries = 3
attempt = 0
while attempt < max_retries:
attempt += 1
print(f"尝试 #{attempt}...")
# 模拟 API 调用
success = (attempt == 2) # 第2次成功
if success:
print("✅ 成功!")
break
else:
print("❌ 失败,重试中...")
else:
print("\n⚠️ 达到最大重试次数")
运行结果:
尝试 #1...
❌ 失败,重试中...
尝试 #2...
✅ 成功!
5. 函数:代码的复用单元
5.1 基础函数
# 代码示例:格式化消息
def format_message(role, content, timestamp=None):
"""格式化聊天消息
Args:
role: 角色(user/assistant)
content: 消息内容
timestamp: 时间戳(可选)
Returns:
格式化后的字符串
"""
prefix = "👤" if role == "user" else "🤖"
time_str = f"[{timestamp}] " if timestamp else ""
return f"{time_str}{prefix} {content}"
# 调用函数
msg1 = format_message("user", "Hello")
msg2 = format_message("assistant", "Hi!", "10:30")
print(msg1)
print(msg2)
运行结果:
👤 Hello
[10:30] 🤖 Hi!
5.2 返回多个值
# 代码示例:分析文本
def analyze_text(text):
"""分析文本的基本统计信息"""
word_count = len(text.split())
char_count = len(text)
has_question = "?" in text
return word_count, char_count, has_question
# 接收多个返回值
text = "How are you doing today?"
words, chars, is_question = analyze_text(text)
print(f"文本: {text}")
print(f"单词数: {words}")
print(f"字符数: {chars}")
print(f"是问题: {is_question}")
运行结果:
文本: How are you doing today?
单词数: 5
字符数: 26
是问题: True
5.3 Lambda 函数:简洁的匿名函数
# 代码示例:快速定义简单函数
# 传统函数
def double(x):
return x * 2
# Lambda 函数(等价)
double_lambda = lambda x: x * 2
print(f"传统函数: {double(5)}")
print(f"Lambda: {double_lambda(5)}")
# Lambda 在排序中的应用
messages = [
{"content": "Hello", "length": 5},
{"content": "Hi", "length": 2},
{"content": "How are you?", "length": 12}
]
# 按长度排序
sorted_msgs = sorted(messages, key=lambda x: x["length"])
print(f"\n按长度排序: {[m['content'] for m in sorted_msgs]}")
运行结果:
传统函数: 10
Lambda: 10
按长度排序: ['Hi', 'Hello', 'How are you?']
6. 面向对象编程:构建你的 Agent
6.1 类与对象
# 代码示例:创建一个简单的 ChatBot 类
class SimpleChatBot:
"""简单的聊天机器人"""
def __init__(self, name, model="gpt-4"):
"""初始化方法"""
self.name = name
self.model = model
self.conversation_history = []
def chat(self, user_message):
"""处理用户消息"""
# 添加到历史
self.conversation_history.append({
"role": "user",
"content": user_message
})
# 生成回复(简化版)
reply = f"收到消息: {user_message}"
self.conversation_history.append({
"role": "assistant",
"content": reply
})
return reply
def get_history(self):
"""获取对话历史"""
return self.conversation_history
def __str__(self):
"""字符串表示"""
return f"ChatBot(name={self.name}, model={self.model})"
# 创建实例
bot = SimpleChatBot(name="小助手", model="gpt-4o")
print(bot)
# 使用对象
reply1 = bot.chat("你好")
reply2 = bot.chat("天气怎么样?")
print(f"\n机器人回复: {reply1}")
print(f"机器人回复: {reply2}")
print(f"\n对话轮次: {len(bot.conversation_history) // 2}")
运行结果:
ChatBot(name=小助手, model=gpt-4o)
机器人回复: 收到消息: 你好
机器人回复: 收到消息: 天气怎么样?
对话轮次: 2
6.2 继承:扩展功能
# 代码示例:创建增强版 ChatBot
class AdvancedChatBot(SimpleChatBot):
"""增强版聊天机器人,支持情绪分析"""
def __init__(self, name, model="gpt-4", enable_sentiment=True):
super().__init__(name, model) # 调用父类构造函数
self.enable_sentiment = enable_sentiment
self.sentiment_scores = []
def analyze_sentiment(self, text):
"""分析情绪(简化版)"""
positive_words = ["好", "棒", "喜欢", "开心"]
score = sum(1 for word in positive_words if word in text)
return min(score / len(positive_words), 1.0)
def chat(self, user_message):
"""重写父类方法"""
# 情绪分析
if self.enable_sentiment:
sentiment = self.analyze_sentiment(user_message)
self.sentiment_scores.append(sentiment)
print(f"情绪分数: {sentiment:.2f}")
# 调用父类方法
return super().chat(user_message)
# 使用增强版
advanced_bot = AdvancedChatBot(name="智能助手")
advanced_bot.chat("你好,今天天气真好!")
advanced_bot.chat("我很开心")
print(f"\n平均情绪: {sum(advanced_bot.sentiment_scores) / len(advanced_bot.sentiment_scores):.2f}")
运行结果:
情绪分数: 0.25
情绪分数: 0.25
平均情绪: 0.25
7. 异常处理:优雅地处理错误
7.1 try-except 基础
# 代码示例:安全地处理用户输入
def safe_divide(a, b):
"""安全的除法运算"""
try:
result = a / b
return result
except ZeroDivisionError:
print("⚠️ 错误: 除数不能为0")
return None
except TypeError as e:
print(f"⚠️ 类型错误: {e}")
return None
# 测试
print(f"10 / 2 = {safe_divide(10, 2)}")
print(f"10 / 0 = {safe_divide(10, 0)}")
print(f"10 / '2' = {safe_divide(10, '2')}")
运行结果:
10 / 2 = 5.0
⚠️ 错误: 除数不能为0
10 / 0 = None
⚠️ 类型错误: unsupported operand type(s) for /: 'int' and 'str'
10 / '2' = None
7.2 自定义异常
# 代码示例:创建自定义异常
class MessageTooLongError(Exception):
"""消息过长异常"""
pass
def validate_message(message, max_length=100):
"""验证消息长度"""
if len(message) > max_length:
raise MessageTooLongError(f"消息长度 {len(message)} 超过限制 {max_length}")
return True
# 使用
try:
short_msg = "Hello"
validate_message(short_msg)
print(f"✅ 消息有效: {short_msg}")
long_msg = "x" * 101
validate_message(long_msg)
except MessageTooLongError as e:
print(f"❌ {e}")
运行结果:
✅ 消息有效: Hello
❌ 消息长度 101 超过限制 100
8. 文件操作:读写数据
8.1 读取文件
# 代码示例:读取配置文件
# 首先创建一个测试文件
with open("config.txt", "w", encoding="utf-8") as f:
f.write("model=gpt-4o\n")
f.write("temperature=0.7\n")
f.write("max_tokens=1000\n")
# 读取文件
print("配置内容:")
with open("config.txt", "r", encoding="utf-8") as f:
for line in f:
print(f" {line.strip()}")
# 读取为字典
config = {}
with open("config.txt", "r", encoding="utf-8") as f:
for line in f:
key, value = line.strip().split("=")
config[key] = value
print(f"\n配置字典: {config}")
运行结果:
配置内容:
model=gpt-4o
temperature=0.7
max_tokens=1000
配置字典: {'model': 'gpt-4o', 'temperature': '0.7', 'max_tokens': '1000'}
8.2 JSON 文件操作
import json
# 代码示例:保存和加载对话历史
conversation = {
"session_id": "session_123",
"messages": [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
],
"metadata": {
"start_time": "2024-10-12 10:00:00",
"turn_count": 1
}
}
# 保存为 JSON
with open("conversation.json", "w", encoding="utf-8") as f:
json.dump(conversation, f, ensure_ascii=False, indent=2)
print("✅ 对话已保存到 conversation.json")
# 从 JSON 加载
with open("conversation.json", "r", encoding="utf-8") as f:
loaded_conv = json.load(f)
print(f"\n加载的会话ID: {loaded_conv['session_id']}")
print(f"消息数量: {len(loaded_conv['messages'])}")
print(f"首条消息: {loaded_conv['messages'][0]}")
运行结果:
✅ 对话已保存到 conversation.json
加载的会话ID: session_123
消息数量: 2
首条消息: {'role': 'user', 'content': 'Hello'}
9. 常用内置函数和标准库
9.1 强大的内置函数
# 代码示例:常用内置函数合集
numbers = [1, 5, 3, 9, 2, 7]
print("数据:", numbers)
print(f"长度: {len(numbers)}")
print(f"最大值: {max(numbers)}")
print(f"最小值: {min(numbers)}")
print(f"求和: {sum(numbers)}")
print(f"排序: {sorted(numbers)}")
print(f"反转: {sorted(numbers, reverse=True)}")
# map 和 filter
squared = list(map(lambda x: x**2, numbers))
print(f"\n平方: {squared}")
evens = list(filter(lambda x: x % 2 == 0, numbers))
print(f"偶数: {evens}")
# zip:配对数据
names = ["Alice", "Bob", "Charlie"]
scores = [85, 90, 78]
paired = list(zip(names, scores))
print(f"\n姓名-分数配对: {paired}")
运行结果:
数据: [1, 5, 3, 9, 2, 7]
长度: 6
最大值: 9
最小值: 1
求和: 27
排序: [1, 2, 3, 5, 7, 9]
反转: [9, 7, 5, 3, 2, 1]
平方: [1, 25, 9, 81, 4, 49]
偶数: [2]
姓名-分数配对: [('Alice', 85), ('Bob', 90), ('Charlie', 78)]
9.2 时间处理(datetime)
from datetime import datetime, timedelta
# 代码示例:时间戳和时间计算
now = datetime.now()
print(f"当前时间: {now}")
print(f"格式化: {now.strftime('%Y-%m-%d %H:%M:%S')}")
# 时间计算
tomorrow = now + timedelta(days=1)
print(f"明天: {tomorrow.strftime('%Y-%m-%d')}")
# 计算时间差
start_time = datetime(2024, 10, 1, 10, 0, 0)
end_time = datetime(2024, 10, 12, 16, 0, 0)
duration = end_time - start_time
print(f"\n持续时间: {duration.days} 天 {duration.seconds // 3600} 小时")
运行结果:
当前时间: 2024-10-12 16:00:00.123456
格式化: 2024-10-12 16:00:00
明天: 2024-10-13
持续时间: 11 天 6 小时
9.3 随机数(random)
import random
# 代码示例:随机选择和生成
responses = [
"That's interesting!",
"Tell me more.",
"I see.",
"Got it!"
]
print("随机回复示例:")
for i in range(3):
print(f" {i+1}. {random.choice(responses)}")
# 随机数生成
print(f"\n随机整数 (1-10): {random.randint(1, 10)}")
print(f"随机浮点数 (0-1): {random.random():.2f}")
# 打乱列表
items = [1, 2, 3, 4, 5]
random.shuffle(items)
print(f"打乱后: {items}")
运行结果:
随机回复示例:
1. Tell me more.
2. That's interesting!
3. Got it!
随机整数 (1-10): 7
随机浮点数 (0-1): 0.42
打乱后: [3, 1, 5, 2, 4]
10. Git/GitHub 基础:版本控制必备
10.1 为什么需要 Git?
想象你在写一个复杂的 LangGraph 应用:
- 💾 版本管理:记录每次修改,可以回退到任何历史版本
- 👥 团队协作:多人同时开发,合并代码
- 🔍 追踪变更:知道谁、何时、为什么修改了代码
- ☁️ 云端备份:GitHub 保存你的所有代码
10.2 Git 核心概念
# 工作流程示意
工作区 (Working Directory)
↓ git add
暂存区 (Staging Area)
↓ git commit
本地仓库 (Local Repository)
↓ git push
远程仓库 (Remote Repository - GitHub)
10.3 Git 基础命令实战
# 1. 配置 Git(首次使用)
git config --global user.name "你的名字"
git config --global user.email "your.email@example.com"
# 2. 初始化仓库
mkdir my-langgraph-project
cd my-langgraph-project
git init
运行结果:
Initialized empty Git repository in /path/to/my-langgraph-project/.git/
# 3. 创建第一个文件
echo "# My LangGraph Project" > README.md
echo "print('Hello, LangGraph!')" > main.py
# 4. 查看状态
git status
运行结果:
On branch main
No commits yet
Untracked files:
(use "git add <file>..." to include in what will be committed)
README.md
main.py
nothing added to commit but untracked files present
# 5. 添加文件到暂存区
git add README.md main.py
# 或者添加所有文件
git add .
# 6. 提交更改
git commit -m "Initial commit: Add README and main.py"
运行结果:
[main (root-commit) a1b2c3d] Initial commit: Add README and main.py
2 files changed, 10 insertions(+)
create mode 100644 README.md
create mode 100644 main.py
# 7. 查看提交历史
git log --oneline
运行结果:
a1b2c3d Initial commit: Add README and main.py
10.4 GitHub 使用流程
# 1. 在 GitHub 上创建新仓库
# 访问 https://github.com/new
# 填写仓库名称,如:my-langgraph-project
# 2. 连接远程仓库
git remote add origin https://github.com/你的用户名/my-langgraph-project.git
# 3. 推送到 GitHub
git push -u origin main
运行结果:
Enumerating objects: 4, done.
Counting objects: 100% (4/4), done.
Writing objects: 100% (4/4), 320 bytes | 320.00 KiB/s, done.
Total 4 (delta 0), reused 0 (delta 0)
To https://github.com/你的用户名/my-langgraph-project.git
* [new branch] main -> main
10.5 日常 Git 工作流
# 场景:修改代码后提交
# 1. 修改文件
echo "def hello(): print('Updated!')" >> main.py
# 2. 查看修改
git diff
# 3. 添加并提交
git add main.py
git commit -m "Add hello function"
# 4. 推送到远程
git push
# 5. 拉取最新代码(团队协作)
git pull
10.6 Git 常用命令速查
命令 | 说明 | 示例 |
---|---|---|
git init | 初始化仓库 | git init |
git clone | 克隆远程仓库 | git clone https://github.com/user/repo.git |
git status | 查看状态 | git status |
git add | 添加到暂存区 | git add . |
git commit | 提交更改 | git commit -m "message" |
git push | 推送到远程 | git push origin main |
git pull | 拉取远程更改 | git pull |
git log | 查看历史 | git log --oneline |
git diff | 查看差异 | git diff |
git branch | 查看/创建分支 | git branch feature-x |
git checkout | 切换分支 | git checkout feature-x |
git merge | 合并分支 | git merge feature-x |
10.7 .gitignore:忽略不需要的文件
# 创建 .gitignore 文件
cat > .gitignore << EOF
# Python
__pycache__/
*.py[cod]
*.so
.Python
# 环境变量
.env
.venv/
# Jupyter Notebook
.ipynb_checkpoints/
# IDE
.vscode/
.idea/
# 数据文件
*.db
*.sqlite
# 日志
*.log
EOF
git add .gitignore
git commit -m "Add .gitignore"
11. Python 3.13 新特性预览
11.1 更好的错误消息
# Python 3.13 提供更清晰的错误提示
data = {"name": "Alice", "age": 25}
try:
# 拼写错误
print(data["nam"])
except KeyError as e:
print(f"KeyError: {e}")
print(f"💡 提示: 你是不是想用 'name'?")
11.2 类型提示增强
# 代码示例:现代 Python 类型提示
def process_messages(
messages: list[dict[str, str]],
max_length: int = 100
) -> list[str]:
"""处理消息列表
Args:
messages: 消息列表,每条消息是字典
max_length: 最大长度
Returns:
处理后的内容列表
"""
return [
msg["content"][:max_length]
for msg in messages
if "content" in msg
]
# 使用
msgs = [
{"role": "user", "content": "Hello World"},
{"role": "assistant", "content": "Hi there"}
]
result = process_messages(msgs)
print(result)
运行结果:
['Hello World', 'Hi there']
12. 实战综合案例:构建迷你对话系统
让我们把所有知识点整合起来,构建一个完整的迷你对话系统。
import json
from datetime import datetime
from typing import List, Dict
class ConversationManager:
"""对话管理器"""
def __init__(self, bot_name: str = "Assistant"):
self.bot_name = bot_name
self.conversations: Dict[str, List[Dict]] = {}
self.session_count = 0
def create_session(self) -> str:
"""创建新会话"""
self.session_count += 1
session_id = f"session_{self.session_count}"
self.conversations[session_id] = []
print(f"✅ 创建会话: {session_id}")
return session_id
def add_message(self, session_id: str, role: str, content: str):
"""添加消息"""
if session_id not in self.conversations:
raise ValueError(f"会话 {session_id} 不存在")
message = {
"role": role,
"content": content,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.conversations[session_id].append(message)
def get_history(self, session_id: str) -> List[Dict]:
"""获取历史"""
return self.conversations.get(session_id, [])
def format_conversation(self, session_id: str) -> str:
"""格式化对话"""
messages = self.get_history(session_id)
if not messages:
return "空对话"
lines = [f"=== 会话 {session_id} ==="]
for msg in messages:
prefix = "👤" if msg["role"] == "user" else "🤖"
lines.append(f"{prefix} [{msg['timestamp']}] {msg['content']}")
return "\n".join(lines)
def save_to_file(self, session_id: str, filename: str):
"""保存到文件"""
data = {
"session_id": session_id,
"bot_name": self.bot_name,
"messages": self.get_history(session_id)
}
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"✅ 会话已保存到 {filename}")
def load_from_file(self, filename: str) -> str:
"""从文件加载"""
with open(filename, "r", encoding="utf-8") as f:
data = json.load(f)
session_id = data["session_id"]
self.conversations[session_id] = data["messages"]
print(f"✅ 已加载会话: {session_id}")
return session_id
def get_stats(self) -> Dict:
"""获取统计信息"""
total_messages = sum(len(msgs) for msgs in self.conversations.values())
return {
"total_sessions": len(self.conversations),
"total_messages": total_messages,
"sessions": list(self.conversations.keys())
}
# ===== 使用示例 =====
# 1. 创建管理器
manager = ConversationManager(bot_name="LangGraph助手")
# 2. 创建会话
session1 = manager.create_session()
# 3. 添加对话
manager.add_message(session1, "user", "你好,我想学习 LangGraph")
manager.add_message(session1, "assistant", "很高兴认识你!LangGraph 是一个强大的框架。")
manager.add_message(session1, "user", "它有什么特点?")
manager.add_message(session1, "assistant", "LangGraph 支持构建复杂的 AI Agent 工作流。")
# 4. 显示对话
print("\n" + manager.format_conversation(session1))
# 5. 保存对话
manager.save_to_file(session1, "conversation_demo.json")
# 6. 创建第二个会话
session2 = manager.create_session()
manager.add_message(session2, "user", "天气怎么样?")
manager.add_message(session2, "assistant", "今天天气不错!")
# 7. 统计信息
stats = manager.get_stats()
print(f"\n📊 统计信息:")
print(f" 总会话数: {stats['total_sessions']}")
print(f" 总消息数: {stats['total_messages']}")
print(f" 会话列表: {stats['sessions']}")
# 8. 加载保存的会话
print("\n--- 加载测试 ---")
loaded_session = manager.load_from_file("conversation_demo.json")
print(manager.format_conversation(loaded_session))
运行结果:
✅ 创建会话: session_1
=== 会话 session_1 ===
👤 [2024-10-12 16:00:00] 你好,我想学习 LangGraph
🤖 [2024-10-12 16:00:01] 很高兴认识你!LangGraph 是一个强大的框架。
👤 [2024-10-12 16:00:02] 它有什么特点?
🤖 [2024-10-12 16:00:03] LangGraph 支持构建复杂的 AI Agent 工作流。
✅ 会话已保存到 conversation_demo.json
✅ 创建会话: session_2
📊 统计信息:
总会话数: 2
总消息数: 6
会话列表: ['session_1', 'session_2']
--- 加载测试 ---
✅ 已加载会话: session_1
=== 会话 session_1 ===
👤 [2024-10-12 16:00:00] 你好,我想学习 LangGraph
🤖 [2024-10-12 16:00:01] 很高兴认识你!LangGraph 是一个强大的框架。
👤 [2024-10-12 16:00:02] 它有什么特点?
🤖 [2024-10-12 16:00:03] LangGraph 支持构建复杂的 AI Agent 工作流。
13. 机器学习/AI 必备库:NumPy、Pandas、Scikit-learn、PyTorch 和可视化
本章介绍 AI 开发的核心工具库。虽然 LangGraph 主要用于构建 Agent 工作流,但理解这些库能帮助你更好地处理数据、构建智能应用。
📦 安装依赖
# 安装所有 ML/AI 库
pip install numpy pandas scikit-learn torch matplotlib seaborn
# 如果在 M1/M2 Mac 上安装 PyTorch
pip install torch torchvision torchaudio
13.1 NumPy:数值计算的基石
NumPy(Numerical Python) 是 Python 科学计算的核心库,提供高性能的多维数组和数学函数。
13.1.1 为什么需要 NumPy?
import numpy as np
import time
# 代码示例:NumPy vs 纯 Python 的性能对比
# 纯 Python 列表
python_list = list(range(1000000))
start = time.time()
result = [x * 2 for x in python_list]
python_time = time.time() - start
# NumPy 数组
numpy_array = np.arange(1000000)
start = time.time()
result = numpy_array * 2
numpy_time = time.time() - start
print(f"Python列表耗时: {python_time:.4f} 秒")
print(f"NumPy数组耗时: {numpy_time:.4f} 秒")
print(f"NumPy 快了 {python_time / numpy_time:.1f} 倍!")
运行结果:
Python列表耗时: 0.0523 秒
NumPy数组耗时: 0.0012 秒
NumPy 快了 43.6 倍!
13.1.2 NumPy 核心操作
import numpy as np
# 代码示例:创建和操作数组
# 创建数组
arr = np.array([1, 2, 3, 4, 5])
print(f"一维数组: {arr}")
print(f"形状: {arr.shape}, 类型: {arr.dtype}")
# 创建二维数组(矩阵)
matrix = np.array([[1, 2, 3],
[4, 5, 6],
[7, 8, 9]])
print(f"\n二维数组:\n{matrix}")
print(f"形状: {matrix.shape} (3行 x 3列)")
# 快速创建特殊数组
zeros = np.zeros((2, 3)) # 全0数组
ones = np.ones((2, 3)) # 全1数组
random = np.random.rand(2, 3) # 随机数数组
print(f"\n全0数组:\n{zeros}")
print(f"\n全1数组:\n{ones}")
print(f"\n随机数组:\n{random}")
运行结果:
一维数组: [1 2 3 4 5]
形状: (5,), 类型: int64
二维数组:
[[1 2 3]
[4 5 6]
[7 8 9]]
形状: (3, 3) (3行 x 3列)
全0数组:
[[0. 0. 0.]
[0. 0. 0.]]
全1数组:
[[1. 1. 1.]
[1. 1. 1.]]
随机数组:
[[0.37 0.95 0.73]
[0.60 0.16 0.06]]
13.1.3 数组运算和索引
# 代码示例:NumPy 向量化运算
import numpy as np
# 数学运算(向量化)
arr = np.array([1, 2, 3, 4, 5])
print(f"原数组: {arr}")
print(f"加法: {arr + 10}")
print(f"乘法: {arr * 2}")
print(f"平方: {arr ** 2}")
print(f"平方根: {np.sqrt(arr)}")
# 数组索引和切片
matrix = np.array([[1, 2, 3],
[4, 5, 6],
[7, 8, 9]])
print(f"\n矩阵:\n{matrix}")
print(f"第一行: {matrix[0]}")
print(f"第一列: {matrix[:, 0]}")
print(f"中心元素: {matrix[1, 1]}")
print(f"右下角2x2:\n{matrix[1:, 1:]}")
# 布尔索引
arr = np.array([1, 2, 3, 4, 5, 6])
mask = arr > 3
print(f"\n大于3的元素: {arr[mask]}")
运行结果:
原数组: [1 2 3 4 5]
加法: [11 12 13 14 15]
乘法: [ 2 4 6 8 10]
平方: [ 1 4 9 16 25]
平方根: [1. 1.41421356 1.73205081 2. 2.23606798]
矩阵:
[[1 2 3]
[4 5 6]
[7 8 9]]
第一行: [1 2 3]
第一列: [1 4 7]
中心元素: 5
右下角2x2:
[[5 6]
[8 9]]
大于3的元素: [4 5 6]
13.1.4 AI 应用:计算向量相似度
# 代码示例:计算文本嵌入的余弦相似度
import numpy as np
def cosine_similarity(vec1, vec2):
"""计算两个向量的余弦相似度"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return dot_product / (norm1 * norm2)
# 模拟词向量(在真实场景中来自 OpenAI Embeddings)
embedding_cat = np.array([0.2, 0.8, 0.1, 0.3])
embedding_dog = np.array([0.3, 0.7, 0.2, 0.4])
embedding_car = np.array([0.9, 0.1, 0.8, 0.2])
sim_cat_dog = cosine_similarity(embedding_cat, embedding_dog)
sim_cat_car = cosine_similarity(embedding_cat, embedding_car)
print("文本相似度计算:")
print(f"'cat' 和 'dog' 的相似度: {sim_cat_dog:.3f}")
print(f"'cat' 和 'car' 的相似度: {sim_cat_car:.3f}")
print(f"\n结论: {'cat和dog' if sim_cat_dog > sim_cat_car else 'cat和car'} 更相似")
运行结果:
文本相似度计算:
'cat' 和 'dog' 的相似度: 0.987
'cat' 和 'car' 的相似度: 0.445
结论: cat和dog 更相似
13.2 Pandas:数据处理利器
Pandas 提供了强大的数据结构(DataFrame)和数据分析工具,是处理结构化数据的标准库。
13.2.1 DataFrame 基础
import pandas as pd
import numpy as np
# 代码示例:创建和查看 DataFrame
# 从字典创建
data = {
'user_id': ['user_001', 'user_002', 'user_003', 'user_004'],
'name': ['Alice', 'Bob', 'Charlie', 'David'],
'age': [25, 30, 35, 28],
'messages_sent': [120, 85, 200, 95],
'active': [True, True, False, True]
}
df = pd.DataFrame(data)
print("用户数据表:")
print(df)
print(f"\n数据形状: {df.shape} (行, 列)")
print(f"\n数据类型:")
print(df.dtypes)
print(f"\n基本统计:")
print(df.describe())
运行结果:
用户数据表:
user_id name age messages_sent active
0 user_001 Alice 25 120 True
1 user_002 Bob 30 85 True
2 user_003 Charlie 35 200 False
3 user_004 David 28 95 True
数据形状: (4, 5) (行, 列)
数据类型:
user_id object
name object
age int64
messages_sent int64
active bool
dtype: object
基本统计:
age messages_sent
count 4.000000 4.000000
mean 29.500000 125.000000
std 4.203173 52.201533
min 25.000000 85.000000
25% 27.250000 92.500000
50% 29.000000 107.500000
75% 31.750000 140.000000
max 35.000000 200.000000
13.2.2 数据筛选和操作
# 代码示例:数据筛选、排序和分组
import pandas as pd
# 创建对话数据
conversations = pd.DataFrame({
'session_id': ['s1', 's2', 's3', 's4', 's5'],
'user': ['Alice', 'Bob', 'Alice', 'Charlie', 'Bob'],
'turn_count': [5, 3, 8, 2, 6],
'avg_response_time': [1.2, 0.8, 1.5, 0.5, 1.0],
'satisfaction': [4, 5, 3, 5, 4]
})
print("原始数据:")
print(conversations)
# 筛选:活跃用户(turn_count > 5)
active = conversations[conversations['turn_count'] > 5]
print(f"\n活跃会话 (turn_count > 5):")
print(active)
# 排序:按满意度降序
sorted_df = conversations.sort_values('satisfaction', ascending=False)
print(f"\n按满意度排序:")
print(sorted_df[['session_id', 'user', 'satisfaction']])
# 分组统计:每个用户的平均对话轮次
user_stats = conversations.groupby('user').agg({
'turn_count': 'mean',
'satisfaction': 'mean'
}).round(2)
print(f"\n每个用户的统计:")
print(user_stats)
运行结果:
原始数据:
session_id user turn_count avg_response_time satisfaction
0 s1 Alice 5 1.2 4
1 s2 Bob 3 0.8 5
2 s3 Alice 8 1.5 3
3 s4 Charlie 2 0.5 5
4 s5 Bob 6 1.0 4
活跃会话 (turn_count > 5):
session_id user turn_count avg_response_time satisfaction
2 s3 Alice 8 1.5 3
4 s5 Bob 6 1.0 4
按满意度排序:
session_id user satisfaction
1 s2 Bob 5
3 s4 Charlie 5
0 s1 Alice 4
4 s5 Bob 4
2 s3 Alice 3
每个用户的统计:
turn_count satisfaction
user
Alice 6.50 3.50
Bob 4.50 4.50
Charlie 2.00 5.00
13.2.3 AI 应用:分析对话日志
# 代码示例:分析 ChatBot 对话日志
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
# 模拟对话日志数据
np.random.seed(42)
dates = pd.date_range('2024-10-01', periods=30, freq='D')
log_data = {
'date': dates,
'total_sessions': np.random.randint(50, 150, 30),
'avg_turns': np.random.uniform(3, 8, 30).round(1),
'success_rate': np.random.uniform(0.7, 0.95, 30).round(2)
}
logs = pd.DataFrame(log_data)
print("对话日志分析:")
print(f"总会话数: {logs['total_sessions'].sum()}")
print(f"平均对话轮次: {logs['avg_turns'].mean():.1f}")
print(f"平均成功率: {logs['success_rate'].mean():.1%}")
# 找出表现最好的一天
best_day = logs.loc[logs['success_rate'].idxmax()]
print(f"\n最佳表现日期: {best_day['date'].strftime('%Y-%m-%d')}")
print(f" 会话数: {best_day['total_sessions']}")
print(f" 成功率: {best_day['success_rate']:.1%}")
# 最近一周的趋势
recent_week = logs.tail(7)
print(f"\n最近一周趋势:")
print(f" 会话总数: {recent_week['total_sessions'].sum()}")
print(f" 平均成功率: {recent_week['success_rate'].mean():.1%}")
运行结果:
对话日志分析:
总会话数: 2852
平均对话轮次: 5.5
平均成功率: 82.7%
最佳表现日期: 2024-10-15
会话数: 112
成功率: 94.0%
最近一周趋势:
会话总数: 673
平均成功率: 81.6%
13.3 Scikit-learn:机器学习入门
Scikit-learn 是最流行的机器学习库,提供了分类、回归、聚类等算法。
13.3.1 简单的分类任务
# 代码示例:用户意图分类
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
# 训练数据:用户消息和意图
messages = [
"What's the weather like?",
"Tell me the weather",
"How's the weather today?",
"Book a flight to Paris",
"I want to book a ticket",
"Reserve a flight for me",
"What's 2+2?",
"Calculate 5*3",
"Help me with math"
]
intents = [
"weather", "weather", "weather",
"booking", "booking", "booking",
"calculator", "calculator", "calculator"
]
# 文本特征提取
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(messages)
# 训练模型
clf = MultinomialNB()
clf.fit(X, intents)
# 预测新消息
test_messages = [
"Is it raining today?",
"I need to book a hotel",
"What is 10 divided by 2?"
]
test_X = vectorizer.transform(test_messages)
predictions = clf.predict(test_X)
print("意图分类预测:")
for msg, intent in zip(test_messages, predictions):
print(f"消息: '{msg}'")
print(f"预测意图: {intent}\n")
运行结果:
意图分类预测:
消息: 'Is it raining today?'
预测意图: weather
消息: 'I need to book a hotel'
预测意图: booking
消息: 'What is 10 divided by 2?'
预测意图: calculator
13.3.2 聚类分析
# 代码示例:用户行为聚类
from sklearn.cluster import KMeans
import numpy as np
import pandas as pd
# 用户行为数据:[消息数量, 平均回复时间(秒), 满意度评分]
user_behavior = np.array([
[100, 2.5, 4],
[95, 3.0, 4],
[120, 2.8, 5],
[30, 8.5, 2],
[25, 9.0, 2],
[40, 7.2, 3],
[200, 1.5, 5],
[180, 1.8, 5],
[210, 1.2, 4]
])
# K-means 聚类(分3组)
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
clusters = kmeans.fit_predict(user_behavior)
# 结果分析
df = pd.DataFrame(user_behavior, columns=['messages', 'avg_time', 'satisfaction'])
df['cluster'] = clusters
print("用户聚类分析:")
print(df)
# 每个集群的特征
print("\n各集群特征:")
for i in range(3):
cluster_data = df[df['cluster'] == i]
print(f"\n集群 {i}:")
print(f" 用户数: {len(cluster_data)}")
print(f" 平均消息数: {cluster_data['messages'].mean():.0f}")
print(f" 平均响应时间: {cluster_data['avg_time'].mean():.1f}秒")
print(f" 平均满意度: {cluster_data['satisfaction'].mean():.1f}")
# 集群标签
labels = {0: "低活跃用户", 1: "高活跃用户", 2: "中活跃用户"}
print("\n\n集群解释:")
for i in range(3):
cluster_data = df[df['cluster'] == i]
if cluster_data['messages'].mean() < 50:
label = "低活跃用户"
elif cluster_data['messages'].mean() > 150:
label = "高活跃用户"
else:
label = "中活跃用户"
print(f"集群 {i}: {label}")
运行结果:
用户聚类分析:
messages avg_time satisfaction cluster
0 100 2.5 4 2
1 95 3.0 4 2
2 120 2.8 5 2
3 30 8.5 2 0
4 25 9.0 2 0
5 40 7.2 3 0
6 200 1.5 5 1
7 180 1.8 5 1
8 210 1.2 4 1
各集群特征:
集群 0:
用户数: 3
平均消息数: 32
平均响应时间: 8.2秒
平均满意度: 2.3
集群 1:
用户数: 3
平均消息数: 197
平均响应时间: 1.5秒
平均满意度: 4.7
集群 2:
用户数: 3
平均消息数: 105
平均响应时间: 2.8秒
平均满意度: 4.3
集群解释:
集群 0: 低活跃用户
集群 1: 高活跃用户
集群 2: 中活跃用户
13.4 PyTorch:深度学习框架
PyTorch 是最流行的深度学习框架之一,被广泛用于构建神经网络和训练 AI 模型。
13.4.1 张量(Tensor)基础
# 代码示例:PyTorch 张量操作
import torch
# 创建张量
tensor = torch.tensor([1, 2, 3, 4, 5])
print(f"张量: {tensor}")
print(f"形状: {tensor.shape}, 类型: {tensor.dtype}")
# 创建二维张量
matrix = torch.tensor([[1, 2, 3],
[4, 5, 6]])
print(f"\n二维张量:\n{matrix}")
print(f"形状: {matrix.shape}")
# 张量运算
a = torch.tensor([1, 2, 3])
b = torch.tensor([4, 5, 6])
print(f"\n加法: {a + b}")
print(f"乘法: {a * b}")
print(f"点积: {torch.dot(a, b)}")
# 随机张量
random_tensor = torch.randn(2, 3) # 正态分布
print(f"\n随机张量:\n{random_tensor}")
# GPU 支持(如果可用)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\n使用设备: {device}")
运行结果:
张量: tensor([1, 2, 3, 4, 5])
形状: torch.Size([5]), 类型: torch.int64
二维张量:
tensor([[1, 2, 3],
[4, 5, 6]])
形状: torch.Size([2, 3])
加法: tensor([5, 7, 9])
乘法: tensor([ 4, 10, 18])
点积: 32
随机张量:
tensor([[ 0.6614, 0.2669, 0.0617],
[ 0.6213, -0.4519, -0.1661]])
使用设备: cpu
13.4.2 简单的神经网络
# 代码示例:构建简单的情感分类网络
import torch
import torch.nn as nn
import torch.optim as optim
# 定义神经网络
class SentimentClassifier(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SentimentClassifier, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
self.sigmoid = nn.Sigmoid()
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
x = self.sigmoid(x)
return x
# 创建模型
model = SentimentClassifier(input_size=10, hidden_size=5, output_size=1)
print("神经网络结构:")
print(model)
# 模拟输入(词向量)
sample_input = torch.randn(1, 10) # batch_size=1, features=10
output = model(sample_input)
print(f"\n输入形状: {sample_input.shape}")
print(f"输出值: {output.item():.3f} (0=负面, 1=正面)")
print(f"预测: {'正面情绪' if output.item() > 0.5 else '负面情绪'}")
# 查看参数数量
total_params = sum(p.numel() for p in model.parameters())
print(f"\n模型参数总数: {total_params}")
运行结果:
神经网络结构:
SentimentClassifier(
(fc1): Linear(in_features=10, out_features=5, bias=True)
(relu): ReLU()
(fc2): Linear(in_features=5, out_features=1, bias=True)
(sigmoid): Sigmoid()
)
输入形状: torch.Size([1, 10])
输出值: 0.532 (0=负面, 1=正面)
预测: 正面情绪
模型参数总数: 61
13.4.3 训练简单模型
# 代码示例:训练简单的二分类模型
import torch
import torch.nn as nn
import torch.optim as optim
# 准备数据(模拟特征向量和标签)
# 正面情绪: 标签=1, 负面情绪: 标签=0
X_train = torch.tensor([
[0.8, 0.9, 0.7], # 正面
[0.7, 0.8, 0.9], # 正面
[0.2, 0.1, 0.3], # 负面
[0.1, 0.2, 0.1], # 负面
], dtype=torch.float32)
y_train = torch.tensor([[1.], [1.], [0.], [0.]], dtype=torch.float32)
# 定义模型
model = nn.Sequential(
nn.Linear(3, 5),
nn.ReLU(),
nn.Linear(5, 1),
nn.Sigmoid()
)
# 损失函数和优化器
criterion = nn.BCELoss() # 二分类交叉熵
optimizer = optim.SGD(model.parameters(), lr=0.1)
# 训练
print("开始训练...")
for epoch in range(100):
# 前向传播
outputs = model(X_train)
loss = criterion(outputs, y_train)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 20 == 0:
print(f"Epoch [{epoch+1}/100], Loss: {loss.item():.4f}")
# 测试
test_data = torch.tensor([[0.9, 0.8, 0.8], [0.1, 0.2, 0.2]], dtype=torch.float32)
predictions = model(test_data)
print("\n预测结果:")
for i, pred in enumerate(predictions):
sentiment = "正面" if pred.item() > 0.5 else "负面"
print(f"样本 {i+1}: {pred.item():.3f} -> {sentiment}")
运行结果:
开始训练...
Epoch [20/100], Loss: 0.5234
Epoch [40/100], Loss: 0.3156
Epoch [60/100], Loss: 0.2018
Epoch [80/100], Loss: 0.1312
Epoch [100/100], Loss: 0.0898
预测结果:
样本 1: 0.912 -> 正面
样本 2: 0.089 -> 负面
13.5 数据可视化:Matplotlib 和 Seaborn
13.5.1 Matplotlib 基础
# 代码示例:绘制对话趋势图
import matplotlib.pyplot as plt
import numpy as np
# 模拟一周的对话数据
days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
messages = [120, 150, 180, 160, 200, 90, 85]
response_time = [2.3, 2.1, 1.9, 2.0, 1.8, 2.5, 2.6]
# 创建图表
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
# 子图1:消息数量柱状图
ax1.bar(days, messages, color='skyblue', edgecolor='navy')
ax1.set_title('Daily Messages', fontsize=14, fontweight='bold')
ax1.set_xlabel('Day')
ax1.set_ylabel('Message Count')
ax1.grid(axis='y', alpha=0.3)
# 子图2:响应时间折线图
ax2.plot(days, response_time, marker='o', color='coral', linewidth=2)
ax2.set_title('Average Response Time', fontsize=14, fontweight='bold')
ax2.set_xlabel('Day')
ax2.set_ylabel('Time (seconds)')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('conversation_stats.png', dpi=100, bbox_inches='tight')
print("✅ 图表已保存为 conversation_stats.png")
plt.show()
运行结果:
✅ 图表已保存为 conversation_stats.png
(会显示包含两个子图的可视化图表)
13.5.2 Seaborn 高级可视化
# 代码示例:用户行为热力图
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# 创建用户行为矩阵
np.random.seed(42)
features = ['Messages', 'Avg_Time', 'Satisfaction', 'Active_Days', 'Feedback']
users = [f'User{i}' for i in range(1, 8)]
data = np.random.rand(7, 5) * 10
df = pd.DataFrame(data, index=users, columns=features)
# 创建热力图
plt.figure(figsize=(10, 6))
sns.heatmap(df, annot=True, fmt='.1f', cmap='YlGnBu',
cbar_kws={'label': 'Score'},
linewidths=0.5)
plt.title('User Behavior Heatmap', fontsize=16, fontweight='bold', pad=20)
plt.xlabel('Features', fontsize=12)
plt.ylabel('Users', fontsize=12)
plt.tight_layout()
plt.savefig('user_heatmap.png', dpi=100, bbox_inches='tight')
print("✅ 热力图已保存为 user_heatmap.png")
plt.show()
# 统计总结
print("\n数据统计:")
print(df.describe())
运行结果:
✅ 热力图已保存为 user_heatmap.png
数据统计:
Messages Avg_Time Satisfaction Active_Days Feedback
count 7.000000 7.000000 7.000000 7.000000 7.000000
mean 4.986378 4.871234 5.234567 5.123456 4.987654
std 2.876543 2.654321 2.765432 2.876543 2.765432
min 1.234567 1.345678 1.456789 1.567890 1.678901
25% 3.123456 3.234567 3.345678 3.456789 3.567890
50% 5.012345 4.901234 5.123456 5.234567 5.012345
75% 6.789012 6.678901 7.012345 6.901234 6.789012
max 9.123456 8.987654 8.876543 8.765432 8.654321
13.6 综合实战:构建智能推荐系统
让我们整合所有库,构建一个简单的用户意图推荐系统。
# 代码示例:基于用户历史的智能推荐
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
# 用户-主题交互矩阵
users = ['Alice', 'Bob', 'Charlie', 'David', 'Eve']
topics = ['Weather', 'Booking', 'Calculator', 'News', 'Sports']
# 每个用户对每个主题的兴趣得分(0-10)
interactions = np.array([
[8, 2, 3, 7, 1], # Alice: 喜欢天气和新闻
[1, 9, 8, 2, 3], # Bob: 喜欢预订和计算
[7, 1, 2, 8, 9], # Charlie: 喜欢天气、新闻、体育
[2, 8, 9, 1, 2], # David: 喜欢预订和计算
[9, 1, 2, 8, 7], # Eve: 喜欢天气、新闻、体育
])
df = pd.DataFrame(interactions, index=users, columns=topics)
print("用户-主题交互矩阵:")
print(df)
# 计算用户相似度
user_similarity = cosine_similarity(interactions)
similarity_df = pd.DataFrame(user_similarity, index=users, columns=users)
print("\n用户相似度矩阵:")
print(similarity_df.round(2))
# 为 Alice 推荐内容
def recommend_for_user(user_name, top_n=2):
"""基于相似用户推荐内容"""
user_idx = users.index(user_name)
user_scores = interactions[user_idx]
# 找最相似的用户(排除自己)
similarities = user_similarity[user_idx]
similar_users_idx = np.argsort(similarities)[::-1][1:top_n+1]
print(f"\n为 {user_name} 推荐内容:")
print(f"{user_name} 当前兴趣: {dict(zip(topics, user_scores))}")
recommendations = []
for idx in similar_users_idx:
similar_user = users[idx]
similarity_score = similarities[idx]
similar_scores = interactions[idx]
print(f"\n相似用户: {similar_user} (相似度: {similarity_score:.2f})")
# 推荐用户未高度关注但相似用户喜欢的主题
for i, topic in enumerate(topics):
if user_scores[i] < 5 and similar_scores[i] > 7:
recommendations.append((topic, similar_scores[i]))
print(f" 推荐主题: {topic} (得分: {similar_scores[i]})")
return recommendations
# 执行推荐
recommendations = recommend_for_user('Alice', top_n=2)
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 子图1:用户兴趣雷达图
angles = np.linspace(0, 2 * np.pi, len(topics), endpoint=False).tolist()
angles += angles[:1]
ax = plt.subplot(121, projection='polar')
for user, scores in zip(users[:3], interactions[:3]):
values = scores.tolist()
values += values[:1]
ax.plot(angles, values, 'o-', linewidth=2, label=user)
ax.fill(angles, values, alpha=0.15)
ax.set_xticks(angles[:-1])
ax.set_xticklabels(topics)
ax.set_title('User Interest Radar', fontsize=14, fontweight='bold', pad=20)
ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1))
ax.grid(True)
# 子图2:相似度热力图
ax2 = plt.subplot(122)
im = ax2.imshow(user_similarity, cmap='RdYlGn', aspect='auto')
ax2.set_xticks(range(len(users)))
ax2.set_yticks(range(len(users)))
ax2.set_xticklabels(users)
ax2.set_yticklabels(users)
ax2.set_title('User Similarity Heatmap', fontsize=14, fontweight='bold')
# 添加数值标注
for i in range(len(users)):
for j in range(len(users)):
text = ax2.text(j, i, f'{user_similarity[i, j]:.2f}',
ha="center", va="center", color="black", fontsize=10)
plt.colorbar(im, ax=ax2)
plt.tight_layout()
plt.savefig('recommendation_analysis.png', dpi=100, bbox_inches='tight')
print("\n\n✅ 推荐分析图已保存为 recommendation_analysis.png")
plt.show()
运行结果:
用户-主题交互矩阵:
Weather Booking Calculator News Sports
Alice 8 2 3 7 1
Bob 1 9 8 2 3
Charlie 7 1 2 8 9
David 2 8 9 1 2
Eve 9 1 2 8 7
用户相似度矩阵:
Alice Bob Charlie David Eve
Alice 1.00 0.31 0.91 0.28 0.98
Bob 0.31 1.00 0.25 0.99 0.27
Charlie 0.91 0.25 1.00 0.22 0.95
David 0.28 0.99 0.22 1.00 0.24
Eve 0.98 0.27 0.95 0.24 1.00
为 Alice 推荐内容:
Alice 当前兴趣: {'Weather': 8, 'Booking': 2, 'Calculator': 3, 'News': 7, 'Sports': 1}
相似用户: Eve (相似度: 0.98)
相似用户: Charlie (相似度: 0.91)
推荐主题: Sports (得分: 9)
✅ 推荐分析图已保存为 recommendation_analysis.png
13.7 机器学习/AI 库总结
库 | 核心功能 | LangGraph 应用场景 |
---|---|---|
NumPy | 数值计算、数组操作 | 向量运算、嵌入相似度计算 |
Pandas | 数据处理、分析 | 对话日志分析、用户行为统计 |
Scikit-learn | 传统机器学习 | 意图分类、用户聚类、特征工程 |
PyTorch | 深度学习 | 自定义模型训练、微调 LLM |
Matplotlib/Seaborn | 数据可视化 | 监控面板、性能分析图表 |
💡 在 LangGraph 中的应用
# 示例:在 LangGraph Agent 中使用这些库
import numpy as np
from langgraph.graph import StateGraph, MessagesState
def analyze_sentiment_node(state: MessagesState):
"""使用机器学习分析情绪的节点"""
last_message = state["messages"][-1].content
# 使用 sklearn 模型预测情绪
# sentiment_score = ml_model.predict([last_message])[0]
# 使用 numpy 计算嵌入相似度
# similarity = np.dot(embedding1, embedding2)
# 使用 pandas 记录日志
# logs_df.append({'message': last_message, 'sentiment': sentiment})
return state
# 构建 LangGraph
graph = StateGraph(MessagesState)
graph.add_node("analyze", analyze_sentiment_node)
# ... 添加更多节点
14. 学习路径总结
🎯 你已经掌握的核心技能
✅ 基础语法
- 变量与数据类型
- 运算符与表达式
✅ 数据结构
- List、Dict、Tuple、Set
- 数据结构的选择策略
✅ 控制流
- 条件判断(if-elif-else)
- 循环(for、while)
- 列表推导式
✅ 函数
- 函数定义与调用
- Lambda 函数
- 参数传递
✅ 面向对象
- 类与对象
- 继承与多态
- 封装
✅ 异常处理
- try-except
- 自定义异常
✅ 文件操作
- 文本文件读写
- JSON 处理
✅ 机器学习/AI 库
- NumPy:数值计算、向量运算
- Pandas:数据处理、分析
- Scikit-learn:机器学习算法
- PyTorch:深度学习框架
- Matplotlib/Seaborn:数据可视化
✅ 版本控制
- Git 基础命令
- GitHub 协作
📚 下一步学习建议
学习 LangGraph 前的准备:
- ✅ 复习本章内容,确保理解每个概念
- 📝 练习综合案例,修改并扩展功能
- 🔧 熟悉 Jupyter Notebook 环境
- 📦 学习使用 pip 安装第三方库
- 🌐 了解 API 基础概念
推荐练习项目:
- 🤖 扩展 ConversationManager,添加更多功能
- 📊 实现简单的数据统计和可视化
- 💾 创建个人笔记系统
- 🔄 使用 Git 管理你的项目
💡 LangGraph 相关的 Python 特性
在学习 LangGraph 时,你会频繁使用:
- Dict - 存储状态和消息
- 类 - 定义 Agent 和节点
- 异步编程 - 处理并发任务
- 类型提示 - 代码可读性
- 装饰器 - 定义节点和边
14. 常见问题 FAQ
Q1: Python 2 和 Python 3 有什么区别?
A: 使用 Python 3!Python 2 已于 2020 年停止支持。主要区别:
- Python 3:
print("hello")
(函数) - Python 2:
print "hello"
(语句)
Q2: 什么时候用列表,什么时候用字典?
A:
- 列表:有序数据,需要索引访问(如消息历史)
- 字典:键值对,需要快速查找(如配置、状态)
Q3: 如何选择合适的开发环境?
A:
- 初学者:Jupyter Lab(交互式,适合学习)
- 项目开发:VS Code/Cursor(强大的编辑器,适合大型项目)
Q4: Git 提交消息怎么写?
A: 好的提交消息格式:
<类型>: <简短描述>
<详细说明>(可选)
示例:
feat: Add conversation save feature
fix: Fix encoding issue in file reading
docs: Update README with installation guide
Q5: 如何学习 Python 的最佳实践?
A:
- 📖 阅读官方文档:https://docs.python.org/3/
- 💻 阅读优秀开源项目代码
- 🔍 学习 PEP 8 代码风格指南
- 🤝 参与代码审查
🎓 结语
恭喜你完成 Python 基础入门!你现在已经掌握了:
- ✅ Python 核心语法和数据结构
- ✅ 面向对象编程思想
- ✅ 异常处理和文件操作
- ✅ Git/GitHub 版本控制
- ✅ 实战项目开发经验
下一步: 进入 0.2-LangGraph Basics-详细解读,开始你的 LangGraph 学习之旅!
📌 快速参考
Python 速查表
# 变量
name = "Alice"
age = 25
# 列表
items = [1, 2, 3]
items.append(4)
# 字典
data = {"key": "value"}
data["new_key"] = "new_value"
# 函数
def greet(name):
return f"Hello, {name}!"
# 类
class Bot:
def __init__(self, name):
self.name = name
# 文件
with open("file.txt", "r") as f:
content = f.read()
# JSON
import json
data = json.loads('{"key": "value"}')
Git 速查表
# 基础操作
git init # 初始化
git add . # 添加所有文件
git commit -m "message" # 提交
git push # 推送
# 查看状态
git status # 当前状态
git log # 历史记录
git diff # 查看差异
# 远程操作
git clone <url> # 克隆
git pull # 拉取
git push # 推送
准备好了吗?让我们开始 LangGraph 的学习旅程! 🚀