pandas 内存从 8GB 压到 800MB:60 万行 CSV 处理的 7 步优化

600MB CSV 加载占 5.2GB,处理峰值 8GB。本文 7 步优化:usecols 选列、dtype 显式声明、category 类型、parse_dates、chunksize 流式、Parquet 替代 CSV、polars 替代 pandas。最终 800MB + 速度快 4 倍。附 DuckDB 大杀器和 5 条心法。

组里有个数据分析脚本,处理一个 600MB 的 CSV,跑起来内存峰值 8GB,本地 16GB 机器跑都吃力,放到云上 4GB 的 Lambda 直接 OOM。我把同样的逻辑改了 7 处,峰值压到 800MB,运行时间反而快了一半。本文把这 7 处改动逐个讲清楚 —— 每处都有 before / after 代码 + 实测内存数据。

背景:60w 行的订单数据

import pandas as pd

# 原始 CSV:80 列,60w 行
# 包含订单 id、用户 id、商品 id、金额、状态、时间戳、地址、备注等等
# 文件大小 600MB

df = pd.read_csv('orders.csv')
print(df.info(memory_usage='deep'))
# Memory usage: 5.2 GB

600MB 的 CSV load 进 DataFrame 后占 5.2GB,后续做 groupby / merge 内存峰值 8GB。问题不止是慢,是根本跑不起来。

优化 1:只读用到的列

# 错:全部加载
df = pd.read_csv('orders.csv')   # 5.2GB

# 对:只加载实际需要的 5 列
NEEDED = ['order_id', 'user_id', 'amount', 'status', 'created_at']
df = pd.read_csv('orders.csv', usecols=NEEDED)
# Memory usage: 480 MB

这一招直接砍掉 90% 内存。大多数 CSV / Parquet 文件,业务实际只用 5-10 列,加载所有列是浪费。

优化 2:指定 dtype,别让 pandas 猜

pandas 默认 int 是 int64(8 字节)、float 是 float64(8 字节)、str 是 object(每个字符串 50+ 字节开销)。大多数业务字段根本用不上这么大。

import numpy as np

DTYPES = {
    'order_id':   'int32',       # 订单 ID,20 亿够用,不需要 int64
    'user_id':    'int32',
    'amount':     'float32',     # 金额精度够,不需要 float64
    'status':     'category',    # 只有 'pending' / 'paid' / 'cancelled' 几种 → category
    'created_at': 'string',      # 暂时用 string,稍后转 datetime
}

df = pd.read_csv('orders.csv', usecols=list(DTYPES.keys()), dtype=DTYPES)
print(df.info(memory_usage='deep'))
# Memory usage: 165 MB ← 又降了 65%

category 是 pandas 杀手锏:状态字段只有 3 个值,原本每个字符串占 50+ 字节,转 category 之后内部只存一个 int8 索引 + 一份小字典,内存压缩 100 倍。

# 看一下 category 多省内存
s = pd.Series(['pending', 'paid', 'cancelled'] * 1000000)
print(s.memory_usage(deep=True))                    # 67_000_000 bytes ≈ 64MB

s_cat = s.astype('category')
print(s_cat.memory_usage(deep=True))                # 1_000_280 bytes ≈ 1MB

优化 3:datetime 类型 + parse_dates

# 错:用 string 存,后续处理还要 to_datetime
df['created_at'] = pd.to_datetime(df['created_at'])   # 占用 datetime64[ns] = 8 字节/行

# 对:加载时就 parse
df = pd.read_csv('orders.csv',
                 usecols=NEEDED,
                 dtype=DTYPES,
                 parse_dates=['created_at'])

# 还可以指定 format(快 10 倍以上)
df = pd.read_csv('orders.csv',
                 parse_dates=['created_at'],
                 date_format='%Y-%m-%d %H:%M:%S')

parse_dates 比加载后再 to_datetime 快很多,且省一次内存复制。

优化 4:大文件分块流式处理

如果文件实在大(GB 级),即使做了上面优化也装不下,用 chunksize 流式处理:

# 错:一次性 load
df = pd.read_csv('orders.csv')
result = df[df['amount'] > 100].groupby('user_id')['amount'].sum()

# 对:分块处理 + 流式聚合
from collections import defaultdict

totals = defaultdict(float)
counter = 0

for chunk in pd.read_csv('orders.csv',
                         usecols=NEEDED,
                         dtype=DTYPES,
                         chunksize=100_000):    # 每块 10 万行,约 30MB
    # 应用业务逻辑
    paid = chunk[chunk['amount'] > 100]
    # 聚合到全局字典
    for uid, amount in paid.groupby('user_id')['amount'].sum().items():
        totals[uid] += amount
    counter += len(chunk)
    print(f'processed {counter:,} rows, current peak users: {len(totals):,}')

result = pd.Series(totals).sort_values(ascending=False)
print(result.head(10))

这一招处理几 GB CSV 内存峰值都能压在几百 MB。

优化 5:用 Parquet 替代 CSV

CSV 是文本格式,加载时要 parse 每个字段(慢)、不能选择性读列(默认全读)、数据类型靠猜(占内存)。换成 Parquet 全部解决:

# 先一次性把 CSV 转 Parquet(只做一次)
import pandas as pd

df = pd.read_csv('orders.csv', dtype=DTYPES, parse_dates=['created_at'])
df.to_parquet('orders.parquet', compression='zstd', index=False)
# 600MB CSV → 80MB Parquet,列式压缩

# 之后所有读取都用 Parquet
df = pd.read_parquet('orders.parquet', columns=NEEDED)
# 比 CSV 快 5-10 倍,内存占用一致

# Parquet 支持谓词下推(列层面过滤,不读不需要的数据)
import pyarrow.parquet as pq

# 只读 amount > 100 的行,在文件层面就过滤
table = pq.read_table('orders.parquet',
                      columns=NEEDED,
                      filters=[('amount', '>', 100)])
df = table.to_pandas()

Parquet 是数据工程的标配。CSV 只适合给人看,真要给程序处理就转 Parquet。

优化 6:用 polars 替代 pandas

polars 是 Rust 写的 DataFrame 库,API 类似 pandas 但快 5-10 倍、内存只占一半。重写脚本不算大,对性能敏感场景非常值。

import polars as pl

# 读 Parquet(默认就是流式 + 列式 + 类型自动)
df = pl.read_parquet('orders.parquet')

# 业务:筛选 + 聚合
result = (
    df
    .filter(pl.col('amount') > 100)
    .group_by('user_id')
    .agg(pl.col('amount').sum().alias('total'))
    .sort('total', descending=True)
    .head(10)
)
print(result)

# 真正的大杀器:lazy mode
# 把所有操作记录下来,执行时优化整个 pipeline
result = (
    pl.scan_parquet('orders.parquet')          # scan_ 是 lazy,不立即读
    .filter(pl.col('amount') > 100)
    .group_by('user_id')
    .agg(pl.col('amount').sum().alias('total'))
    .sort('total', descending=True)
    .head(10)
    .collect()                                  # 这里才真正执行,且只读需要的列
)
# 比同样的 pandas 代码快 8 倍,内存峰值低 60%

优化 7:groupby 优化

同样的 groupby 写法,差异巨大:

# 错:对每个 group 调用 Python 函数(GIL + 函数调用开销)
result = df.groupby('user_id').apply(lambda g: g['amount'].sum())
# 600w 行跑 45 秒

# 对:用内置 agg,完全向量化
result = df.groupby('user_id')['amount'].sum()
# 600w 行跑 0.8 秒

# 多个聚合:用 .agg(dict)
result = df.groupby('user_id').agg({
    'amount': ['sum', 'mean', 'max'],
    'order_id': 'count',
})

# transform 比 groupby + merge 更高效(保持原表行数)
df['user_avg'] = df.groupby('user_id')['amount'].transform('mean')

# 大数据集排序后再 groupby 性能好很多
df = df.sort_values('user_id')
result = df.groupby('user_id', sort=False)['amount'].sum()

诊断:看具体哪一步内存暴涨

# 1. memory_usage(deep=True) 看真实内存(包括 object 引用的)
df.memory_usage(deep=True).sum() / 1024**2

# 2. tracemalloc 看任意时刻 Python 对象分配
import tracemalloc

tracemalloc.start()
df = pd.read_csv('orders.csv')
current, peak = tracemalloc.get_traced_memory()
print(f'current: {current/1024**2:.1f}MB, peak: {peak/1024**2:.1f}MB')

# 3. memray 是更专业的内存 profiler
# pip install memray
# memray run script.py
# memray flamegraph memray-*.bin   # 生成火焰图

# 4. 业务代码中加 checkpoint
import psutil, os

def mem_mb():
    return psutil.Process(os.getpid()).memory_info().rss / 1024**2

print(f'before load: {mem_mb():.0f}MB')
df = pd.read_csv('orders.csv')
print(f'after load: {mem_mb():.0f}MB')
df['amount_yuan'] = df['amount'] / 100
print(f'after transform: {mem_mb():.0f}MB')
result = df.groupby('user_id')['amount'].sum()
print(f'after groupby: {mem_mb():.0f}MB')

对比表:7 步优化的效果

优化步骤                          内存峰值     运行时间
原版                              8.2 GB      183 秒
+ usecols 只读需要的列            2.1 GB      75 秒
+ dtype 指定类型                   780 MB     58 秒
+ parse_dates                      770 MB     42 秒
+ chunksize 分块流式              280 MB      35 秒
+ Parquet 替代 CSV                240 MB      8 秒
+ polars 替代 pandas              165 MB      4 秒
+ groupby agg 替代 apply          165 MB      2.3 秒

最大收益是 1+2(usecols + dtype),内存从 8GB 干到 780MB。最大时间收益是 6(换 Parquet),CSV → Parquet 整个 pipeline 提速 4 倍。

什么时候该升级到 Spark / Dask

上面这些都是单机优化。如果数据量超过单机内存上限(比如 500GB),还要考虑分布式:

  • polars:单机几十 GB 数据,polars 完全够,且比 Spark 启动快
  • Dask:类 pandas API 的分布式,适合 pandas 用户无痛迁移
  • Spark:几百 GB ~ TB 级数据,需要集群
  • DuckDB:单机内存外 SQL,处理几十 GB Parquet 性价比极高
# DuckDB:单机也能跑 SQL,内存外执行,处理大 Parquet 一绝
import duckdb

# 直接 SQL 查 Parquet,不需要先 load 到 DataFrame
result = duckdb.query("""
    SELECT user_id, SUM(amount) as total
    FROM 'orders.parquet'
    WHERE amount > 100
    GROUP BY user_id
    ORDER BY total DESC
    LIMIT 10
""").to_df()

# 处理 50GB Parquet 文件,DuckDB 在 8GB 机器上能跑,pandas 直接 OOM

pandas 性能 5 条心法

  1. 能不读就不读:usecols + filter pushdown 优先
  2. 类型显式声明:int32 / float32 / category 是默认
  3. 向量化优先,apply 是反模式:不要 .apply(lambda)
  4. 大文件用列式存储:Parquet / Arrow,CSV 只在导入导出时用
  5. polars / DuckDB 是 pandas 的真正后继:别死守 pandas,该换就换

这次优化后,原本要在云上花 1 小时 + 32GB 内存机器跑的脚本,现在 5 分钟 + 2GB 机器跑完。云账单一年下来省 6 位数。优化数据处理代码的 ROI 是真的高。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

Nginx upstream keepalive 漏一行配置,QPS 直接砍 6 倍

2026-5-19 10:44:09

技术教程

Kafka 消费幂等 + Offset 管理:2 年踩过的 7 个真实坑

2026-5-19 10:48:34

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索