Skip to content

10.3 机器学习实战:Scikit-Learn 完全指南

从数据到智能:机器学习的本质

在掌握了 NumPy 和 Pandas 这些数据处理工具后,我们终于可以回答这个核心问题:如何让计算机从数据中学习?

💡 核心概念:机器学习的本质是从数据中自动发现模式,并用这些模式对未见过的数据做出预测。

传统编程:

规则 + 数据 → 结果

机器学习:

数据 + 结果 → 规则

Scikit-Learn 是 Python 中最成熟、最易用的机器学习库,被工业界广泛采用。它提供了:

  • 统一的 API 设计(所有模型都有 fit()predict() 方法)
  • 丰富的算法实现(分类、回归、聚类、降维)
  • 完善的数据预处理工具
  • 强大的模型评估和选择功能

机器学习的三大范式

1. 监督学习(Supervised Learning)

定义:从有标签的数据中学习,预测新数据的标签。

python
训练数据:(特征, 标签) 对
例如:(房屋面积、位置、房龄, 房价)
     (邮件内容, 是否垃圾邮件)

两大子类

  • 分类(Classification):预测离散标签(如:垃圾邮件/正常邮件)
  • 回归(Regression):预测连续值(如:房价、温度)

2. 无监督学习(Unsupervised Learning)

定义:从无标签的数据中发现隐藏的结构或模式。

python
训练数据:只有特征,没有标签
例如:(用户浏览记录) → 用户分群
     (文章内容) → 主题聚类

常见任务

  • 聚类(Clustering):将相似的数据点分组
  • 降维(Dimensionality Reduction):压缩高维数据
  • 异常检测(Anomaly Detection):发现异常数据点

3. 强化学习(Reinforcement Learning)

定义:通过与环境交互,学习最优策略以最大化累积奖励。

python
Agent → 执行动作 → 环境反馈奖励 → Agent 学习

经典应用:AlphaGo、自动驾驶、机器人控制

📌 本节重点:我们将深入监督学习和无监督学习,强化学习在后续章节介绍。


Scikit-Learn 的设计哲学

统一的 Estimator API

所有 Scikit-Learn 模型都遵循相同的接口:

python
from sklearn.base import BaseEstimator, ClassifierMixin

# 所有模型都继承自 BaseEstimator
class MyModel(BaseEstimator, ClassifierMixin):
    def fit(self, X, y):
        """从训练数据中学习"""
        # 学习参数
        return self

    def predict(self, X):
        """对新数据做预测"""
        # 返回预测结果
        return predictions

    def score(self, X, y):
        """评估模型性能"""
        # 返回准确率、R²等指标
        return accuracy

这种统一的设计让我们可以轻松切换不同算法:

python
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

# 三个不同的模型,但 API 完全相同
models = [
    LogisticRegression(),
    RandomForestClassifier(),
    SVC()
]

for model in models:
    model.fit(X_train, y_train)  # 训练
    score = model.score(X_test, y_test)  # 评估
    print(f"{model.__class__.__name__}: {score:.4f}")

第一个机器学习项目:鸢尾花分类

让我们用经典的 Iris 数据集开始:

python
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from typing import Tuple

# 1. 加载数据
iris = load_iris()
X = iris.data  # 特征:花萼长度、花萼宽度、花瓣长度、花瓣宽度
y = iris.target  # 标签:0=setosa, 1=versicolor, 2=virginica

print(f"数据集形状: {X.shape}")
print(f"特征名称: {iris.feature_names}")
print(f"类别名称: {iris.target_names}")

# 转换为 DataFrame 便于分析
df = pd.DataFrame(X, columns=iris.feature_names)
df['species'] = iris.target_names[y]
print(df.head())

# 2. 数据分割(训练集 80%,测试集 20%)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"\n训练集: {X_train.shape}, 测试集: {X_test.shape}")
print(f"训练集类别分布: {np.bincount(y_train)}")
print(f"测试集类别分布: {np.bincount(y_test)}")

# 3. 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"\n标准化前均值: {X_train[:, 0].mean():.4f}, 标准差: {X_train[:, 0].std():.4f}")
print(f"标准化后均值: {X_train_scaled[:, 0].mean():.4f}, 标准差: {X_train_scaled[:, 0].std():.4f}")

# 4. 训练模型
model = LogisticRegression(max_iter=200, random_state=42)
model.fit(X_train_scaled, y_train)

# 5. 预测
y_pred = model.predict(X_test_scaled)

# 6. 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"\n准确率: {accuracy:.4f}")

# 详细的分类报告
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))

输出示例

数据集形状: (150, 4)
训练集: (120, 4), 测试集: (30, 4)
准确率: 1.0000

分类报告:
              precision    recall  f1-score   support

      setosa       1.00      1.00      1.00        10
  versicolor       1.00      1.00      1.00         9
   virginica       1.00      1.00      1.00        11

    accuracy                           1.00        30

💡 关键步骤

  1. 加载数据:使用内置数据集或读取文件
  2. 数据分割:训练集用于学习,测试集用于评估
  3. 特征缩放:让不同尺度的特征在同一量级
  4. 训练模型:调用 fit() 方法
  5. 预测评估:用 predict() 和评估指标

数据预处理:机器学习的 80% 工作

1. 特征缩放(Feature Scaling)

为什么需要缩放?

许多机器学习算法(如 KNN、SVM、神经网络)对特征的尺度敏感。考虑这个例子:

python
# 两个特征:房屋面积(平方米)和房间数
X = np.array([
    [50, 2],    # 50 平米,2 个房间
    [150, 4],   # 150 平米,4 个房间
])

# 如果直接计算欧氏距离,面积的影响会远大于房间数
# 因为面积的数值范围 [50, 150],而房间数只有 [2, 4]

标准化(Standardization)

python
from sklearn.preprocessing import StandardScaler

# 公式:z = (x - μ) / σ
# 结果:均值=0,标准差=1
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print("原始数据:")
print(X)
print("\n标准化后:")
print(X_scaled)
print(f"均值: {X_scaled.mean(axis=0)}")
print(f"标准差: {X_scaled.std(axis=0)}")

归一化(Normalization)

python
from sklearn.preprocessing import MinMaxScaler

# 公式:x' = (x - x_min) / (x_max - x_min)
# 结果:数据范围 [0, 1]
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)

print("归一化后:")
print(X_normalized)
print(f"最小值: {X_normalized.min(axis=0)}")
print(f"最大值: {X_normalized.max(axis=0)}")

⚠️ 重要:必须先在训练集上 fit(),再对训练集和测试集都进行 transform()

python
# ❌ 错误做法
X_train = scaler.fit_transform(X_train)
X_test = scaler.fit_transform(X_test)  # 会导致数据泄露!

# ✅ 正确做法
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)  # 使用训练集的统计量

2. 类别特征编码

python
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd

# 示例数据
data = pd.DataFrame({
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou', 'Shanghai'],
    'size': ['S', 'M', 'L', 'M', 'S'],
    'price': [100, 200, 150, 180, 120]
})

# 方法 1:标签编码(LabelEncoder)
# 适用于有序类别(如 S < M < L)
le = LabelEncoder()
data['size_encoded'] = le.fit_transform(data['size'])
print("标签编码:")
print(data[['size', 'size_encoded']])

# 方法 2:独热编码(One-Hot Encoding)
# 适用于无序类别(如城市)
data_onehot = pd.get_dummies(data, columns=['city'], prefix='city')
print("\n独热编码:")
print(data_onehot)

# 使用 Scikit-Learn 的 OneHotEncoder
from sklearn.preprocessing import OneHotEncoder

ohe = OneHotEncoder(sparse_output=False)
city_encoded = ohe.fit_transform(data[['city']])
print(f"\nOneHotEncoder 结果形状: {city_encoded.shape}")
print(ohe.get_feature_names_out())

3. 缺失值处理

python
from sklearn.impute import SimpleImputer
import numpy as np

# 创建带缺失值的数据
X = np.array([
    [1, 2, np.nan],
    [3, np.nan, 6],
    [np.nan, 8, 9],
    [4, 5, 6]
])

# 策略 1:用均值填充
imputer_mean = SimpleImputer(strategy='mean')
X_mean = imputer_mean.fit_transform(X)
print("均值填充:")
print(X_mean)

# 策略 2:用中位数填充
imputer_median = SimpleImputer(strategy='median')
X_median = imputer_median.fit_transform(X)
print("\n中位数填充:")
print(X_median)

# 策略 3:用最频繁值填充
imputer_frequent = SimpleImputer(strategy='most_frequent')
X_frequent = imputer_frequent.fit_transform(X)
print("\n众数填充:")
print(X_frequent)

4. 完整的预处理 Pipeline

python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# 定义数值特征和类别特征的处理流程
numeric_features = ['age', 'income', 'credit_score']
categorical_features = ['city', 'occupation']

# 数值特征 Pipeline
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# 类别特征 Pipeline
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# 组合所有预处理步骤
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# 完整的 ML Pipeline
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression())
])

# 一步到位:预处理 + 训练
# full_pipeline.fit(X_train, y_train)
# y_pred = full_pipeline.predict(X_test)

分类算法详解

1. 逻辑回归(Logistic Regression)

虽然名字里有"回归",但逻辑回归是分类算法

数学原理

1. 线性组合:z = w₁x₁ + w₂x₂ + ... + b
2. Sigmoid 函数:σ(z) = 1 / (1 + e⁻ᶻ)
3. 预测概率:P(y=1|x) = σ(z)
4. 决策规则:如果 P(y=1|x) > 0.5,预测类别 1,否则类别 0
python
from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt

# 生成二分类数据
from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=200,
    n_features=2,
    n_informative=2,
    n_redundant=0,
    n_clusters_per_class=1,
    random_state=42
)

# 训练逻辑回归
lr = LogisticRegression()
lr.fit(X, y)

# 可视化决策边界
def plot_decision_boundary(model, X, y):
    h = 0.02  # 网格步长
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))

    Z = model.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    plt.contourf(xx, yy, Z, alpha=0.3)
    plt.scatter(X[:, 0], X[:, 1], c=y, edgecolors='k')
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.title('Decision Boundary')
    plt.show()

plot_decision_boundary(lr, X, y)

# 预测概率
proba = lr.predict_proba(X[:5])
print("预测概率:")
print(proba)

2. 决策树(Decision Tree)

决策树通过一系列 if-else 规则进行预测。

python
from sklearn.tree import DecisionTreeClassifier, plot_tree
import matplotlib.pyplot as plt

# 训练决策树
dt = DecisionTreeClassifier(max_depth=3, random_state=42)
dt.fit(X, y)

# 可视化决策树
plt.figure(figsize=(20, 10))
plot_tree(dt, filled=True, feature_names=['Feature 1', 'Feature 2'])
plt.show()

# 特征重要性
importances = dt.feature_importances_
print(f"特征重要性: {importances}")

关键超参数

  • max_depth:树的最大深度(防止过拟合)
  • min_samples_split:分裂内部节点所需的最小样本数
  • min_samples_leaf:叶子节点所需的最小样本数

3. 随机森林(Random Forest)

随机森林是集成学习的代表,通过构建多个决策树并投票。

python
from sklearn.ensemble import RandomForestClassifier

# 训练随机森林
rf = RandomForestClassifier(
    n_estimators=100,  # 树的数量
    max_depth=10,
    random_state=42
)
rf.fit(X_train, y_train)

# 评估
accuracy = rf.score(X_test, y_test)
print(f"准确率: {accuracy:.4f}")

# 特征重要性
feature_importances = pd.DataFrame({
    'feature': iris.feature_names,
    'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

print("\n特征重要性:")
print(feature_importances)

为什么随机森林效果好?

  • Bagging:每棵树训练在不同的数据子集上(有放回抽样)
  • 特征随机性:每次分裂只考虑部分特征
  • 降低方差:多个模型平均可以减少过拟合

4. 支持向量机(SVM)

SVM 寻找最优的决策边界(最大间隔超平面)。

python
from sklearn.svm import SVC

# 线性 SVM
svm_linear = SVC(kernel='linear', C=1.0)
svm_linear.fit(X_train, y_train)

# RBF 核 SVM(可以处理非线性问题)
svm_rbf = SVC(kernel='rbf', C=1.0, gamma='scale')
svm_rbf.fit(X_train, y_train)

# 对比性能
print(f"线性 SVM 准确率: {svm_linear.score(X_test, y_test):.4f}")
print(f"RBF SVM 准确率: {svm_rbf.score(X_test, y_test):.4f}")

关键参数

  • C:正则化参数(越大越倾向于拟合训练数据)
  • kernel:核函数类型('linear', 'rbf', 'poly')
  • gamma:RBF 核的参数(影响决策边界的"光滑程度")

5. K近邻(K-Nearest Neighbors)

KNN 是最简单的算法之一:预测新样本时,找到最近的 K 个训练样本,用它们的标签投票。

python
from sklearn.neighbors import KNeighborsClassifier

# 训练 KNN
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(X_train, y_train)

# 预测
y_pred = knn.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")

# 找到不同 K 值的最佳设置
k_values = range(1, 31)
accuracies = []

for k in k_values:
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(X_train, y_train)
    accuracies.append(knn.score(X_test, y_test))

plt.plot(k_values, accuracies)
plt.xlabel('K')
plt.ylabel('Accuracy')
plt.title('KNN: Accuracy vs K')
plt.show()

回归算法

1. 线性回归(Linear Regression)

python
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression

# 生成回归数据
X, y = make_regression(n_samples=100, n_features=1, noise=10, random_state=42)

# 训练线性回归
lr = LinearRegression()
lr.fit(X, y)

# 参数
print(f"斜率: {lr.coef_[0]:.4f}")
print(f"截距: {lr.intercept_:.4f}")

# 预测
y_pred = lr.predict(X)

# R² 评分(决定系数)
from sklearn.metrics import r2_score, mean_squared_error

r2 = r2_score(y, y_pred)
mse = mean_squared_error(y, y_pred)
rmse = np.sqrt(mse)

print(f"R²: {r2:.4f}")
print(f"RMSE: {rmse:.4f}")

# 可视化
plt.scatter(X, y, alpha=0.5)
plt.plot(X, y_pred, color='red', linewidth=2)
plt.xlabel('X')
plt.ylabel('y')
plt.title('Linear Regression')
plt.show()

2. 正则化回归

Lasso(L1 正则化)

python
from sklearn.linear_model import Lasso

# Lasso 会将一些系数压缩到 0,实现特征选择
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)

Ridge(L2 正则化)

python
from sklearn.linear_model import Ridge

# Ridge 会缩小所有系数,但不会变为 0
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)

ElasticNet(L1 + L2)

python
from sklearn.linear_model import ElasticNet

# 结合了 Lasso 和 Ridge 的优点
elastic = ElasticNet(alpha=0.1, l1_ratio=0.5)
elastic.fit(X_train, y_train)

模型评估:不仅仅是准确率

分类评估指标

python
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    roc_auc_score,
    roc_curve
)

# 假设我们有预测结果
y_true = np.array([0, 1, 1, 0, 1, 1, 0, 0, 1, 0])
y_pred = np.array([0, 1, 0, 0, 1, 1, 0, 1, 1, 0])

# 1. 准确率(Accuracy)
accuracy = accuracy_score(y_true, y_pred)
print(f"准确率: {accuracy:.4f}")

# 2. 精确率(Precision):预测为正的样本中,真正为正的比例
precision = precision_score(y_true, y_pred)
print(f"精确率: {precision:.4f}")

# 3. 召回率(Recall):真正为正的样本中,被预测为正的比例
recall = recall_score(y_true, y_pred)
print(f"召回率: {recall:.4f}")

# 4. F1 分数:精确率和召回率的调和平均数
f1 = f1_score(y_true, y_pred)
print(f"F1 分数: {f1:.4f}")

# 5. 混淆矩阵
cm = confusion_matrix(y_true, y_pred)
print("\n混淆矩阵:")
print(cm)

混淆矩阵解释

              预测为负   预测为正
实际为负 (TN)   4      1
实际为正 (FN)   1      4
  • True Positive (TP):预测为正,实际为正
  • True Negative (TN):预测为负,实际为负
  • False Positive (FP):预测为正,实际为负(第一类错误)
  • False Negative (FN):预测为负,实际为正(第二类错误)
python
# 计算各个指标
TN, FP, FN, TP = cm.ravel()

print(f"TP: {TP}, TN: {TN}, FP: {FP}, FN: {FN}")
print(f"精确率: {TP / (TP + FP):.4f}")
print(f"召回率: {TP / (TP + FN):.4f}")

ROC 曲线和 AUC

python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# 生成数据
X, y = make_classification(n_samples=1000, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 训练模型
lr = LogisticRegression()
lr.fit(X_train, y_train)

# 预测概率
y_proba = lr.predict_proba(X_test)[:, 1]

# 计算 ROC 曲线
fpr, tpr, thresholds = roc_curve(y_test, y_proba)
auc = roc_auc_score(y_test, y_proba)

# 绘制 ROC 曲线
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.show()

💡 AUC 解释

  • AUC = 1.0:完美分类器
  • AUC = 0.5:随机猜测
  • AUC < 0.5:比随机猜测还差(说明模型反了)

回归评估指标

python
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

y_true = np.array([3.0, -0.5, 2.0, 7.0])
y_pred = np.array([2.5, 0.0, 2.0, 8.0])

# 1. 平均绝对误差(MAE)
mae = mean_absolute_error(y_true, y_pred)
print(f"MAE: {mae:.4f}")

# 2. 均方误差(MSE)
mse = mean_squared_error(y_true, y_pred)
print(f"MSE: {mse:.4f}")

# 3. 均方根误差(RMSE)
rmse = np.sqrt(mse)
print(f"RMSE: {rmse:.4f}")

# 4. R² 分数(决定系数)
r2 = r2_score(y_true, y_pred)
print(f"R²: {r2:.4f}")

交叉验证:更可靠的性能评估

问题:如果只分割一次训练集和测试集,结果可能不稳定(取决于随机分割)。

解决方案:K 折交叉验证(K-Fold Cross-Validation)

python
from sklearn.model_selection import cross_val_score, KFold

# K 折交叉验证
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(lr, X, y, cv=kfold, scoring='accuracy')

print(f"每折准确率: {scores}")
print(f"平均准确率: {scores.mean():.4f} (+/- {scores.std():.4f})")

工作原理

数据集分为 5 份:

第 1 折:[Test] [Train] [Train] [Train] [Train]
第 2 折:[Train] [Test] [Train] [Train] [Train]
第 3 折:[Train] [Train] [Test] [Train] [Train]
第 4 折:[Train] [Train] [Train] [Test] [Train]
第 5 折:[Train] [Train] [Train] [Train] [Test]

最终结果 = 5 次测试的平均值

分层 K 折交叉验证(适用于类别不平衡):

python
from sklearn.model_selection import StratifiedKFold

# 确保每折中各类别的比例与整体一致
skfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(lr, X, y, cv=skfold, scoring='accuracy')

超参数调优

python
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# 定义参数网格
param_grid = {
    'C': [0.1, 1, 10, 100],
    'gamma': ['scale', 'auto', 0.001, 0.01, 0.1],
    'kernel': ['rbf', 'linear']
}

# 网格搜索
grid_search = GridSearchCV(
    SVC(),
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,  # 使用所有 CPU 核心
    verbose=2
)

grid_search.fit(X_train, y_train)

# 最佳参数
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳准确率: {grid_search.best_score_:.4f}")

# 使用最佳模型预测
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

当参数空间很大时,随机搜索比网格搜索更高效。

python
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint

# 定义参数分布
param_distributions = {
    'C': uniform(0.1, 100),
    'gamma': uniform(0.001, 0.1),
    'kernel': ['rbf', 'linear']
}

# 随机搜索
random_search = RandomizedSearchCV(
    SVC(),
    param_distributions,
    n_iter=50,  # 尝试 50 种随机组合
    cv=5,
    random_state=42,
    n_jobs=-1
)

random_search.fit(X_train, y_train)
print(f"最佳参数: {random_search.best_params_}")

无监督学习

1. K-Means 聚类

python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# 生成聚类数据
X, y_true = make_blobs(n_samples=300, centers=4, random_state=42)

# K-Means 聚类
kmeans = KMeans(n_clusters=4, random_state=42)
y_pred = kmeans.fit_predict(X)

# 可视化
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
plt.scatter(kmeans.cluster_centers_[:, 0],
            kmeans.cluster_centers_[:, 1],
            s=300, c='red', marker='X', label='Centroids')
plt.legend()
plt.title('K-Means Clustering')
plt.show()

# 惯性(Inertia):样本到最近聚类中心的距离平方和
print(f"Inertia: {kmeans.inertia_:.2f}")

肘部法则(Elbow Method):选择最佳 K 值

python
inertias = []
K_range = range(1, 11)

for k in K_range:
    kmeans = KMeans(n_clusters=k, random_state=42)
    kmeans.fit(X)
    inertias.append(kmeans.inertia_)

plt.plot(K_range, inertias, marker='o')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Inertia')
plt.title('Elbow Method')
plt.show()

2. 主成分分析(PCA)

PCA 用于降维,保留最重要的特征。

python
from sklearn.decomposition import PCA
from sklearn.datasets import load_digits

# 加载手写数字数据集(64 维)
digits = load_digits()
X = digits.data
y = digits.target

print(f"原始维度: {X.shape}")

# 降维到 2 维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)

print(f"降维后: {X_pca.shape}")
print(f"解释的方差比例: {pca.explained_variance_ratio_}")

# 可视化
plt.figure(figsize=(10, 8))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', alpha=0.5)
plt.colorbar(scatter)
plt.xlabel('First Principal Component')
plt.ylabel('Second Principal Component')
plt.title('PCA of Digits Dataset')
plt.show()

选择合适的主成分数量

python
# 保留 95% 的方差
pca_95 = PCA(n_components=0.95)
X_pca_95 = pca_95.fit_transform(X)
print(f"保留 95% 方差需要 {X_pca_95.shape[1]} 个主成分")

综合实战:客户流失预测

让我们构建一个完整的机器学习项目:

python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from typing import Tuple, Dict, Any
import matplotlib.pyplot as plt
import seaborn as sns

class ChurnPredictionPipeline:
    """
    客户流失预测完整流程

    包含:
    1. 数据加载和探索
    2. 特征工程
    3. 模型训练和评估
    4. 超参数调优
    5. 模型解释
    """

    def __init__(self):
        self.models = {}
        self.best_model = None
        self.scaler = StandardScaler()
        self.label_encoders = {}

    def generate_data(self, n_samples: int = 5000) -> pd.DataFrame:
        """生成模拟客户数据"""
        np.random.seed(42)

        data = {
            'customer_id': range(1, n_samples + 1),
            'age': np.random.randint(18, 70, n_samples),
            'tenure_months': np.random.randint(1, 72, n_samples),
            'monthly_charges': np.random.uniform(20, 120, n_samples),
            'total_charges': np.random.uniform(100, 8000, n_samples),
            'num_products': np.random.randint(1, 5, n_samples),
            'has_internet': np.random.choice([0, 1], n_samples),
            'has_phone': np.random.choice([0, 1], n_samples),
            'contract_type': np.random.choice(['Month-to-month', 'One year', 'Two year'], n_samples),
            'payment_method': np.random.choice(['Electronic', 'Mail', 'Bank', 'Credit'], n_samples),
            'support_calls': np.random.randint(0, 10, n_samples),
        }

        df = pd.DataFrame(data)

        # 基于特征生成流失标签(有一定的逻辑关系)
        churn_prob = (
            0.1 +
            0.3 * (df['contract_type'] == 'Month-to-month').astype(int) +
            0.2 * (df['tenure_months'] < 12).astype(int) +
            0.1 * (df['support_calls'] > 5).astype(int) +
            0.15 * (df['monthly_charges'] > 80).astype(int)
        )

        churn_prob = np.clip(churn_prob, 0, 1)
        df['churn'] = np.random.binomial(1, churn_prob)

        return df

    def eda(self, df: pd.DataFrame) -> None:
        """探索性数据分析"""
        print("=" * 60)
        print("数据概览")
        print("=" * 60)
        print(f"数据集形状: {df.shape}")
        print(f"\n流失率: {df['churn'].mean():.2%}")

        print("\n数值特征统计:")
        print(df.describe())

        print("\n缺失值统计:")
        print(df.isnull().sum())

        # 可视化流失率
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))

        # 1. 合同类型 vs 流失率
        churn_by_contract = df.groupby('contract_type')['churn'].mean()
        axes[0, 0].bar(churn_by_contract.index, churn_by_contract.values)
        axes[0, 0].set_title('Churn Rate by Contract Type')
        axes[0, 0].set_ylabel('Churn Rate')

        # 2. 在网时长 vs 流失率
        tenure_bins = [0, 12, 24, 36, 48, 72]
        df['tenure_group'] = pd.cut(df['tenure_months'], bins=tenure_bins)
        churn_by_tenure = df.groupby('tenure_group')['churn'].mean()
        axes[0, 1].bar(range(len(churn_by_tenure)), churn_by_tenure.values)
        axes[0, 1].set_title('Churn Rate by Tenure')
        axes[0, 1].set_ylabel('Churn Rate')
        axes[0, 1].set_xticklabels(churn_by_tenure.index, rotation=45)

        # 3. 月费用分布
        axes[1, 0].hist(df[df['churn'] == 0]['monthly_charges'], alpha=0.5, label='No Churn', bins=30)
        axes[1, 0].hist(df[df['churn'] == 1]['monthly_charges'], alpha=0.5, label='Churn', bins=30)
        axes[1, 0].set_title('Monthly Charges Distribution')
        axes[1, 0].legend()

        # 4. 相关性热图
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        corr = df[numeric_cols].corr()
        sns.heatmap(corr, annot=True, fmt='.2f', ax=axes[1, 1], cmap='coolwarm')
        axes[1, 1].set_title('Feature Correlation')

        plt.tight_layout()
        plt.show()

    def feature_engineering(self, df: pd.DataFrame) -> pd.DataFrame:
        """特征工程"""
        df = df.copy()

        # 1. 创建新特征
        df['avg_monthly_charges'] = df['total_charges'] / (df['tenure_months'] + 1)
        df['charges_to_products'] = df['monthly_charges'] / (df['num_products'] + 1)
        df['tenure_x_products'] = df['tenure_months'] * df['num_products']

        # 2. 分箱特征
        df['age_group'] = pd.cut(df['age'], bins=[0, 30, 50, 100], labels=['young', 'middle', 'senior'])
        df['tenure_group'] = pd.cut(df['tenure_months'], bins=[0, 12, 24, 72], labels=['new', 'medium', 'long'])

        # 3. 是否高价值客户
        df['is_high_value'] = (df['monthly_charges'] > df['monthly_charges'].median()).astype(int)

        return df

    def prepare_data(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """准备训练数据"""
        # 删除不需要的列
        df = df.drop(['customer_id'], axis=1)

        # 编码类别特征
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns

        for col in categorical_cols:
            if col != 'churn':
                le = LabelEncoder()
                df[col] = le.fit_transform(df[col].astype(str))
                self.label_encoders[col] = le

        # 分离特征和目标
        X = df.drop('churn', axis=1)
        y = df['churn']

        # 分割数据
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # 标准化
        X_train = self.scaler.fit_transform(X_train)
        X_test = self.scaler.transform(X_test)

        return X_train, X_test, y_train, y_test

    def train_models(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray
    ) -> Dict[str, Any]:
        """训练多个模型"""
        models = {
            'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
        }

        print("\n" + "=" * 60)
        print("训练模型(5 折交叉验证)")
        print("=" * 60)

        results = {}
        for name, model in models.items():
            scores = cross_val_score(model, X_train, y_train, cv=5, scoring='roc_auc')
            results[name] = {
                'model': model,
                'cv_mean': scores.mean(),
                'cv_std': scores.std()
            }
            print(f"{name:25s}: AUC = {scores.mean():.4f} (+/- {scores.std():.4f})")

            # 训练完整模型
            model.fit(X_train, y_train)

        self.models = results
        return results

    def evaluate_model(
        self,
        model: Any,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str
    ) -> None:
        """评估单个模型"""
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]

        print(f"\n{'=' * 60}")
        print(f"{model_name} - 测试集评估")
        print(f"{'=' * 60}")

        # 分类报告
        print("\n分类报告:")
        print(classification_report(y_test, y_pred, target_names=['No Churn', 'Churn']))

        # AUC
        auc = roc_auc_score(y_test, y_proba)
        print(f"\nROC AUC: {auc:.4f}")

        # 混淆矩阵
        cm = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'{model_name} - Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()

    def tune_best_model(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray
    ) -> Any:
        """对最佳模型进行超参数调优"""
        print("\n" + "=" * 60)
        print("超参数调优(Random Forest)")
        print("=" * 60)

        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [10, 20, 30, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4]
        }

        rf = RandomForestClassifier(random_state=42)

        grid_search = GridSearchCV(
            rf,
            param_grid,
            cv=5,
            scoring='roc_auc',
            n_jobs=-1,
            verbose=1
        )

        grid_search.fit(X_train, y_train)

        print(f"\n最佳参数: {grid_search.best_params_}")
        print(f"最佳 AUC: {grid_search.best_score_:.4f}")

        self.best_model = grid_search.best_estimator_
        return self.best_model

    def feature_importance(self, feature_names: list) -> None:
        """分析特征重要性"""
        if self.best_model is None:
            print("请先训练模型")
            return

        importances = self.best_model.feature_importances_
        indices = np.argsort(importances)[::-1]

        plt.figure(figsize=(12, 8))
        plt.title("Feature Importances")
        plt.bar(range(len(importances)), importances[indices])
        plt.xticks(range(len(importances)), [feature_names[i] for i in indices], rotation=90)
        plt.tight_layout()
        plt.show()

        print("\nTop 10 重要特征:")
        for i in range(min(10, len(importances))):
            idx = indices[i]
            print(f"{i+1:2d}. {feature_names[idx]:30s}: {importances[idx]:.4f}")

# 运行完整 Pipeline
pipeline = ChurnPredictionPipeline()

# 1. 生成数据
df = pipeline.generate_data(n_samples=5000)

# 2. 探索性分析
pipeline.eda(df)

# 3. 特征工程
df_engineered = pipeline.feature_engineering(df)

# 4. 准备数据
X_train, X_test, y_train, y_test = pipeline.prepare_data(df_engineered)
feature_names = [col for col in df_engineered.columns if col not in ['churn', 'customer_id']]

print(f"\n特征数量: {X_train.shape[1]}")
print(f"训练集大小: {X_train.shape[0]}")
print(f"测试集大小: {X_test.shape[0]}")

# 5. 训练多个模型
results = pipeline.train_models(X_train, y_train)

# 6. 评估所有模型
for name, result in results.items():
    pipeline.evaluate_model(result['model'], X_test, y_test, name)

# 7. 超参数调优
best_model = pipeline.tune_best_model(X_train, y_train)

# 8. 最终评估
pipeline.evaluate_model(best_model, X_test, y_test, "Tuned Random Forest")

# 9. 特征重要性分析
pipeline.feature_importance(feature_names)

偏差-方差权衡(Bias-Variance Tradeoff)

这是机器学习中最重要的概念之一。

python
# 总误差 = 偏差² + 方差 + 不可约误差
  • 高偏差(High Bias):模型太简单,欠拟合(underfitting)

    • 例如:用线性模型拟合非线性数据
  • 高方差(High Variance):模型太复杂,过拟合(overfitting)

    • 例如:用高阶多项式拟合少量数据

示例

python
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline

# 生成非线性数据
np.random.seed(0)
X = np.sort(np.random.rand(40, 1), axis=0)
y = np.sin(2 * np.pi * X).ravel() + np.random.normal(0, 0.1, 40)

# 测试不同复杂度的模型
degrees = [1, 4, 15]
plt.figure(figsize=(15, 5))

for i, degree in enumerate(degrees):
    ax = plt.subplot(1, 3, i + 1)

    # 创建多项式回归模型
    model = make_pipeline(PolynomialFeatures(degree), Ridge())
    model.fit(X, y)

    # 绘制预测曲线
    X_test = np.linspace(0, 1, 100).reshape(-1, 1)
    y_pred = model.predict(X_test)

    plt.scatter(X, y, color='blue', alpha=0.5)
    plt.plot(X_test, y_pred, color='red', linewidth=2)
    plt.title(f'Degree {degree}')
    plt.ylim(-2, 2)

plt.show()

如何解决

  • 正则化:L1/L2 惩罚
  • 更多数据:更多训练数据可以降低方差
  • 特征选择:去除无关特征
  • 集成方法:Bagging 降低方差,Boosting 降低偏差

实用技巧和最佳实践

1. 处理类别不平衡

python
from sklearn.utils.class_weight import compute_class_weight

# 方法 1:类权重
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
model = LogisticRegression(class_weight='balanced')

# 方法 2:过采样(SMOTE)
from imblearn.over_sampling import SMOTE

smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

# 方法 3:欠采样
from imblearn.under_sampling import RandomUnderSampler

rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X_train, y_train)

2. 特征选择

python
from sklearn.feature_selection import SelectKBest, f_classif, RFE

# 方法 1:单变量特征选择
selector = SelectKBest(f_classif, k=10)
X_selected = selector.fit_transform(X_train, y_train)

# 方法 2:递归特征消除(RFE)
rfe = RFE(estimator=RandomForestClassifier(), n_features_to_select=10)
X_rfe = rfe.fit_transform(X_train, y_train)

# 方法 3:基于模型的特征选择
from sklearn.feature_selection import SelectFromModel

selector = SelectFromModel(RandomForestClassifier(n_estimators=100))
selector.fit(X_train, y_train)
X_selected = selector.transform(X_train)

3. 模型持久化

python
import joblib

# 保存模型
joblib.dump(model, 'model.pkl')

# 加载模型
loaded_model = joblib.load('model.pkl')
predictions = loaded_model.predict(X_test)

小结

在本节中,我们深入学习了:

机器学习三大范式

  • 监督学习:分类和回归
  • 无监督学习:聚类和降维
  • 强化学习:策略学习

Scikit-Learn 核心 API

  • 统一的 Estimator 接口
  • Pipeline 和 ColumnTransformer
  • 模型持久化

数据预处理

  • 特征缩放(标准化、归一化)
  • 类别编码(Label Encoding、One-Hot Encoding)
  • 缺失值处理

分类算法

  • 逻辑回归、决策树、随机森林
  • SVM、KNN
  • 集成学习

模型评估

  • 准确率、精确率、召回率、F1
  • ROC 曲线和 AUC
  • 交叉验证

超参数调优

  • 网格搜索(Grid Search)
  • 随机搜索(Random Search)

实战项目

  • 客户流失预测完整流程
  • 特征工程和模型解释

练习题

基础题

  1. 使用 Scikit-Learn 的 load_breast_cancer 数据集,训练一个逻辑回归模型并计算准确率
  2. 用网格搜索为决策树找到最佳的 max_depth 参数
  3. 绘制一个模型的 ROC 曲线

进阶题

  1. 实现一个完整的 Pipeline,包含:
    • 数值特征标准化
    • 类别特征 One-Hot 编码
    • PCA 降维到 10 维
    • 随机森林分类
  2. 对一个不平衡数据集,比较使用类权重、SMOTE、欠采样三种方法的效果
  3. 用递归特征消除(RFE)选择最重要的 5 个特征

挑战题

  1. 构建一个 Voting Classifier,结合逻辑回归、随机森林和 SVM,比较与单个模型的性能
  2. 实现一个自定义的 Transformer,可以在 Pipeline 中使用
  3. 用 K-Means 对 Iris 数据集聚类,然后用 PCA 可视化结果,并与真实标签对比

🔗 与 LangChain 的联系

Scikit-Learn 在 AI Agent 开发中的应用:

  1. 文本分类器:用于意图识别

    python
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    
    # 训练意图分类器
    vectorizer = TfidfVectorizer()
    classifier = MultinomialNB()
  2. 聚类:用于对话历史分组

  3. 异常检测:识别异常用户行为

  4. 推荐系统:基于相似度的文档推荐

在下一章,我们将学习深度学习框架 PyTorch 和 TensorFlow,它们将为我们打开神经网络的大门!


下一节:10.4 深度学习框架:PyTorch 与 TensorFlow

在下一节,我们将从传统机器学习迈向深度学习,学习 PyTorch 的张量操作、自动微分和神经网络构建。

基于 MIT 许可证发布。内容版权归作者所有。