pandas 上不动了:Polars + DuckDB 重写 5000w 行漏斗实录

pandas 处理 5000w 行 OOM,Dask 慢且复杂。从 pandas → Dask → Polars + DuckDB 演进全实录:工具对比 + Polars Lazy + DuckDB SQL 互转 + 性能调优 + 3 大坑(Null/lazy 链/S3 慢)+ K8s 部署。内存 -70%,速度 +8x。

2024 年我们一个 Python 数据处理服务,处理量从 100w 涨到 5000w/天后崩了:OOM、慢、不收敛。从 pandas 单机 → Dask → Polars 一路走来,最终用 Polars + DuckDB 重写,内存降 70%,处理速度提升 8x。本文复盘整个演进 + 工具选型 + 代码对比。

问题背景

业务:每天处理用户行为日志做漏斗分析
原始数据:50GB Parquet(分区按天)
输出:5 张报表(到 ClickHouse)

最初实现(pandas):
import pandas as pd
df = pd.read_parquet('/data/events/dt=2024-03-15/')
df = df[df['event_type'].isin(['view', 'click', 'order'])]
df = df.groupby(['user_id', 'page_id']).agg({
    'event_id': 'count',
    'amount': 'sum'
}).reset_index()
df.to_parquet('/output/funnel_2024-03-15.parquet')

5000w 行后:
- 内存:32GB Pod 不够,OOM
- 时长:单机 90 分钟
- 频繁卡顿(GC)

需要换工具

方案对比

工具                适用场景             内存效率   速度    学习成本
=============================================================
pandas              < 1GB 数据           低         慢      低
Dask                pandas 不够时        中         中      中
PySpark             分布式 / 大集群      中         快      高
Polars              单机大数据           高         极快    中
DuckDB              SQL 分析             极高       极快    低
Modin               pandas 加速          中         快      低
Vaex                亿级行 OLAP          高         快      中

我们最终选 Polars + DuckDB
- Polars:数据处理(替代 pandas)
- DuckDB:复杂 SQL 聚合(替代 Spark)

第一阶段:Dask 尝试

import dask.dataframe as dd

# Dask 类似 pandas API,但 lazy + 分块
ddf = dd.read_parquet('/data/events/dt=2024-03-15/')
result = (ddf
    [ddf['event_type'].isin(['view', 'click', 'order'])]
    .groupby(['user_id', 'page_id'])
    .agg({'event_id': 'count', 'amount': 'sum'})
)

# 触发计算
result.compute()  # 或 result.to_parquet(...)

# 配置
from dask.distributed import Client
client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='6GB',
    processes=True
)

# 优势:接 pandas 用户,几乎不用学
# 劣势:
# - 内存不够还是 OOM,需要会拆分区
# - 调试痛苦(任务图不直观)
# - shuffle 慢(网络传输大)
# - 复杂操作行为不如 pandas 稳定

第二阶段:Polars 重写

import polars as pl

# Polars Lazy API(强烈推荐用 lazy)
result = (pl
    .scan_parquet('/data/events/dt=2024-03-15/*.parquet')   # lazy scan
    .filter(pl.col('event_type').is_in(['view', 'click', 'order']))
    .group_by(['user_id', 'page_id'])
    .agg([
        pl.col('event_id').count().alias('event_count'),
        pl.col('amount').sum().alias('total_amount')
    ])
    .collect(streaming=True)    # 流式执行,内存不超过 4GB
)

result.write_parquet('/output/funnel_2024-03-15.parquet')

# Polars 优势:
# - Rust 实现,SIMD + 多线程,极快
# - Lazy + 查询优化(自动 predicate pushdown / projection pushdown)
# - 内存效率:相同数据,内存只占 pandas 的 1/3
# - API 表达力强(链式 + chainable expressions)

第三阶段:DuckDB 补复杂 SQL

import duckdb

# 复杂 JOIN + 窗口函数,用 DuckDB SQL 更直观
con = duckdb.connect(':memory:')   # 或文件持久化
con.execute("SET memory_limit='24GB'")
con.execute("SET threads=8")

result = con.execute("""
WITH funnel AS (
    SELECT
        user_id,
        page_id,
        COUNT(*) FILTER (WHERE event_type = 'view') AS views,
        COUNT(*) FILTER (WHERE event_type = 'click') AS clicks,
        COUNT(*) FILTER (WHERE event_type = 'order') AS orders,
        SUM(amount) FILTER (WHERE event_type = 'order') AS total_amount
    FROM read_parquet('/data/events/dt=2024-03-15/*.parquet')
    WHERE event_type IN ('view', 'click', 'order')
    GROUP BY user_id, page_id
),
user_stats AS (
    SELECT
        user_id,
        SUM(orders) AS user_orders,
        ROW_NUMBER() OVER (ORDER BY SUM(total_amount) DESC) AS rank
    FROM funnel
    GROUP BY user_id
)
SELECT
    f.*,
    us.user_orders,
    us.rank
FROM funnel f
JOIN user_stats us ON us.user_id = f.user_id
WHERE us.rank <= 1000
""").pl()   # 直接转 Polars DataFrame

# DuckDB 优势:
# - SQL 写聚合更直观
# - 自动并行 + 向量化
# - 直接读 Parquet/CSV,无需 ETL
# - 跟 Polars / pandas 无缝互转

Polars vs pandas 关键 API

# 读 Parquet
pd.read_parquet('file.parquet')
pl.read_parquet('file.parquet')         # 渴望
pl.scan_parquet('file.parquet')         # 懒(推荐)

# 过滤
df[df['a'] > 10]
df.filter(pl.col('a') > 10)

# 多列选择
df[['a', 'b', 'c']]
df.select(['a', 'b', 'c'])
df.select([pl.col('a'), pl.col('b') * 2])  # 还能变换

# 分组聚合
df.groupby('cat').agg({'val': ['sum', 'mean']})
df.group_by('cat').agg([
    pl.col('val').sum().alias('s'),
    pl.col('val').mean().alias('m')
])

# 窗口函数
df['rank'] = df.groupby('cat')['val'].rank()
df.with_columns(
    pl.col('val').rank().over('cat').alias('rank')
)

# JOIN
pd.merge(df1, df2, on='key', how='left')
df1.join(df2, on='key', how='left')

# 字符串
df['email'].str.contains('@gmail')
df.filter(pl.col('email').str.contains('@gmail'))

# 时间
pd.to_datetime(df['ts'])
df.with_columns(pl.col('ts').str.to_datetime())

# 透视
df.pivot(index='date', columns='cat', values='val')
df.pivot(index='date', on='cat', values='val')

Polars 性能关键

# 1. 用 Lazy API
# 不好
df = pl.read_parquet('big.parquet')   # 全加载
df = df.filter(...)                   # 已经加载完才过滤
df = df.select(['a', 'b'])            # 已经加载完才裁剪

# 好
df = (pl.scan_parquet('big.parquet')  # lazy
    .filter(pl.col('a') > 10)         # predicate pushdown
    .select(['a', 'b'])               # projection pushdown
    .collect(streaming=True))         # 只读取过滤后 + 选中列

# 2. 流式执行(数据 > 内存)
df = pl.scan_parquet('huge.parquet').collect(streaming=True)

# 3. 避免 .to_pandas() 中转
# 不好:Polars 处理一半,转 pandas 处理后再转回
# 好:全程 Polars

# 4. 用 Expression API,不用 .map_elements
# 不好(Python loop)
df.with_columns(pl.col('x').map_elements(lambda x: x * 2))

# 好(向量化)
df.with_columns(pl.col('x') * 2)

# 5. join 顺序:小表 join 大表
df_big.join(df_small, on='key')  # 自动选 hash join

# 6. 用 Categorical 节省内存
df = df.with_columns(pl.col('country').cast(pl.Categorical))

实战:漏斗分析重写

# 完整漏斗分析:浏览 → 点击 → 加购 → 下单
import polars as pl
import duckdb
from datetime import datetime, timedelta

def funnel_analysis(date_str: str) -> pl.DataFrame:
    """5000w 行漏斗分析"""

    # 1. Polars 加载 + 过滤 + 排序
    events = (pl.scan_parquet(f'/data/events/dt={date_str}/*.parquet')
        .filter(pl.col('event_type').is_in(['view', 'click', 'add_cart', 'order']))
        .filter(pl.col('user_id').is_not_null())
        .sort(['user_id', 'ts']))

    # 2. DuckDB 做窗口聚合(SQL 表达力强)
    con = duckdb.connect()
    con.register('events', events.collect())

    funnel = con.execute("""
    WITH user_events AS (
        SELECT
            user_id,
            event_type,
            ts,
            ROW_NUMBER() OVER (
                PARTITION BY user_id, event_type
                ORDER BY ts
            ) AS first_seq
        FROM events
    ),
    first_events AS (
        SELECT user_id, event_type, ts
        FROM user_events
        WHERE first_seq = 1
    ),
    pivoted AS (
        SELECT
            user_id,
            MAX(CASE WHEN event_type = 'view' THEN ts END) AS view_ts,
            MAX(CASE WHEN event_type = 'click' THEN ts END) AS click_ts,
            MAX(CASE WHEN event_type = 'add_cart' THEN ts END) AS cart_ts,
            MAX(CASE WHEN event_type = 'order' THEN ts END) AS order_ts
        FROM first_events
        GROUP BY user_id
    )
    SELECT
        COUNT(*) FILTER (WHERE view_ts IS NOT NULL) AS view_users,
        COUNT(*) FILTER (WHERE click_ts IS NOT NULL AND click_ts > view_ts) AS click_users,
        COUNT(*) FILTER (WHERE cart_ts IS NOT NULL AND cart_ts > click_ts) AS cart_users,
        COUNT(*) FILTER (WHERE order_ts IS NOT NULL AND order_ts > cart_ts) AS order_users
    FROM pivoted
    """).pl()

    return funnel

# 跑 7 天对比
for i in range(7):
    date = (datetime.now() - timedelta(days=i+1)).strftime('%Y-%m-%d')
    result = funnel_analysis(date)
    print(f"{date}: {result}")

性能对比

数据量:5000w 行,50GB Parquet

工具         峰值内存    总耗时       结果
=========================================
pandas       OOM         -            失败
Dask         28GB        82min         成功
Polars(单 lazy)  9GB     14min        成功
Polars + DuckDB   9GB    11min        成功

代码行数对比(同样逻辑):
- pandas:120 行
- Dask:130 行(改 5 处)
- Polars:80 行(用 lazy 表达式)
- Polars + DuckDB(SQL):60 行(SQL 表达力强)

CPU 利用率:
- pandas:1 核 100%(单线程)
- Dask:8 核 60%(任务调度开销)
- Polars:8 核 95%(原生多线程)
- DuckDB:8 核 95%(向量化)

常见坑

坑 1:Polars 和 pandas 的 NaN/Null 不同

# pandas:NaN 是 float64 special value
df['col'].isnull()        # NaN 和 None 都是 True

# Polars:Null 是显式的,不依赖 NaN
df.filter(pl.col('col').is_null())   # 只有真 Null
df.filter(pl.col('col').is_nan())    # 只有 NaN(float 特殊值)

# 转换时注意
pl_df = pl.from_pandas(pd_df)
# NaN 在 pl 里可能变成 Null,要确认

坑 2:Lazy 链路太长内存涨

# 太多 lazy 操作堆积,collect 时一次性算
lazy = scan...
for col in 100_cols:
    lazy = lazy.with_columns(...)
result = lazy.collect()   # 一次执行 100 个 op

# 修法:中间 collect 拆分
checkpoint = lazy.collect()
new_lazy = checkpoint.lazy().with_columns(...)

坑 3:DuckDB 读 S3 慢

# DuckDB 直接读 S3
con.execute("""
INSTALL httpfs;
LOAD httpfs;
SET s3_region='us-east-1';
SET s3_access_key_id='...';
SET s3_secret_access_key='...';
""")

# 慢的原因:每次 SELECT 都重新拉
# 修法:先下载到本地或挂载 EFS/S3FS
con.execute("""
CREATE TABLE local AS
SELECT * FROM read_parquet('s3://bucket/data/*.parquet');
""")
# 后续查询用 local 表,快

生产部署

# Dockerfile
FROM python:3.11-slim
RUN pip install polars==0.20.5 duckdb==0.10.0 pyarrow==15.0.0
COPY pipeline.py /app/
CMD ["python", "/app/pipeline.py"]

# K8s Job 调度
apiVersion: batch/v1
kind: CronJob
metadata:
  name: funnel-analysis
spec:
  schedule: "0 2 * * *"   # 每天 2:00 跑昨天数据
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: pipeline
              image: pipeline:latest
              resources:
                requests:
                  memory: 12Gi
                  cpu: 8
                limits:
                  memory: 16Gi
                  cpu: 8
              env:
                - name: POLARS_MAX_THREADS
                  value: "8"
                - name: DUCKDB_THREADS
                  value: "8"
          restartPolicy: OnFailure

监控告警

import time
from prometheus_client import Counter, Histogram, push_to_gateway, CollectorRegistry

registry = CollectorRegistry()
runs = Counter('pipeline_runs_total', 'Pipeline runs', ['status'], registry=registry)
duration = Histogram('pipeline_duration_seconds', 'Pipeline duration',
                     buckets=[60, 300, 600, 1200, 1800], registry=registry)
memory_gauge = Gauge('pipeline_memory_peak_gb', 'Peak memory', registry=registry)

start = time.time()
try:
    result = funnel_analysis(date_str)
    runs.labels(status='success').inc()
except Exception as e:
    runs.labels(status='failure').inc()
    raise
finally:
    duration.observe(time.time() - start)
    # peak memory from psutil
    import psutil
    memory_gauge.set(psutil.Process().memory_info().rss / 1e9)
    push_to_gateway('pushgateway:9091', job='funnel_analysis', registry=registry)

避坑清单

  1. pandas 不够用时,先试 Polars 不要直接上分布式
  2. Polars 必须用 Lazy + collect(streaming=True) 才发挥优势
  3. 复杂 SQL 用 DuckDB 比 Polars Expression 直观
  4. Polars 的 Null 概念跟 pandas NaN 不同,迁移要注意
  5. 不要 .map_elements,用 Expression API 向量化
  6. 大数据 + 单机就够,不要盲目上 Spark / Dask
  7. 分区数据(按日期/类别)预先 Parquet 化
  8. DuckDB 读远程对象慢,先 cache 到本地
  9. 设置 POLARS_MAX_THREADS / DUCKDB_THREADS 控制 CPU
  10. 监控 peak memory + 运行时长,异常告警

总结

Python 数据处理这几年生态进步巨大:pandas 仍然是入门选择,但单机大数据场景 Polars 已经全面超越;复杂 SQL 用 DuckDB 比写 pandas 链式调用更清晰。我们这次重构把"5000w 行需要分布式"变成"单机 11 分钟搞定",省了一整套 Spark 集群。最大的认知改变:工具选型要看真实数据量,不要被"大数据"标签吓住先上分布式 — 单机 32GB + Polars/DuckDB 能搞定大多数 1TB 以内的数据处理。如果你的 pipeline 还在 pandas + Dask,2024 年值得花一周时间评估 Polars + DuckDB 重写。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

JVM G1 切 ZGC 两周调参实录:p99 GC 暂停 80ms→3ms

2026-5-19 12:32:01

技术教程

JVM 容器化优化实录:1.2GB→180MB 启动 90s→15s

2026-5-19 12:36:57

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索