2024 年我们公司接了个项目 给一个做内容审核的客户做 RAG 系统 把他们 5 年的审核案例库 大约 200 万条记录 用 OpenAI text-embedding-3-small 做了向量化 存到 PostgreSQL pgvector 里 给业务侧提供 找相似案例 的能力。我第一版很顺利 写了个 embedding pipeline 把所有案例 embedding 一遍 建了一个 IVFFlat 索引 业务侧调 embed 用户输入 然后 ORDER BY embedding <=> query LIMIT 10 看起来 demo 完美无瑕 客户也认可。上线一个月后问题开始陆续出现。第一种最让我傻眼 业务反馈检索质量在下降 同样的 query 一周前能找到对的案例 一周后找不到了 我查了半天发现是有人在 embedding 模型上做了升级 但我没重新 embed 老数据 新旧 embedding 不在同一个语义空间检索全错。第二种最难缠 有些 query 怎么都搜不到明明知道存在的案例 后来才发现是 IVFFlat 索引的 probes 参数太低 recall 只有 60% 我提高到 lists/10 才到 95% 但延迟翻倍。第三种最离谱 业务有一类长文档 我直接对整段 embedding 结果检索效果极差 因为 5000 字的文档 embedding 后语义被平均稀释了 用户搜文档里某个细节根本搜不到 必须分块 chunk 但分块策略我一开始拍脑袋定 512 字符 切断了句子和段落语义。第四种最致命 我们的检索延迟 P99 突然飙到了 3 秒 排查发现是向量表数据涨到 500 万 IVFFlat 在数据量大时需要更多 lists 而我用的默认 100 lists 完全不够 重建索引花了 6 小时。第五种最莫名其妙 客户某天反馈检索结果 偏离主题 我看了几个例子发现是 embedding 模型对短查询表现很差 比如用户只输入两个词 embedding 后语义太弱 周围全是噪声向量 必须做 query expansion 或者 hybrid 检索。我盯着这一连串问题想了很久才彻底想明白第一版错在一个根本的认知上我以为 RAG 就是 文档 embed 一下 query embed 一下 cosine 相似度排个序 就完事了 可这个认知是错的真正能在生产用的 RAG 是一个 chunking 策略 加 embedding 选型 加 向量索引调优 加 检索召回评估 加 reranker 加 hybrid 检索 加 embedding 模型迁移 的整套工程方法论 任何一环没做都可能让检索质量缓慢下降或者突然崩溃本文从头梳理 RAG 检索链路上每个环节的工程细节 chunking 怎么做 embedding 怎么选 pgvector 索引怎么调 reranker 什么时候用 BM25+vector hybrid 怎么搭 以及一些把 RAG 做扎实要避开的工程坑
问题背景:为什么 RAG 看似简单做起来满是坑
很多人对 RAG 的印象停留在 embedding+cosine 几行代码就能跑 但生产里你会发现 同样的数据同样的 query 不同的工程选型 检索质量可能差几倍 延迟可能差十倍 维护成本可能差一个数量级。问题的根源在于:
- embedding 模型不是免费的 也不是永恒的:模型升级会让旧向量与新向量不兼容 必须重新 embed 全量数据 这是几小时到几天的成本。
- chunking 策略直接决定检索粒度:切太大语义稀释切太小上下文丢失 必须按文档类型设计分块策略。
- 向量索引参数不调就是默认 90% 精度:IVFFlat HNSW 都有 recall vs latency 的折中 默认参数往往不满足生产。
- 纯向量检索对短 query 表现差:用户输入两三个词 embedding 信息量不足 必须 hybrid 与 BM25 组合。
- top-k 检索结果质量参差不齐:必须加 reranker 用更强的模型对候选重新打分 才能保证最终给 LLM 的上下文质量。
- 检索质量没有持续评估就是黑盒:必须建立 ground truth 集 持续监控 recall@k MRR nDCG 否则模型升级数据漂移你都发现不了。
一 Chunking 策略:不是越小越好也不是越大越好
chunking 是 RAG 的第一步 也是最被低估的一步。错误的 chunking 会让后面所有的环节都打折扣。原则有三个 按语义边界切 重叠保留上下文 按文档类型分别处理。
import re
from dataclasses import dataclass
from typing import Iterable
@dataclass
class Chunk:
text: str
doc_id: str
chunk_id: int
metadata: dict
class SemanticChunker:
def __init__(self, target_size: int = 500, overlap: int = 80, hard_max: int = 800):
self.target_size = target_size
self.overlap = overlap
self.hard_max = hard_max
self.sentence_end = re.compile(r'(?<=[。!?\.\!\?])\s*')
self.para_break = re.compile(r'\n\s*\n')
def split_paragraphs(self, text: str) -> list:
return [p.strip() for p in self.para_break.split(text) if p.strip()]
def split_sentences(self, text: str) -> list:
return [s.strip() for s in self.sentence_end.split(text) if s.strip()]
def chunk(self, doc_id: str, text: str, metadata: dict) -> list:
chunks = []
chunk_id = 0
paragraphs = self.split_paragraphs(text)
current = []
current_len = 0
for para in paragraphs:
if current_len + len(para) <= self.target_size:
current.append(para)
current_len += len(para)
continue
if current:
chunks.append(Chunk('\n\n'.join(current), doc_id, chunk_id, metadata))
chunk_id += 1
tail = current[-1] if current else ''
current = [tail] if len(tail) < self.overlap else []
current_len = sum(len(c) for c in current)
if len(para) > self.hard_max:
for sent_chunk in self._chunk_long_para(para):
chunks.append(Chunk(sent_chunk, doc_id, chunk_id, metadata))
chunk_id += 1
current = []
current_len = 0
else:
current.append(para)
current_len += len(para)
if current:
chunks.append(Chunk('\n\n'.join(current), doc_id, chunk_id, metadata))
return chunks
def _chunk_long_para(self, para: str) -> Iterable[str]:
sentences = self.split_sentences(para)
buf = []
buf_len = 0
for sent in sentences:
if buf_len + len(sent) > self.target_size and buf:
yield ''.join(buf)
buf = buf[-2:] if len(buf) > 2 else []
buf_len = sum(len(s) for s in buf)
buf.append(sent)
buf_len += len(sent)
if buf:
yield ''.join(buf)
这里的关键设计是 段落优先 句子兜底 重叠保留上下文。先按段落切 段落太长再按句子切 切完之间留 1-2 句重叠 这样上下文不会因为切边丢失。target_size 500 是按 OpenAI embedding 一般 500-800 token 效果最稳 经验值。hard_max 是绝对上限 超过就强制再切 防止模型截断。
不同文档类型 chunking 策略要差异化处理 代码文档按函数切 表格按行切 长论文按章节切 短问答整段保留 不要用一套规则吃所有文档:
class DocTypeRouter:
def __init__(self):
self.chunkers = {
'article': SemanticChunker(target_size=500, overlap=80),
'code': SemanticChunker(target_size=400, overlap=40),
'qa': SemanticChunker(target_size=1000, overlap=0),
'table_row': None,
}
def chunk(self, doc_id: str, text: str, doc_type: str, metadata: dict) -> list:
if doc_type == 'table_row':
return [Chunk(text, doc_id, 0, metadata)]
chunker = self.chunkers.get(doc_type, self.chunkers['article'])
return chunker.chunk(doc_id, text, metadata)
二 Embedding 选型:不是越大越好
embedding 选型要看 多语言 维度 成本 部署模式。OpenAI text-embedding-3-small 1536 维 性价比高 中文也不错 适合大多数业务。bge-large-zh BAAI 中文表现强 自托管 但维护成本高。jina-embeddings-v3 多语言强 支持 8k 输入。不是维度越高越好 1536 vs 3072 在大多数业务上差距 1-2% recall 但存储和计算成本翻倍。
import time
from openai import OpenAI
class EmbeddingClient:
def __init__(self, model: str = 'text-embedding-3-small', batch_size: int = 100):
self.client = OpenAI()
self.model = model
self.batch_size = batch_size
self.dim = {'text-embedding-3-small': 1536, 'text-embedding-3-large': 3072}.get(model, 1536)
def embed_batch(self, texts: list, max_retry: int = 3) -> list:
results = []
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
batch = [self._truncate(t) for t in batch]
for attempt in range(max_retry):
try:
resp = self.client.embeddings.create(input=batch, model=self.model)
results.extend([d.embedding for d in resp.data])
break
except Exception as e:
if attempt == max_retry - 1:
raise
time.sleep(2 ** attempt)
return results
def _truncate(self, text: str, max_chars: int = 8000) -> str:
return text[:max_chars] if len(text) > max_chars else text
关键工程要点 第一是 batch 调用 OpenAI 单 batch 最多 2048 条 但实测 100-200 条最稳 太大容易触发 429。第二是 truncation 模型有 token 上限 超过会报错 必须客户端先截断。第三是 retry with backoff 网络抖动很常见 必须重试。第四是把 embedding 调用花费记录下来 OpenAI embedding 不便宜 200 万 chunk 平均 500 token 大约 80 美元 必须有预算意识。
embedding 模型升级是 RAG 最大的隐性成本 必须事先有迁移方案 不能直接换模型 旧索引用 v3-small 新数据用 v3-large 二者向量空间不同 检索一定出问题:
class EmbeddingVersioning:
def __init__(self, db_conn):
self.conn = db_conn
def migrate(self, old_model: str, new_model: str) -> None:
new_client = EmbeddingClient(model=new_model)
cur = self.conn.cursor()
cur.execute('SELECT chunk_id, text FROM chunks WHERE embed_model = %s', (old_model,))
rows = cur.fetchall()
for i in range(0, len(rows), 100):
batch = rows[i:i + 100]
ids = [r[0] for r in batch]
texts = [r[1] for r in batch]
embeddings = new_client.embed_batch(texts)
for chunk_id, emb in zip(ids, embeddings):
cur.execute(
'UPDATE chunks SET embedding_v2 = %s, embed_model = %s WHERE chunk_id = %s',
(emb, new_model, chunk_id),
)
self.conn.commit()
三 pgvector 索引:IVFFlat vs HNSW
pgvector 是 PostgreSQL 的向量扩展 提供两种近似最近邻索引 IVFFlat 和 HNSW。IVFFlat 把向量空间分成 lists 个 cluster 检索时只查 probes 个最近的 cluster 速度快但 recall 取决于 probes 和 lists 比例。HNSW 用分层小世界图 recall 更高 内存占用更大 写入慢一些。
-- 启用 pgvector
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE chunks (
chunk_id BIGSERIAL PRIMARY KEY,
doc_id TEXT NOT NULL,
text TEXT NOT NULL,
embedding VECTOR(1536),
embed_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);
-- IVFFlat 索引 lists 推荐为 sqrt(rows) 100w 行设 1000 lists
CREATE INDEX idx_chunks_emb_ivf ON chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 1000);
-- HNSW 索引 m 是图的连接数 ef_construction 是构建时的 candidate list
CREATE INDEX idx_chunks_emb_hnsw ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- 检索时调高 probes (IVFFlat) 或 ef_search (HNSW) 换 recall
SET ivfflat.probes = 50; -- 默认 1 太低 lists 的 5-10% 比较合理
SET hnsw.ef_search = 100; -- 默认 40 调到 100 通常有更高 recall
-- 检索 SQL
SELECT chunk_id, text, embedding <=> %s::vector AS distance
FROM chunks
WHERE embed_model = 'text-embedding-3-small'
ORDER BY embedding <=> %s::vector
LIMIT 20;
关键调优经验 第一是 lists 必须接近 sqrt(rows) 太少 cluster 太大每个 cluster 内仍是线性扫 太多 cluster 之间跳过的多 recall 下降。第二是 probes 默认 1 是雪崩般的 recall 灾难 必须根据业务可接受延迟调到 5%-10% 的 lists。第三是 100 万行以上建议直接上 HNSW IVFFlat 在大数据量下重建成本越来越高。第四是建索引前必须先 INSERT 完所有数据 因为 IVFFlat 的 cluster 是基于现有数据训练 后续插入的数据如果分布偏离 cluster recall 会下降 必须定期 REINDEX。
[mermaid]flowchart TD
A[用户 query] --> B[query 改写与扩展]
B --> C[embedding 模型]
C --> D[pgvector ANN 检索
top-50]
A --> E[BM25 全文检索
top-50]
D --> F[结果合并 RRF]
E --> F
F --> G[reranker 重打分
cross-encoder]
G --> H[top-10 候选]
H --> I[LLM 生成]
I --> J[输出 + 引用]
四 Reranker:把好的候选排到前面
向量检索召回的 top-k 中 真正相关的可能只有 2-3 个 其他是 语义近但答案不对 的噪声。这时候上 reranker 用一个更强的 cross-encoder 模型对 query 和每个候选做联合编码 重新打分 把真正相关的排到前面 这是 RAG 质量提升最显著的环节。
from sentence_transformers import CrossEncoder
class Reranker:
def __init__(self, model_name: str = 'BAAI/bge-reranker-large', top_k: int = 10):
self.model = CrossEncoder(model_name)
self.top_k = top_k
def rerank(self, query: str, candidates: list) -> list:
if not candidates:
return []
pairs = [(query, c['text']) for c in candidates]
scores = self.model.predict(pairs, batch_size=32, show_progress_bar=False)
for c, s in zip(candidates, scores):
c['rerank_score'] = float(s)
candidates.sort(key=lambda x: x['rerank_score'], reverse=True)
return candidates[:self.top_k]
class RagPipeline:
def __init__(self, embed_client, vector_store, reranker):
self.embed = embed_client
self.store = vector_store
self.reranker = reranker
def retrieve(self, query: str, recall_k: int = 50, final_k: int = 10) -> list:
query_emb = self.embed.embed_batch([query])[0]
candidates = self.store.search(query_emb, top_k=recall_k)
reranked = self.reranker.rerank(query, candidates)
return reranked[:final_k]
reranker 的核心价值是 它能看到 query 和 candidate 的完整文本交互 而 embedding 检索只是看两个向量的距离 信息量天然有限。我们的业务上线 reranker 后 nDCG@10 从 0.62 提到 0.81 LLM 输出质量随之显著改善。代价是延迟 加上 reranker 每次查询增加 100-300ms 必须做好缓存。
五 Hybrid 检索:BM25 + Vector 的组合
纯向量检索对短 query 表现差 对专有名词敏感度低。BM25 全文检索正好相反 对精确匹配强 对语义弱。两者结合用 RRF Reciprocal Rank Fusion 融合排名 是 RAG 最稳定的检索范式。
from collections import defaultdict
class HybridSearcher:
def __init__(self, vector_store, bm25_store, k_rrf: int = 60):
self.vector_store = vector_store
self.bm25_store = bm25_store
self.k_rrf = k_rrf
def search(self, query: str, query_emb: list, top_k: int = 50) -> list:
vector_results = self.vector_store.search(query_emb, top_k=top_k)
bm25_results = self.bm25_store.search(query, top_k=top_k)
scores = defaultdict(float)
text_map = {}
for rank, hit in enumerate(vector_results):
cid = hit['chunk_id']
scores[cid] += 1.0 / (self.k_rrf + rank + 1)
text_map[cid] = hit
for rank, hit in enumerate(bm25_results):
cid = hit['chunk_id']
scores[cid] += 1.0 / (self.k_rrf + rank + 1)
text_map[cid] = hit
merged = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [text_map[cid] for cid, _ in merged[:top_k]]
对应 PostgreSQL 的 BM25 实现可以用 tsvector 加 ts_rank 也可以引入 paradedb 或 zombodb 等专业全文方案 视规模选择 小规模 ts_rank 够用 千万级以上建议 paradedb:
-- PostgreSQL 原生 tsvector 全文检索
ALTER TABLE chunks ADD COLUMN tsv tsvector;
UPDATE chunks SET tsv = to_tsvector('simple', text);
CREATE INDEX idx_chunks_tsv ON chunks USING GIN(tsv);
-- BM25 风格的 ts_rank 检索
SELECT chunk_id, text,
ts_rank_cd(tsv, query, 32) AS bm25_score
FROM chunks, plainto_tsquery('simple', '关键词查询') query
WHERE tsv @@ query
ORDER BY bm25_score DESC
LIMIT 50;
-- 触发器自动维护 tsv 列
CREATE OR REPLACE FUNCTION chunks_tsv_trigger() RETURNS trigger AS $$
BEGIN
NEW.tsv := to_tsvector('simple', coalesce(NEW.text, ''));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_chunks_tsv
BEFORE INSERT OR UPDATE ON chunks
FOR EACH ROW EXECUTE FUNCTION chunks_tsv_trigger();
RRF 的优势是 不需要把两个不同 scale 的 score 归一化 直接用排名融合 鲁棒性好。k_rrf 通常取 60 这个值是 RRF 论文里的经验值 调到 100 会让排名靠后的项贡献减小 不同业务可以微调但效果差异不大。
六 RAG 评估与监控:没有评估的 RAG 是黑盒
RAG 上线后最大的问题是 你不知道它的检索质量是涨了还是跌了。必须建立 ground truth 集 持续测 recall@k nDCG MRR 这些指标 模型升级数据变更前后做 A/B 对比。
import math
class RagEvaluator:
def __init__(self, ground_truth: dict):
self.ground_truth = ground_truth
def recall_at_k(self, query_id: str, retrieved_ids: list, k: int) -> float:
gold = set(self.ground_truth.get(query_id, []))
if not gold:
return 0.0
hits = sum(1 for cid in retrieved_ids[:k] if cid in gold)
return hits / len(gold)
def mrr(self, query_id: str, retrieved_ids: list) -> float:
gold = set(self.ground_truth.get(query_id, []))
for i, cid in enumerate(retrieved_ids):
if cid in gold:
return 1.0 / (i + 1)
return 0.0
def ndcg_at_k(self, query_id: str, retrieved_ids: list, k: int) -> float:
gold = self.ground_truth.get(query_id, [])
if not gold:
return 0.0
relevance = {cid: 1.0 / (i + 1) for i, cid in enumerate(gold)}
dcg = sum(relevance.get(cid, 0) / math.log2(i + 2) for i, cid in enumerate(retrieved_ids[:k]))
ideal = sorted(relevance.values(), reverse=True)[:k]
idcg = sum(r / math.log2(i + 2) for i, r in enumerate(ideal))
return dcg / idcg if idcg > 0 else 0.0
def evaluate_batch(self, queries: list, retriever) -> dict:
recall, mrr, ndcg = [], [], []
for q in queries:
results = retriever.retrieve(q['query'])
ids = [r['chunk_id'] for r in results]
recall.append(self.recall_at_k(q['id'], ids, 10))
mrr.append(self.mrr(q['id'], ids))
ndcg.append(self.ndcg_at_k(q['id'], ids, 10))
return {
'recall@10': sum(recall) / len(recall),
'mrr': sum(mrr) / len(mrr),
'ndcg@10': sum(ndcg) / len(ndcg),
}
RAG 工程的工程坑还有几个。第一是 元数据过滤忘了 用户其实只关心某客户某时间段的案例 但检索时没加 metadata filter 全库搜回来无关结果。第二是 chunk 的 metadata 必须存 doc_id 段落位置等 否则后面 LLM 引用时拼不出原文链接。第三是 检索结果给 LLM 时必须做 token 预算管理 不是 top-10 全塞进去 而是按 reranker 分数 + token 限制 动态截断。第四是 embedding pipeline 必须幂等 失败重试不能产生重复 chunk 必须用 doc_id + chunk_id 做 ON CONFLICT 处理。
关键概念速查
| 概念 | 含义 | 工程价值 |
|---|---|---|
| Chunking | 文档切分 | 检索粒度的起点 |
| Embedding | 文本转向量 | 语义检索的基础 |
| pgvector | PG 向量扩展 | 无需外部存储 |
| IVFFlat | 聚类倒排索引 | 快但 recall 依赖参数 |
| HNSW | 分层小世界图 | 高 recall 内存大 |
| Reranker | cross-encoder 重排 | 检索质量最大提升 |
| BM25 | 词频统计排序 | 精确匹配补充 |
| Hybrid + RRF | 向量+BM25 融合 | 鲁棒的工业选择 |
| recall@k | 前 k 召回率 | 检索质量主指标 |
| nDCG | 归一化折损增益 | 排序质量主指标 |
避坑清单
- embedding 模型升级必须重新 embed 全量数据 新旧不兼容否则检索全乱。
- chunking 必须按语义边界切 段落优先句子兜底 留 1-2 句重叠保上下文。
- pgvector IVFFlat lists 取 sqrt(rows) probes 取 lists 的 5-10% 默认值会让 recall 拉胯。
- 百万级以上建议直接上 HNSW IVFFlat 在大数据量下重建成本越来越高。
- 必须上 reranker 它对 nDCG 的提升一般有 0.1-0.2 是单点投入收益最高的环节。
- 短 query 必须 hybrid 检索 单靠向量在精确匹配场景上几乎必输。
- 检索结果给 LLM 时必须做 token 预算管理 不能 top-k 全塞 容易超长降智。
- 必须建 ground truth 评估集 模型升级或参数调整前后做 A/B 对比。
- 元数据过滤要在检索时一并加 不要先全库 ANN 再 Python 过滤 效率天差地别。
- embedding pipeline 必须幂等 用 doc_id + chunk_id ON CONFLICT 防止重复入库。
总结
RAG 这事 很多人的直觉是 文档 embed query embed cosine 排序就完事 这其实是把 我会调 OpenAI embedding 和 我能做出可用的 RAG 检索 混为一谈。前者是会调 API 后者是懂检索工程。中间隔着的是 chunking 策略 embedding 选型 向量索引调优 reranker hybrid 检索 评估监控 整整一套工程方法论。
从原型到生产 你需要做的事远不止 把文档 embedding 进库 然后查 top-10。你要懂 chunking 切的位置对不对 要懂 embedding 模型升级的迁移成本 要懂 IVFFlat HNSW 的 recall 与延迟折中 要会做 reranker 要会拼 hybrid 要建评估集 要做监控。每一项单独看都不复杂 但它们组合在一起 才是一个能扛业务的 RAG 系统。少任何一项 都会在某个用户某次提问上让你看到一个 找不到对的答案 的 case 而那种 case 累积起来就是用户对你的产品失去信任。
我经常用一个比喻来理解 RAG 它有点像图书馆的检索台。chunking 是把书切成可索引的页面 embedding 是为每页生成主题摘要 ANN 索引是按主题快速查找的目录 reranker 是图书管理员翻看候选页面挑出最相关的几页 hybrid 是同时按主题和精确书名找 evaluation 是定期抽样检查检索质量。你不能因为图书馆有了主题目录就觉得万事大吉 还要管页面切得合不合理 摘要生成得准不准 目录是否陈旧 管理员是否能挑出最相关的 这才是一整套图书馆工程。
这套架构最难的地方在于 它的复杂度在原型阶段几乎完全暴露不了。你自己测十几个 case 命中率挺高 觉得 RAG 真简单。但真正接了用户上规模 你才发现 99% 的复杂度都在 那 1% 的边缘 case chunking 切错的长文档 短 query 命中不到的专有名词 模型升级后悄悄下降的 recall reranker 算力不足造成的延迟回归。建议任何想做 RAG 的团队 上线前一定要建一个至少 100 条的 ground truth 评估集 每次系统改动前后跑一次 看指标走向 千万别等用户来告诉你检索质量下降了 那时候用户已经流失一大半了。
—— 别看了 · 2026