2024 年我们一个 Python 数据处理服务,处理量从 100w 涨到 5000w/天后崩了:OOM、慢、不收敛。从 pandas 单机 → Dask → Polars 一路走来,最终用 Polars + DuckDB 重写,内存降 70%,处理速度提升 8x。本文复盘整个演进 + 工具选型 + 代码对比。
问题背景
业务:每天处理用户行为日志做漏斗分析
原始数据:50GB Parquet(分区按天)
输出:5 张报表(到 ClickHouse)
最初实现(pandas):
import pandas as pd
df = pd.read_parquet('/data/events/dt=2024-03-15/')
df = df[df['event_type'].isin(['view', 'click', 'order'])]
df = df.groupby(['user_id', 'page_id']).agg({
'event_id': 'count',
'amount': 'sum'
}).reset_index()
df.to_parquet('/output/funnel_2024-03-15.parquet')
5000w 行后:
- 内存:32GB Pod 不够,OOM
- 时长:单机 90 分钟
- 频繁卡顿(GC)
需要换工具
方案对比
工具 适用场景 内存效率 速度 学习成本
=============================================================
pandas < 1GB 数据 低 慢 低
Dask pandas 不够时 中 中 中
PySpark 分布式 / 大集群 中 快 高
Polars 单机大数据 高 极快 中
DuckDB SQL 分析 极高 极快 低
Modin pandas 加速 中 快 低
Vaex 亿级行 OLAP 高 快 中
我们最终选 Polars + DuckDB
- Polars:数据处理(替代 pandas)
- DuckDB:复杂 SQL 聚合(替代 Spark)
第一阶段:Dask 尝试
import dask.dataframe as dd
# Dask 类似 pandas API,但 lazy + 分块
ddf = dd.read_parquet('/data/events/dt=2024-03-15/')
result = (ddf
[ddf['event_type'].isin(['view', 'click', 'order'])]
.groupby(['user_id', 'page_id'])
.agg({'event_id': 'count', 'amount': 'sum'})
)
# 触发计算
result.compute() # 或 result.to_parquet(...)
# 配置
from dask.distributed import Client
client = Client(
n_workers=4,
threads_per_worker=2,
memory_limit='6GB',
processes=True
)
# 优势:接 pandas 用户,几乎不用学
# 劣势:
# - 内存不够还是 OOM,需要会拆分区
# - 调试痛苦(任务图不直观)
# - shuffle 慢(网络传输大)
# - 复杂操作行为不如 pandas 稳定
第二阶段:Polars 重写
import polars as pl
# Polars Lazy API(强烈推荐用 lazy)
result = (pl
.scan_parquet('/data/events/dt=2024-03-15/*.parquet') # lazy scan
.filter(pl.col('event_type').is_in(['view', 'click', 'order']))
.group_by(['user_id', 'page_id'])
.agg([
pl.col('event_id').count().alias('event_count'),
pl.col('amount').sum().alias('total_amount')
])
.collect(streaming=True) # 流式执行,内存不超过 4GB
)
result.write_parquet('/output/funnel_2024-03-15.parquet')
# Polars 优势:
# - Rust 实现,SIMD + 多线程,极快
# - Lazy + 查询优化(自动 predicate pushdown / projection pushdown)
# - 内存效率:相同数据,内存只占 pandas 的 1/3
# - API 表达力强(链式 + chainable expressions)
第三阶段:DuckDB 补复杂 SQL
import duckdb
# 复杂 JOIN + 窗口函数,用 DuckDB SQL 更直观
con = duckdb.connect(':memory:') # 或文件持久化
con.execute("SET memory_limit='24GB'")
con.execute("SET threads=8")
result = con.execute("""
WITH funnel AS (
SELECT
user_id,
page_id,
COUNT(*) FILTER (WHERE event_type = 'view') AS views,
COUNT(*) FILTER (WHERE event_type = 'click') AS clicks,
COUNT(*) FILTER (WHERE event_type = 'order') AS orders,
SUM(amount) FILTER (WHERE event_type = 'order') AS total_amount
FROM read_parquet('/data/events/dt=2024-03-15/*.parquet')
WHERE event_type IN ('view', 'click', 'order')
GROUP BY user_id, page_id
),
user_stats AS (
SELECT
user_id,
SUM(orders) AS user_orders,
ROW_NUMBER() OVER (ORDER BY SUM(total_amount) DESC) AS rank
FROM funnel
GROUP BY user_id
)
SELECT
f.*,
us.user_orders,
us.rank
FROM funnel f
JOIN user_stats us ON us.user_id = f.user_id
WHERE us.rank <= 1000
""").pl() # 直接转 Polars DataFrame
# DuckDB 优势:
# - SQL 写聚合更直观
# - 自动并行 + 向量化
# - 直接读 Parquet/CSV,无需 ETL
# - 跟 Polars / pandas 无缝互转
Polars vs pandas 关键 API
# 读 Parquet
pd.read_parquet('file.parquet')
pl.read_parquet('file.parquet') # 渴望
pl.scan_parquet('file.parquet') # 懒(推荐)
# 过滤
df[df['a'] > 10]
df.filter(pl.col('a') > 10)
# 多列选择
df[['a', 'b', 'c']]
df.select(['a', 'b', 'c'])
df.select([pl.col('a'), pl.col('b') * 2]) # 还能变换
# 分组聚合
df.groupby('cat').agg({'val': ['sum', 'mean']})
df.group_by('cat').agg([
pl.col('val').sum().alias('s'),
pl.col('val').mean().alias('m')
])
# 窗口函数
df['rank'] = df.groupby('cat')['val'].rank()
df.with_columns(
pl.col('val').rank().over('cat').alias('rank')
)
# JOIN
pd.merge(df1, df2, on='key', how='left')
df1.join(df2, on='key', how='left')
# 字符串
df['email'].str.contains('@gmail')
df.filter(pl.col('email').str.contains('@gmail'))
# 时间
pd.to_datetime(df['ts'])
df.with_columns(pl.col('ts').str.to_datetime())
# 透视
df.pivot(index='date', columns='cat', values='val')
df.pivot(index='date', on='cat', values='val')
Polars 性能关键
# 1. 用 Lazy API
# 不好
df = pl.read_parquet('big.parquet') # 全加载
df = df.filter(...) # 已经加载完才过滤
df = df.select(['a', 'b']) # 已经加载完才裁剪
# 好
df = (pl.scan_parquet('big.parquet') # lazy
.filter(pl.col('a') > 10) # predicate pushdown
.select(['a', 'b']) # projection pushdown
.collect(streaming=True)) # 只读取过滤后 + 选中列
# 2. 流式执行(数据 > 内存)
df = pl.scan_parquet('huge.parquet').collect(streaming=True)
# 3. 避免 .to_pandas() 中转
# 不好:Polars 处理一半,转 pandas 处理后再转回
# 好:全程 Polars
# 4. 用 Expression API,不用 .map_elements
# 不好(Python loop)
df.with_columns(pl.col('x').map_elements(lambda x: x * 2))
# 好(向量化)
df.with_columns(pl.col('x') * 2)
# 5. join 顺序:小表 join 大表
df_big.join(df_small, on='key') # 自动选 hash join
# 6. 用 Categorical 节省内存
df = df.with_columns(pl.col('country').cast(pl.Categorical))
实战:漏斗分析重写
# 完整漏斗分析:浏览 → 点击 → 加购 → 下单
import polars as pl
import duckdb
from datetime import datetime, timedelta
def funnel_analysis(date_str: str) -> pl.DataFrame:
"""5000w 行漏斗分析"""
# 1. Polars 加载 + 过滤 + 排序
events = (pl.scan_parquet(f'/data/events/dt={date_str}/*.parquet')
.filter(pl.col('event_type').is_in(['view', 'click', 'add_cart', 'order']))
.filter(pl.col('user_id').is_not_null())
.sort(['user_id', 'ts']))
# 2. DuckDB 做窗口聚合(SQL 表达力强)
con = duckdb.connect()
con.register('events', events.collect())
funnel = con.execute("""
WITH user_events AS (
SELECT
user_id,
event_type,
ts,
ROW_NUMBER() OVER (
PARTITION BY user_id, event_type
ORDER BY ts
) AS first_seq
FROM events
),
first_events AS (
SELECT user_id, event_type, ts
FROM user_events
WHERE first_seq = 1
),
pivoted AS (
SELECT
user_id,
MAX(CASE WHEN event_type = 'view' THEN ts END) AS view_ts,
MAX(CASE WHEN event_type = 'click' THEN ts END) AS click_ts,
MAX(CASE WHEN event_type = 'add_cart' THEN ts END) AS cart_ts,
MAX(CASE WHEN event_type = 'order' THEN ts END) AS order_ts
FROM first_events
GROUP BY user_id
)
SELECT
COUNT(*) FILTER (WHERE view_ts IS NOT NULL) AS view_users,
COUNT(*) FILTER (WHERE click_ts IS NOT NULL AND click_ts > view_ts) AS click_users,
COUNT(*) FILTER (WHERE cart_ts IS NOT NULL AND cart_ts > click_ts) AS cart_users,
COUNT(*) FILTER (WHERE order_ts IS NOT NULL AND order_ts > cart_ts) AS order_users
FROM pivoted
""").pl()
return funnel
# 跑 7 天对比
for i in range(7):
date = (datetime.now() - timedelta(days=i+1)).strftime('%Y-%m-%d')
result = funnel_analysis(date)
print(f"{date}: {result}")
性能对比
数据量:5000w 行,50GB Parquet
工具 峰值内存 总耗时 结果
=========================================
pandas OOM - 失败
Dask 28GB 82min 成功
Polars(单 lazy) 9GB 14min 成功
Polars + DuckDB 9GB 11min 成功
代码行数对比(同样逻辑):
- pandas:120 行
- Dask:130 行(改 5 处)
- Polars:80 行(用 lazy 表达式)
- Polars + DuckDB(SQL):60 行(SQL 表达力强)
CPU 利用率:
- pandas:1 核 100%(单线程)
- Dask:8 核 60%(任务调度开销)
- Polars:8 核 95%(原生多线程)
- DuckDB:8 核 95%(向量化)
常见坑
坑 1:Polars 和 pandas 的 NaN/Null 不同
# pandas:NaN 是 float64 special value
df['col'].isnull() # NaN 和 None 都是 True
# Polars:Null 是显式的,不依赖 NaN
df.filter(pl.col('col').is_null()) # 只有真 Null
df.filter(pl.col('col').is_nan()) # 只有 NaN(float 特殊值)
# 转换时注意
pl_df = pl.from_pandas(pd_df)
# NaN 在 pl 里可能变成 Null,要确认
坑 2:Lazy 链路太长内存涨
# 太多 lazy 操作堆积,collect 时一次性算
lazy = scan...
for col in 100_cols:
lazy = lazy.with_columns(...)
result = lazy.collect() # 一次执行 100 个 op
# 修法:中间 collect 拆分
checkpoint = lazy.collect()
new_lazy = checkpoint.lazy().with_columns(...)
坑 3:DuckDB 读 S3 慢
# DuckDB 直接读 S3
con.execute("""
INSTALL httpfs;
LOAD httpfs;
SET s3_region='us-east-1';
SET s3_access_key_id='...';
SET s3_secret_access_key='...';
""")
# 慢的原因:每次 SELECT 都重新拉
# 修法:先下载到本地或挂载 EFS/S3FS
con.execute("""
CREATE TABLE local AS
SELECT * FROM read_parquet('s3://bucket/data/*.parquet');
""")
# 后续查询用 local 表,快
生产部署
# Dockerfile
FROM python:3.11-slim
RUN pip install polars==0.20.5 duckdb==0.10.0 pyarrow==15.0.0
COPY pipeline.py /app/
CMD ["python", "/app/pipeline.py"]
# K8s Job 调度
apiVersion: batch/v1
kind: CronJob
metadata:
name: funnel-analysis
spec:
schedule: "0 2 * * *" # 每天 2:00 跑昨天数据
jobTemplate:
spec:
template:
spec:
containers:
- name: pipeline
image: pipeline:latest
resources:
requests:
memory: 12Gi
cpu: 8
limits:
memory: 16Gi
cpu: 8
env:
- name: POLARS_MAX_THREADS
value: "8"
- name: DUCKDB_THREADS
value: "8"
restartPolicy: OnFailure
监控告警
import time
from prometheus_client import Counter, Histogram, push_to_gateway, CollectorRegistry
registry = CollectorRegistry()
runs = Counter('pipeline_runs_total', 'Pipeline runs', ['status'], registry=registry)
duration = Histogram('pipeline_duration_seconds', 'Pipeline duration',
buckets=[60, 300, 600, 1200, 1800], registry=registry)
memory_gauge = Gauge('pipeline_memory_peak_gb', 'Peak memory', registry=registry)
start = time.time()
try:
result = funnel_analysis(date_str)
runs.labels(status='success').inc()
except Exception as e:
runs.labels(status='failure').inc()
raise
finally:
duration.observe(time.time() - start)
# peak memory from psutil
import psutil
memory_gauge.set(psutil.Process().memory_info().rss / 1e9)
push_to_gateway('pushgateway:9091', job='funnel_analysis', registry=registry)
避坑清单
- pandas 不够用时,先试 Polars 不要直接上分布式
- Polars 必须用 Lazy + collect(streaming=True) 才发挥优势
- 复杂 SQL 用 DuckDB 比 Polars Expression 直观
- Polars 的 Null 概念跟 pandas NaN 不同,迁移要注意
- 不要 .map_elements,用 Expression API 向量化
- 大数据 + 单机就够,不要盲目上 Spark / Dask
- 分区数据(按日期/类别)预先 Parquet 化
- DuckDB 读远程对象慢,先 cache 到本地
- 设置 POLARS_MAX_THREADS / DUCKDB_THREADS 控制 CPU
- 监控 peak memory + 运行时长,异常告警
总结
Python 数据处理这几年生态进步巨大:pandas 仍然是入门选择,但单机大数据场景 Polars 已经全面超越;复杂 SQL 用 DuckDB 比写 pandas 链式调用更清晰。我们这次重构把"5000w 行需要分布式"变成"单机 11 分钟搞定",省了一整套 Spark 集群。最大的认知改变:工具选型要看真实数据量,不要被"大数据"标签吓住先上分布式 — 单机 32GB + Polars/DuckDB 能搞定大多数 1TB 以内的数据处理。如果你的 pipeline 还在 pandas + Dask,2024 年值得花一周时间评估 Polars + DuckDB 重写。
—— 别看了 · 2026