Skip to content

10.2 科学计算基石:NumPy 与 Pandas

为什么从 NumPy 开始?

在深入 AI/ML 之前,我们必须理解一个根本性的问题:为什么所有的深度学习框架都建立在 NumPy 之上?

💡 核心概念:现代 AI 的本质是张量运算。图像是张量,文本是张量,神经网络的权重是张量,梯度也是张量。而 NumPy 提供了 Python 中最高效的多维数组(张量)实现。

从 PyTorch 的 torch.Tensor 到 TensorFlow 的 tf.Tensor,它们的 API 设计都深受 NumPy 影响。掌握 NumPy,你就掌握了整个 AI 技术栈的通用语言。

NumPy:多维数组的艺术

什么是张量(Tensor)?

在数学中:

  • 标量(Scalar):0 维张量,例如 5, 3.14
  • 向量(Vector):1 维张量,例如 [1, 2, 3]
  • 矩阵(Matrix):2 维张量,例如 [[1, 2], [3, 4]]
  • 张量(Tensor):n 维数组,例如 3D、4D...

在 AI 中:

  • 标量:损失值、学习率
  • 向量:单词嵌入(word embedding)
  • 矩阵:神经网络权重
  • 3D 张量:一批句子的嵌入(batch_size, seq_len, embedding_dim)
  • 4D 张量:一批图像(batch_size, height, width, channels)

NumPy 数组创建

python
import numpy as np
from typing import Tuple

# 1. 从 Python 列表创建
arr_1d = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])

print(f"1D 数组形状: {arr_1d.shape}")  # (5,)
print(f"2D 数组形状: {arr_2d.shape}")  # (2, 3)

# 2. 创建特殊数组
zeros = np.zeros((3, 4))  # 3x4 的零矩阵
ones = np.ones((2, 3, 4))  # 2x3x4 的全 1 张量
identity = np.eye(5)  # 5x5 单位矩阵

# 3. 创建数值范围
arange = np.arange(0, 10, 2)  # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5)  # [0.0, 0.25, 0.5, 0.75, 1.0]

# 4. 随机数组(在 ML 中用于初始化权重)
np.random.seed(42)  # 设置随机种子,保证可重现性
random_normal = np.random.randn(3, 4)  # 标准正态分布
random_uniform = np.random.rand(3, 4)  # [0, 1) 均匀分布

# 💡 Xavier 初始化(深度学习常用)
def xavier_init(shape: Tuple[int, ...]) -> np.ndarray:
    """
    Xavier/Glorot 初始化:用于神经网络权重初始化

    原理:保持前向传播和反向传播时方差一致
    公式:W ~ N(0, 2/(n_in + n_out))
    """
    fan_in, fan_out = shape[0], shape[1]
    std = np.sqrt(2.0 / (fan_in + fan_out))
    return np.random.randn(*shape) * std

weights = xavier_init((128, 64))  # 128 输入,64 输出
print(f"权重形状: {weights.shape}, 均值: {weights.mean():.4f}, 标准差: {weights.std():.4f}")

为什么 NumPy 比 Python 列表快 100 倍?

python
import time

# Python 列表的方式
def python_sum(n: int) -> float:
    """使用 Python 列表求和"""
    data = list(range(n))
    start = time.time()
    result = sum([x**2 for x in data])
    return time.time() - start

# NumPy 的方式
def numpy_sum(n: int) -> float:
    """使用 NumPy 向量化求和"""
    data = np.arange(n)
    start = time.time()
    result = np.sum(data**2)
    return time.time() - start

n = 1_000_000
python_time = python_sum(n)
numpy_time = numpy_sum(n)

print(f"Python 列表: {python_time:.4f} 秒")
print(f"NumPy 数组: {numpy_time:.4f} 秒")
print(f"速度提升: {python_time / numpy_time:.1f}x")

为什么这么快?

  1. 向量化(Vectorization):NumPy 用 C 语言实现,避免了 Python 解释器开销
  2. 连续内存:数组元素在内存中连续存储,CPU 缓存友好
  3. SIMD 指令:现代 CPU 可以一次处理多个数据(Single Instruction, Multiple Data)

🔗 与 AI 的联系:训练神经网络时,我们需要对数百万个参数进行矩阵运算。如果使用 Python 循环,训练一个模型可能需要几个月;用 NumPy/PyTorch,只需要几小时。

广播机制(Broadcasting)——NumPy 的设计哲学

广播是 NumPy 最强大也最容易被误解的特性:

python
# 1. 标量与数组
arr = np.array([1, 2, 3, 4])
result = arr + 10  # 每个元素加 10
print(result)  # [11, 12, 13, 14]

# 2. 不同形状的数组相加
a = np.array([[1, 2, 3],
              [4, 5, 6]])  # (2, 3)
b = np.array([10, 20, 30])  # (3,)

# 广播规则:b 被"复制"为 [[10, 20, 30], [10, 20, 30]]
result = a + b
print(result)
# [[11, 22, 33]
#  [14, 25, 36]]

# 3. 在 AI 中的应用:批量归一化(Batch Normalization)
batch_data = np.random.randn(32, 128)  # 32 个样本,每个 128 维
mean = batch_data.mean(axis=0, keepdims=True)  # (1, 128)
std = batch_data.std(axis=0, keepdims=True)    # (1, 128)

# 广播:每个样本减去均值并除以标准差
normalized = (batch_data - mean) / std
print(f"归一化后均值: {normalized.mean():.6f}, 标准差: {normalized.std():.6f}")

广播的三条规则

  1. 如果两个数组维度不同,在较小数组的形状前面填充 1
  2. 对于任意维度,如果其中一个数组该维度大小为 1,则"拉伸"到匹配另一个数组
  3. 如果任意维度大小不匹配且都不为 1,则报错
python
# 📐 广播规则示例
# A: (3, 1) + B: (1, 4) → 结果: (3, 4)
A = np.array([[1], [2], [3]])  # (3, 1)
B = np.array([[10, 20, 30, 40]])  # (1, 4)
result = A + B  # (3, 4)
print(result)
# [[11, 21, 31, 41]
#  [12, 22, 32, 42]
#  [13, 23, 33, 43]]

# ⚠️ 常见陷阱:维度不匹配
try:
    A = np.ones((3, 4))
    B = np.ones((3, 5))
    C = A + B  # 报错!
except ValueError as e:
    print(f"错误: {e}")

高级索引与切片

python
# 1. 基础索引
arr = np.array([[1, 2, 3, 4],
                [5, 6, 7, 8],
                [9, 10, 11, 12]])

# 取第 2 行第 3 列
print(arr[1, 2])  # 7

# 取第 1 行的所有列
print(arr[0, :])  # [1, 2, 3, 4]

# 取所有行的第 2 列
print(arr[:, 1])  # [2, 6, 10]

# 2. 布尔索引(在数据清洗中极其重要)
data = np.array([1, -2, 3, -4, 5, -6])
positive = data[data > 0]  # 只保留正数
print(positive)  # [1, 3, 5]

# 3. 花式索引
indices = [0, 2, 3]
selected = arr[indices, :]  # 选择第 0, 2, 3 行
print(selected)

# 🔬 实际应用:从数据集中选择特定样本
dataset = np.random.randn(1000, 784)  # 1000 张 28x28 图像(展平)
labels = np.random.randint(0, 10, 1000)  # 标签

# 只选择标签为 3 的样本
class_3_mask = (labels == 3)
class_3_images = dataset[class_3_mask]
print(f"标签为 3 的样本数: {len(class_3_images)}")

矩阵运算——深度学习的核心

python
# 1. 点积(Dot Product)
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
dot_product = np.dot(a, b)  # 1*4 + 2*5 + 3*6 = 32
print(f"点积: {dot_product}")

# 2. 矩阵乘法(Matrix Multiplication)
# 这是神经网络前向传播的基础操作
X = np.random.randn(64, 128)  # 64 个样本,每个 128 维特征
W = np.random.randn(128, 10)  # 权重矩阵:128 输入,10 输出
b = np.random.randn(10)  # 偏置

# 前向传播:y = XW + b
y = np.dot(X, W) + b  # (64, 128) @ (128, 10) = (64, 10)
print(f"输出形状: {y.shape}")

# 3. 使用 @ 运算符(Python 3.5+)
y = X @ W + b  # 等价于 np.dot(X, W) + b

# 📐 矩阵维度规则
# (m, n) @ (n, p) = (m, p)
# 第一个矩阵的列数必须等于第二个矩阵的行数

# 4. 转置(Transpose)
A = np.array([[1, 2, 3],
              [4, 5, 6]])  # (2, 3)
A_T = A.T  # (3, 2)
print(A_T)

# 5. 更高维的转置
tensor_3d = np.random.randn(32, 10, 128)  # (batch, seq_len, hidden)
# 交换 seq_len 和 hidden 维度
tensor_transposed = tensor_3d.transpose(0, 2, 1)  # (32, 128, 10)
print(f"转置后形状: {tensor_transposed.shape}")

实战:实现全连接层的前向传播

让我们从零实现神经网络的核心组件:

python
from typing import Optional

class DenseLayer:
    """
    全连接层(Dense Layer / Fully Connected Layer)

    数学公式:y = σ(Wx + b)
    其中:
        - W: 权重矩阵 (input_dim, output_dim)
        - x: 输入向量 (batch_size, input_dim)
        - b: 偏置向量 (output_dim,)
        - σ: 激活函数
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        activation: str = "relu"
    ):
        """
        初始化全连接层

        Args:
            input_dim: 输入维度
            output_dim: 输出维度
            activation: 激活函数 ("relu", "sigmoid", "tanh")
        """
        # Xavier 初始化
        self.W = np.random.randn(input_dim, output_dim) * np.sqrt(2.0 / input_dim)
        self.b = np.zeros(output_dim)
        self.activation = activation

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        前向传播

        Args:
            X: 输入数据 (batch_size, input_dim)

        Returns:
            输出数据 (batch_size, output_dim)
        """
        # 线性变换
        Z = X @ self.W + self.b  # (batch_size, output_dim)

        # 激活函数
        if self.activation == "relu":
            A = np.maximum(0, Z)  # ReLU: max(0, x)
        elif self.activation == "sigmoid":
            A = 1 / (1 + np.exp(-Z))  # Sigmoid: 1/(1+e^-x)
        elif self.activation == "tanh":
            A = np.tanh(Z)  # Tanh: (e^x - e^-x)/(e^x + e^-x)
        else:
            A = Z  # 线性激活

        return A

# 测试全连接层
layer = DenseLayer(input_dim=128, output_dim=64, activation="relu")
X_input = np.random.randn(32, 128)  # 32 个样本
output = layer.forward(X_input)
print(f"输入形状: {X_input.shape}, 输出形状: {output.shape}")
print(f"输出均值: {output.mean():.4f}, 标准差: {output.std():.4f}")

轴(Axis)操作——理解多维数组的关键

python
# 创建 3D 数组:(批次, 序列长度, 特征维度)
data = np.random.randn(4, 5, 3)  # 4 个句子,每个 5 个词,每个词 3 维嵌入

print(f"原始形状: {data.shape}")  # (4, 5, 3)

# axis=0: 沿着批次维度操作
mean_over_batch = data.mean(axis=0)  # (5, 3) - 每个位置的平均嵌入
print(f"axis=0 (批次维度): {mean_over_batch.shape}")

# axis=1: 沿着序列维度操作
mean_over_seq = data.mean(axis=1)  # (4, 3) - 每个句子的平均嵌入
print(f"axis=1 (序列维度): {mean_over_seq.shape}")

# axis=2: 沿着特征维度操作
mean_over_features = data.mean(axis=2)  # (4, 5) - 每个词的特征均值
print(f"axis=2 (特征维度): {mean_over_features.shape}")

# axis=(0, 1): 同时沿着多个维度
mean_over_batch_seq = data.mean(axis=(0, 1))  # (3,) - 整体特征均值
print(f"axis=(0,1): {mean_over_batch_seq.shape}")

# 🔗 实际应用:池化操作(Pooling)
# 在 CNN 中,全局平均池化(Global Average Pooling)用于降维
image_batch = np.random.randn(32, 64, 64, 3)  # (batch, height, width, channels)
global_avg_pool = image_batch.mean(axis=(1, 2))  # (32, 3)
print(f"全局平均池化后: {global_avg_pool.shape}")

Pandas:数据分析的标准工具

NumPy 提供了高效的数值计算,但处理真实世界的数据(CSV、Excel、SQL)时,我们需要更高级的工具——Pandas。

为什么需要 Pandas?

python
import pandas as pd

# 假设我们有客户流失数据
data = {
    'customer_id': [1, 2, 3, 4, 5],
    'age': [25, 35, None, 42, 28],  # 注意:有缺失值
    'income': [50000, 75000, 60000, 95000, 55000],
    'churn': [0, 1, 0, 1, 0]  # 0=留存, 1=流失
}

# 创建 DataFrame
df = pd.DataFrame(data)
print(df)

输出:

   customer_id   age  income  churn
0            1  25.0   50000      0
1            2  35.0   75000      1
2            3   NaN   60000      0
3            4  42.0   95000      1
4            5  28.0   55000      0

DataFrame 核心操作

python
# 1. 数据加载
# CSV
df_csv = pd.read_csv('data.csv')

# Excel
df_excel = pd.read_excel('data.xlsx')

# JSON
df_json = pd.read_json('data.json')

# SQL
# df_sql = pd.read_sql('SELECT * FROM customers', connection)

# 2. 数据探索
print(df.head(3))  # 前 3 行
print(df.tail(3))  # 后 3 行
print(df.info())  # 数据类型和缺失值信息
print(df.describe())  # 统计摘要

# 3. 选择数据
# 选择列
ages = df['age']
subset = df[['age', 'income']]

# 选择行
first_row = df.iloc[0]  # 按位置
filtered = df[df['age'] > 30]  # 按条件

# 4. 缺失值处理
# 检查缺失值
print(df.isnull().sum())

# 填充缺失值
df_filled = df.fillna(df['age'].mean())  # 用均值填充

# 删除缺失值
df_dropped = df.dropna()

# 5. 数据转换
df['age_group'] = pd.cut(
    df['age'],
    bins=[0, 30, 40, 100],
    labels=['young', 'middle', 'senior']
)

# 6. 分组聚合
churn_by_age = df.groupby('age_group')['churn'].mean()
print(churn_by_age)

特征工程实战

python
# 加载真实数据集示例(Titanic 数据集)
# 我们用代码模拟数据
titanic_data = {
    'PassengerId': range(1, 6),
    'Pclass': [3, 1, 3, 1, 3],
    'Name': ['Braund, Mr. Owen Harris', 'Cumings, Mrs. John Bradley',
             'Heikkinen, Miss. Laina', 'Futrelle, Mrs. Jacques Heath',
             'Allen, Mr. William Henry'],
    'Sex': ['male', 'female', 'female', 'female', 'male'],
    'Age': [22, 38, 26, 35, 35],
    'SibSp': [1, 1, 0, 1, 0],
    'Parch': [0, 0, 0, 0, 0],
    'Fare': [7.25, 71.28, 7.92, 53.10, 8.05],
    'Survived': [0, 1, 1, 1, 0]
}

df_titanic = pd.DataFrame(titanic_data)

# 特征工程步骤
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Titanic 数据集的特征工程

    Returns:
        处理后的 DataFrame
    """
    df = df.copy()

    # 1. 从姓名中提取称谓
    df['Title'] = df['Name'].str.extract(r' ([A-Za-z]+)\.', expand=False)

    # 2. 创建家庭规模特征
    df['FamilySize'] = df['SibSp'] + df['Parch'] + 1

    # 3. 创建是否独自一人的标记
    df['IsAlone'] = (df['FamilySize'] == 1).astype(int)

    # 4. 年龄分段
    df['AgeGroup'] = pd.cut(
        df['Age'],
        bins=[0, 12, 18, 60, 100],
        labels=['Child', 'Teen', 'Adult', 'Senior']
    )

    # 5. 票价分段
    df['FareGroup'] = pd.qcut(
        df['Fare'],
        q=4,
        labels=['Low', 'Medium', 'High', 'Very High']
    )

    # 6. 性别编码
    df['Sex_encoded'] = df['Sex'].map({'male': 0, 'female': 1})

    return df

df_engineered = engineer_features(df_titanic)
print(df_engineered[['Name', 'Title', 'FamilySize', 'IsAlone', 'AgeGroup']])

实战:完整的数据预处理 Pipeline

python
from typing import List
from sklearn.preprocessing import StandardScaler, LabelEncoder

class DataPreprocessor:
    """
    通用数据预处理类

    包含:
    - 缺失值处理
    - 数值特征标准化
    - 类别特征编码
    - 特征工程
    """

    def __init__(self):
        self.scalers = {}
        self.encoders = {}

    def fit_transform(
        self,
        df: pd.DataFrame,
        numerical_cols: List[str],
        categorical_cols: List[str],
        target_col: str
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        拟合并转换训练数据

        Args:
            df: 原始 DataFrame
            numerical_cols: 数值列名列表
            categorical_cols: 类别列名列表
            target_col: 目标变量列名

        Returns:
            特征矩阵 X 和目标向量 y
        """
        df = df.copy()

        # 1. 处理数值特征
        for col in numerical_cols:
            # 填充缺失值(使用中位数)
            median_value = df[col].median()
            df[col].fillna(median_value, inplace=True)

            # 标准化
            scaler = StandardScaler()
            df[col] = scaler.fit_transform(df[[col]])
            self.scalers[col] = scaler

        # 2. 处理类别特征
        for col in categorical_cols:
            # 填充缺失值(使用众数)
            mode_value = df[col].mode()[0]
            df[col].fillna(mode_value, inplace=True)

            # 标签编码
            encoder = LabelEncoder()
            df[col] = encoder.fit_transform(df[col])
            self.encoders[col] = encoder

        # 3. 分离特征和目标
        feature_cols = numerical_cols + categorical_cols
        X = df[feature_cols].values
        y = df[target_col].values

        return X, y

    def transform(
        self,
        df: pd.DataFrame,
        numerical_cols: List[str],
        categorical_cols: List[str]
    ) -> np.ndarray:
        """转换测试数据(使用已拟合的转换器)"""
        df = df.copy()

        for col in numerical_cols:
            median_value = df[col].median()
            df[col].fillna(median_value, inplace=True)
            df[col] = self.scalers[col].transform(df[[col]])

        for col in categorical_cols:
            mode_value = df[col].mode()[0]
            df[col].fillna(mode_value, inplace=True)
            df[col] = self.encoders[col].transform(df[col])

        feature_cols = numerical_cols + categorical_cols
        return df[feature_cols].values

# 使用示例
preprocessor = DataPreprocessor()
X_train, y_train = preprocessor.fit_transform(
    df_titanic,
    numerical_cols=['Age', 'Fare'],
    categorical_cols=['Sex', 'Pclass'],
    target_col='Survived'
)

print(f"特征矩阵形状: {X_train.shape}")
print(f"目标向量形状: {y_train.shape}")

SciPy:科学计算的补充工具

python
from scipy import stats, optimize, signal
from scipy.spatial.distance import cosine, euclidean

# 1. 统计测试
# t 检验:比较两组数据的均值是否有显著差异
group_a = np.random.randn(100) + 5  # 均值约为 5
group_b = np.random.randn(100) + 5.5  # 均值约为 5.5

t_statistic, p_value = stats.ttest_ind(group_a, group_b)
print(f"t 统计量: {t_statistic:.4f}, p 值: {p_value:.4f}")

if p_value < 0.05:
    print("两组有显著差异(p < 0.05)")
else:
    print("两组无显著差异")

# 2. 距离计算(在推荐系统和相似度计算中常用)
vec1 = np.random.randn(128)  # 用户 1 的嵌入
vec2 = np.random.randn(128)  # 用户 2 的嵌入

# 余弦相似度
cos_sim = 1 - cosine(vec1, vec2)  # scipy 返回距离,1-距离=相似度
print(f"余弦相似度: {cos_sim:.4f}")

# 欧氏距离
eucl_dist = euclidean(vec1, vec2)
print(f"欧氏距离: {eucl_dist:.4f}")

# 3. 优化问题
# 示例:最小化二次函数 f(x) = x^2 + 10*sin(x)
def objective(x):
    return x**2 + 10 * np.sin(x)

result = optimize.minimize(objective, x0=0)
print(f"最优解: x = {result.x[0]:.4f}, f(x) = {result.fun:.4f}")

综合实战:构建特征工程 Pipeline

让我们整合所有知识,构建一个完整的数据处理流程:

python
import numpy as np
import pandas as pd
from typing import Tuple, Dict, Any
from sklearn.model_selection import train_test_split

class MLDataPipeline:
    """
    机器学习数据处理 Pipeline

    功能:
    - 数据加载
    - 探索性数据分析
    - 特征工程
    - 数据分割
    """

    def __init__(self, data_path: str):
        self.data_path = data_path
        self.df = None
        self.preprocessor = None

    def load_data(self) -> pd.DataFrame:
        """加载数据"""
        # 实际应用中从文件加载
        # self.df = pd.read_csv(self.data_path)

        # 这里我们模拟数据
        np.random.seed(42)
        n_samples = 1000

        self.df = pd.DataFrame({
            'age': np.random.randint(18, 70, n_samples),
            'income': np.random.randint(30000, 150000, n_samples),
            'credit_score': np.random.randint(300, 850, n_samples),
            'num_products': np.random.randint(1, 5, n_samples),
            'tenure_months': np.random.randint(0, 120, n_samples),
            'is_active': np.random.choice([0, 1], n_samples),
            'country': np.random.choice(['US', 'UK', 'DE', 'FR'], n_samples),
            'churn': np.random.choice([0, 1], n_samples, p=[0.8, 0.2])
        })

        # 随机添加一些缺失值
        mask = np.random.rand(*self.df.shape) < 0.05
        self.df = self.df.mask(mask)

        return self.df

    def eda(self) -> Dict[str, Any]:
        """探索性数据分析"""
        report = {
            'shape': self.df.shape,
            'missing': self.df.isnull().sum().to_dict(),
            'statistics': self.df.describe().to_dict(),
            'churn_rate': self.df['churn'].mean()
        }
        return report

    def engineer_features(self) -> pd.DataFrame:
        """特征工程"""
        df = self.df.copy()

        # 1. 创建交互特征
        df['income_per_product'] = df['income'] / (df['num_products'] + 1)

        # 2. 创建多项式特征
        df['age_squared'] = df['age'] ** 2

        # 3. 创建分箱特征
        df['age_group'] = pd.cut(
            df['age'],
            bins=[0, 30, 50, 100],
            labels=[0, 1, 2]
        ).astype(float)

        # 4. 创建比率特征
        df['products_per_month'] = df['num_products'] / (df['tenure_months'] + 1)

        return df

    def prepare_for_training(
        self,
        test_size: float = 0.2
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """准备训练数据"""
        # 特征工程
        df_engineered = self.engineer_features()

        # 定义特征列
        numerical_cols = [
            'age', 'income', 'credit_score', 'num_products',
            'tenure_months', 'income_per_product', 'age_squared',
            'products_per_month'
        ]
        categorical_cols = ['is_active', 'country', 'age_group']

        # 预处理
        self.preprocessor = DataPreprocessor()
        X, y = self.preprocessor.fit_transform(
            df_engineered,
            numerical_cols=numerical_cols,
            categorical_cols=categorical_cols,
            target_col='churn'
        )

        # 分割数据
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )

        print(f"训练集: {X_train.shape}, 测试集: {X_test.shape}")
        print(f"训练集流失率: {y_train.mean():.2%}")
        print(f"测试集流失率: {y_test.mean():.2%}")

        return X_train, X_test, y_train, y_test

# 使用 Pipeline
pipeline = MLDataPipeline('customer_data.csv')
df = pipeline.load_data()

# 探索性分析
eda_report = pipeline.eda()
print("=" * 50)
print("数据概览")
print("=" * 50)
print(f"数据集形状: {eda_report['shape']}")
print(f"流失率: {eda_report['churn_rate']:.2%}")

# 准备训练数据
X_train, X_test, y_train, y_test = pipeline.prepare_for_training()

性能优化技巧

1. 向量化 vs 循环

python
import time

# ❌ 不好的做法:使用 Python 循环
def compute_distances_slow(points: np.ndarray) -> np.ndarray:
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            distances[i, j] = np.sqrt(np.sum((points[i] - points[j])**2))
    return distances

# ✅ 好的做法:使用向量化
def compute_distances_fast(points: np.ndarray) -> np.ndarray:
    # 使用广播计算所有点对之间的距离
    diff = points[:, np.newaxis, :] - points[np.newaxis, :, :]
    distances = np.sqrt((diff ** 2).sum(axis=2))
    return distances

# 性能对比
points = np.random.randn(100, 3)

start = time.time()
dist_slow = compute_distances_slow(points)
time_slow = time.time() - start

start = time.time()
dist_fast = compute_distances_fast(points)
time_fast = time.time() - start

print(f"循环方式: {time_slow:.4f} 秒")
print(f"向量化方式: {time_fast:.4f} 秒")
print(f"速度提升: {time_slow / time_fast:.1f}x")

2. 内存优化

python
# 使用合适的数据类型节省内存
df_large = pd.DataFrame({
    'id': range(1_000_000),
    'value': np.random.randn(1_000_000)
})

# 查看内存使用
print("优化前:")
print(df_large.memory_usage(deep=True))

# 优化数据类型
df_optimized = df_large.copy()
df_optimized['id'] = df_optimized['id'].astype('int32')  # int64 → int32
df_optimized['value'] = df_optimized['value'].astype('float32')  # float64 → float32

print("\n优化后:")
print(df_optimized.memory_usage(deep=True))

小结

在本节中,我们学习了:

NumPy 核心概念

  • 多维数组(张量)的创建和操作
  • 向量化计算——比循环快 100 倍
  • 广播机制——NumPy 的设计哲学
  • 矩阵运算——深度学习的数学基础

Pandas 数据处理

  • DataFrame 的创建和操作
  • 数据清洗:缺失值、异常值处理
  • 特征工程:创建、转换、编码特征
  • 完整的数据预处理 Pipeline

SciPy 科学计算

  • 统计测试和假设检验
  • 距离和相似度计算
  • 优化问题求解

最佳实践

  • 向量化优先于循环
  • 内存优化技巧
  • 代码组织和可重用性

练习题

基础题

  1. 创建一个 5x5 的矩阵,对角线为 1,其余为 0
  2. 计算两个向量的余弦相似度(不使用 SciPy)
  3. 用 Pandas 读取 CSV 文件并处理缺失值

进阶题

  1. 实现批量归一化(Batch Normalization)的前向传播
  2. 用 NumPy 实现 Softmax 函数(注意数值稳定性)
  3. 构建一个完整的特征工程 Pipeline,包括:
    • 数值特征标准化
    • 类别特征编码
    • 创建交互特征

挑战题

  1. 从零实现一个简单的 k-NN 分类器(只用 NumPy)
  2. 实现 PCA(主成分分析)降维算法
  3. 用向量化操作实现图像卷积(不使用 for 循环)

下一节:10.3 机器学习实战:Scikit-Learn 完全指南

在下一节,我们将使用 Scikit-Learn 实现完整的机器学习流程,从数据预处理到模型评估,构建你的第一个生产级 ML 系统。

基于 MIT 许可证发布。内容版权归作者所有。