2024 年我们给一家律所做"案例检索" AI 系统 律师输入"我办了个二房东转租纠纷有什么类似案例"AI 返回 top 10 历史判例。第一版我们用 OpenAI text-embedding-ada-002 + Pinecone 5 万条判例入库 跑出来召回率惨不忍睹 律师反馈"返回的案例跟我问的完全不沾边"我们以为是 prompt 问题改了一周没用 最后才意识到是 embedding 模型不行 ada-002 中文表现差 法律领域更差。换成 bge-large-zh + Milvus 召回率飙到 90% 但生产又出问题 第一种最让我傻眼是同样一份数据 不同 embedding 模型生成的向量维度不同 1536 vs 1024 切换模型必须全部重新 embed 5 万条判例重跑了 4 小时 还烧了 200 美元;第二种最难缠是向量数据库选型 Pinecone 简单但贵 月 700 美元 Milvus 自部署便宜但运维复杂 Qdrant 介于中间 我们换了 3 次最后定 Milvus 但前面浪费了 1 个月;第三种最离谱是 HNSW 索引参数我们用默认 M=16 ef=10 召回率 70% 调到 M=32 ef=200 召回率 92% 但查询延迟从 30ms 涨到 200ms 没人告诉我们这些参数的 trade-off;第四种最致命是我们没做 hybrid search 纯向量搜索对精确关键词无能 律师搜"民法典第 703 条"返回一堆相关但不精确的案例 加了 BM25 keyword search 融合后才解决;第五种最莫名其妙是向量数据库元数据过滤性能跌崩 我们做 filter 比如 jurisdiction='上海' 直接让 HNSW 失效 全表暴力扫描 5 万条 后来才知道 Milvus partition + 字段索引才是正确做法;第六种最坑是重排 (rerank) 阶段 我们直接拿向量召回 top 100 喂 LLM 让 LLM 排 慢且贵 改用 cohere-rerank 或 bge-reranker 小模型一秒搞定。真正能投产的 RAG 检索是一个 embedding 模型选型 + chunk 切分策略 + 向量数据库工程化 + 索引参数调优 + hybrid search 混合检索 + rerank 重排 + 元数据过滤 + 持续评估的完整方法论,任何一环失守都可能让你的 RAG 从"看起来智能"变成"返回的东西全是似是而非"。本文从头梳理 embedding 与向量检索的工程要点,模型怎么选 chunk 怎么切 库怎么挑 索引怎么调 hybrid 怎么搞 rerank 怎么用 metadata 怎么过 评估怎么建,以及一些把 RAG 做扎实要避开的工程坑。
问题背景:为什么"embedding + pinecone"远远不够
很多团队跟着教程跑通 RAG demo 觉得 embedding + 向量库就行 但生产化 RAG 远比想象的复杂:
- Embedding 模型选型:中英文差异巨大 通用 vs 领域差距 5x 召回率。
- Chunk 切分策略:固定长度 / 按段落 / 按语义 / 父子文档 召回质量差 2-3 倍。
- 向量库选型:Pinecone / Milvus / Qdrant / Weaviate / PgVector 各有优劣 错位成本巨大。
- 索引参数:HNSW M/ef IVF nlist/nprobe 这些参数决定召回率与延迟的 trade-off。
- Hybrid Search:纯向量 + BM25 关键词融合是高准确率必备。
- Rerank:粗排 top 100 + 精排 top 10 才能兼顾召回与准确。
一 Embedding 模型选型:别让 OpenAI ada-002 害了你
很多教程默认用 OpenAI ada-002 但 ada-002 中文表现差 法律医疗等领域更差。生产必须做模型 benchmark 选最适合自家数据的。
# 1 主流 embedding 模型(2024)
"""
模型 维度 中文 英文 成本 备注
text-embedding-3-small 1536 中 高 $0.02/1M OpenAI 新版
text-embedding-3-large 3072 高 高 $0.13/1M OpenAI 旗舰
text-embedding-ada-002 1536 低 中 $0.10/1M 老模型已 deprecated
bge-large-zh-v1.5 1024 很高 中 免费 中文 SOTA 自部署
bge-m3 1024 高 高 免费 多语言多任务
gte-large-zh 1024 高 中 免费 阿里 SOTA
m3e-large 1024 高 中 免费 moka SOTA
voyage-large-2 1536 中 很高 $0.12/1M 专业 retrieval
cohere-embed-v3 1024 高 高 $0.10/1M 企业首选
"""
# 2 自家数据 benchmark
from sentence_transformers import SentenceTransformer
import numpy as np
def evaluate_embedding(model_name, queries, docs, ground_truth, top_k=10):
"""评估模型在自家数据上的召回率"""
if model_name.startswith("text-embedding"):
from openai import OpenAI
client = OpenAI()
def encode(texts):
resp = client.embeddings.create(input=texts, model=model_name)
return np.array([d.embedding for d in resp.data])
else:
model = SentenceTransformer(model_name)
def encode(texts):
return model.encode(texts, normalize_embeddings=True)
doc_vecs = encode(docs)
query_vecs = encode(queries)
# 计算每个 query 的 top-k 文档
hits = 0
for i, qv in enumerate(query_vecs):
sims = doc_vecs @ qv
top_idx = np.argsort(-sims)[:top_k]
if any(d_idx in ground_truth[i] for d_idx in top_idx):
hits += 1
return hits / len(queries)
# 3 真实评估示例(法律案例)
queries = [
"二房东转租纠纷应该怎么处理",
"民法典第 703 条规定的租赁合同",
"工伤认定的法律依据",
# ... 200 个真实律师 query
]
docs = load_legal_cases(50000)
ground_truth = load_human_annotated_relevance() # 人工标注的相关案例
results = {}
for model in ["text-embedding-3-large", "bge-large-zh-v1.5", "bge-m3", "gte-large-zh"]:
results[model] = evaluate_embedding(model, queries, docs, ground_truth)
print(f"{model}: recall@10 = {results[model]:.2%}")
# 我们实测结果
# text-embedding-ada-002: recall@10 = 62%
# text-embedding-3-large: recall@10 = 78%
# bge-large-zh-v1.5: recall@10 = 91%
# bge-m3: recall@10 = 89%
# gte-large-zh: recall@10 = 88%
# 4 选型决策矩阵
"""
场景 推荐
纯英文 + 预算充足 text-embedding-3-large 或 voyage-large-2
纯中文 + 自部署 bge-large-zh-v1.5 (SOTA)
多语言混合 bge-m3
预算紧张 + 通用场景 bge-large-zh-v1.5 自部署
法律/医疗/金融领域 自部署 base 模型 + 领域微调
"""
通用模型选好之后,如果业务是法律/医疗/金融等高度专业的垂直领域,通用 embedding 哪怕是 SOTA 也只能达到 90% 召回率上限,要再上一个台阶必须做领域微调。下面是用对比学习把 bge 微调成"法律专用"的标准流程,我们靠这一步把 recall@10 从 91% 拉到 96%。
# 5 领域微调 embedding(高级)
# 用对比学习 (contrastive learning) 微调 bge 让它适应法律领域
from sentence_transformers import losses, InputExample
from torch.utils.data import DataLoader
# 准备 triplet (query, positive_doc, negative_doc)
train_examples = [
InputExample(texts=[query, pos_doc, neg_doc])
for query, pos_doc, neg_doc in load_legal_triplets()
]
train_loader = DataLoader(train_examples, batch_size=16, shuffle=True)
train_loss = losses.TripletLoss(model=base_model)
base_model.fit(
train_objectives=[(train_loader, train_loss)],
epochs=3,
warmup_steps=100,
output_path="./bge-legal-finetuned"
)
# 微调后 recall@10 从 91% 提升到 96%
# 6 评估微调后是否真的更好(避免 overfitting)
def compare_models(test_set, models):
for name, m in models.items():
recall = evaluate_embedding(m, test_set["q"], test_set["d"], test_set["gt"])
print(f"{name}: recall@10 = {recall:.2%}")
compare_models(
held_out_test_set, # 必须用没见过的 test set 评估
{
"bge-base": "BAAI/bge-large-zh-v1.5",
"bge-legal-finetuned": "./bge-legal-finetuned"
}
)
# 如果微调后在 held-out test set 上没提升 说明过拟合训练数据
实战经验:必须用自家数据做 benchmark 不能信厂商的通用 benchmark;中文 bge-large-zh-v1.5 是 SOTA 且免费 没理由用 ada-002;OpenAI text-embedding-3-large 通用最强但贵 大规模数据成本爆炸;高敏感领域(法律/医疗)花一周做领域微调 提升 5-10 个百分点很值;模型一旦定下不能轻易换 切换意味着所有数据重新 embed。
二 Chunk 切分策略:决定召回上限
chunk 切得不对 模型再好也救不回。固定长度 / 按段落 / 按语义 / 父子文档不同策略召回率差 2-3 倍。
# 1 固定长度切分(最简单 但最差)
def fixed_chunk(text, size=500, overlap=50):
chunks = []
for i in range(0, len(text), size - overlap):
chunks.append(text[i:i + size])
return chunks
# 问题 切断句子 切断语义 召回质量差
# 2 递归切分(LangChain RecursiveCharacterTextSplitter)
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""],
length_function=len,
)
chunks = splitter.split_text(text)
# 优先按段落 -> 句子 -> 词 切分 不切断语义单元
# 3 按语义切分(semantic chunking)
from sentence_transformers import SentenceTransformer
import numpy as np
def semantic_chunk(text, model, threshold=0.7):
"""相邻句子相似度低时切分 形成语义聚类"""
sentences = text.split("。")
if len(sentences) < 2:
return [text]
embeddings = model.encode(sentences, normalize_embeddings=True)
sims = [embeddings[i] @ embeddings[i+1] for i in range(len(embeddings)-1)]
chunks = []
current = [sentences[0]]
for i, sim in enumerate(sims):
if sim < threshold:
# 语义跳变 切一刀
chunks.append("。".join(current))
current = [sentences[i+1]]
else:
current.append(sentences[i+1])
if current:
chunks.append("。".join(current))
return chunks
# 4 父子文档(parent-child)
# 子文档(small chunk)用于精确召回 父文档(large chunk)用于上下文给 LLM
class ParentChildChunker:
def __init__(self, parent_size=2000, child_size=300):
self.parent_size = parent_size
self.child_size = child_size
def chunk(self, text):
parent_chunks = self.split(text, self.parent_size)
result = []
for p_idx, parent in enumerate(parent_chunks):
child_chunks = self.split(parent, self.child_size)
for child in child_chunks:
result.append({
"child_text": child,
"parent_text": parent,
"parent_id": p_idx
})
return result
# 入库
for item in chunker.chunk(document):
# 向量库存 child embedding 但 metadata 记 parent_id
vector_db.insert(
embedding=embed(item["child_text"]),
metadata={"parent_id": item["parent_id"], "parent_text": item["parent_text"]}
)
# 检索时
def retrieve_with_parent(query, top_k=5):
results = vector_db.search(embed(query), top_k=top_k)
# 用 parent_text 给 LLM 提供更完整上下文
return [r["metadata"]["parent_text"] for r in results]
# 5 结构化文档:按 HTML/Markdown 标题切
from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "Header1"),
("##", "Header2"),
("###", "Header3"),
]
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
docs = splitter.split_text(markdown_text)
# 每个 chunk 自带标题路径作为 metadata
# 检索时可以按标题路径过滤
# 6 chunk 大小经验值
"""
场景 推荐 chunk_size
QA 问答 300-500 字符
代码检索 一个函数为一 chunk
长文档 RAG (论文/合同) 500-1000 字符 + parent 2000-3000
对话历史 按 turn 切
"""
实战经验:固定切分必死 必须用 RecursiveCharacterTextSplitter 起步;语义切分对长文档效果显著 但慢 适合离线预处理;父子文档是 RAG 最佳实践 子精召回 + 父丰富上下文给 LLM;结构化文档按 Markdown/HTML 标题切 保留层级信息;chunk_size 太大召回不精 太小上下文不够 500 是大多数场景的甜点。我们 chunk 策略从固定 1000 改成父子(child 300, parent 1500) 召回率从 75% 涨到 92%。
三 向量数据库选型与部署
Pinecone / Milvus / Qdrant / Weaviate / PgVector 各有优劣 选错代价巨大。下面是生产视角的对比与部署细节。
# 1 主流向量库对比
"""
库 托管/自部署 性能 生态 学习曲线 成本(100万向量/月)
Pinecone 托管 高 中 低 $70+
Milvus 自部署 极高 极好 高 $200(自购服务器)
Qdrant 两种 高 好 中 $25(托管) / $50(自部署)
Weaviate 两种 中 好 中 $30(托管) / $50(自部署)
Chroma 自部署 中 简单 低 免费(小规模)
PgVector 自部署 中 PG生态 极低 免费(已有 PG)
Elasticsearch+kNN 两种 中 强大 中 视部署而定
"""
# 2 Milvus 生产部署(Docker Compose)
"""
version: '3.5'
services:
etcd:
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
volumes:
- ./volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- ./volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
standalone:
image: milvusdb/milvus:v2.3.10
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ./volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- etcd
- minio
"""
# 3 Milvus 客户端使用
from pymilvus import MilvusClient, DataType
client = MilvusClient(uri="http://localhost:19530")
# 创建集合
schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=1024)
schema.add_field("text", DataType.VARCHAR, max_length=10000)
schema.add_field("jurisdiction", DataType.VARCHAR, max_length=100)
schema.add_field("case_year", DataType.INT64)
schema.add_field("case_type", DataType.VARCHAR, max_length=100)
# 索引:HNSW 性能最好
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={"M": 32, "efConstruction": 200} # 关键参数
)
client.create_collection(
collection_name="legal_cases",
schema=schema,
index_params=index_params
)
# 4 元数据字段必须建标量索引
client.create_index(
collection_name="legal_cases",
field_name="jurisdiction",
index_type="Trie"
)
client.create_index(
collection_name="legal_cases",
field_name="case_year",
index_type="STL_SORT"
)
# 5 批量插入(关键 别一条一条插)
def batch_insert(client, collection, items, batch_size=1000):
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
client.insert(collection_name=collection, data=batch)
# 6 检索带过滤
results = client.search(
collection_name="legal_cases",
data=[query_embedding],
limit=10,
search_params={"params": {"ef": 200}}, # ef 越大召回越高 越慢
filter='jurisdiction == "上海" and case_year >= 2020',
output_fields=["text", "jurisdiction", "case_year"]
)
# 7 partition 提速(按高频过滤维度分区)
client.create_partition(collection_name="legal_cases", partition_name="shanghai")
client.create_partition(collection_name="legal_cases", partition_name="beijing")
# 检索时指定 partition 只搜该分区
results = client.search(
collection_name="legal_cases",
data=[query_embedding],
partition_names=["shanghai"],
limit=10
)
实战经验:小规模(< 100 万向量)优先 PgVector 已有 PG 零成本;中规模选 Qdrant 性价比最高;百万+ 选 Milvus 极致性能;托管图省心选 Pinecone 但贵且锁定;元数据过滤字段必须建标量索引 否则 HNSW 失效全表扫;partition 按高频过滤维度切分 提速 10 倍;批量插入 1000 条/批 否则 IO 瓶颈。
四 HNSW/IVF 索引参数与 Hybrid Search
HNSW 是当前向量检索 SOTA 但参数 M / ef / efConstruction 直接决定召回与延迟。Hybrid Search 融合向量 + BM25 是高准确率必备。
# 1 HNSW 参数 trade-off
"""
M 图的边数 越大召回越高 内存越多 8-64
推荐 16(默认) 高精度 32-48
efConstruction 建图时搜索深度 越大质量越高 建索引越慢
推荐 200(默认 100 太低)
ef (search) 查询时搜索深度 越大召回越高 查询越慢
推荐 100-500 按业务延迟需求调
"""
# 2 实测对比
"""
M=16, ef=10 建索引快 召回率 70% P99 延迟 30ms
M=16, ef=200 召回率 88% P99 延迟 80ms
M=32, ef=200 召回率 92% P99 延迟 150ms
M=48, ef=500 召回率 95% P99 延迟 300ms
我们生产值 M=32 efConstruction=200 ef=200
召回率 92% P99 100ms 平衡点
"""
# 3 IVF 适用场景(超大规模 10 亿+)
# IVF_FLAT / IVF_SQ8 / IVF_PQ
index_params.add_index(
field_name="embedding",
index_type="IVF_PQ",
metric_type="L2",
params={
"nlist": 1024, # 聚类中心数 sqrt(N) 经验
"m": 16, # PQ 子向量数
"nbits": 8
}
)
# 检索时
search_params = {"params": {"nprobe": 32}} # 搜索几个聚类 越大召回越高
# IVF_PQ 压缩 8x 但召回率掉 5-10%
# 4 Hybrid Search:向量 + BM25 融合
# Milvus 2.4+ 原生支持 sparse vector
from pymilvus import MilvusClient
from BM25 import BM25Encoder # pip install bm25s
# 加 sparse 字段
schema.add_field("sparse_embedding", DataType.SPARSE_FLOAT_VECTOR)
# BM25 索引
bm25 = BM25Encoder()
bm25.fit(corpus)
# 同时存 dense 与 sparse
for doc in documents:
item = {
"embedding": dense_embed(doc), # bge dense
"sparse_embedding": bm25.encode(doc), # BM25 sparse
"text": doc
}
client.insert(collection_name="cases", data=[item])
# Hybrid 检索:RRF 融合
from pymilvus import RRFRanker, AnnSearchRequest
dense_req = AnnSearchRequest(
data=[dense_embed(query)],
anns_field="embedding",
param={"params": {"ef": 200}},
limit=20
)
sparse_req = AnnSearchRequest(
data=[bm25.encode(query)],
anns_field="sparse_embedding",
param={},
limit=20
)
results = client.hybrid_search(
collection_name="cases",
reqs=[dense_req, sparse_req],
ranker=RRFRanker(), # Reciprocal Rank Fusion 融合
limit=10
)
# 5 关键词优先场景(用户搜 "民法典第 703 条")
def hybrid_with_keyword_boost(query):
# 检测精确关键词模式
import re
if re.search(r"民法典第\s*\d+\s*条", query):
# 关键词检索优先
return bm25_search(query, top_k=10)
else:
# 语义检索为主
return hybrid_search(query, top_k=10)
# 6 BGE-M3 一体化方案
# bge-m3 同时输出 dense + sparse + ColBERT 向量
from FlagEmbedding import BGEM3FlagModel
model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=True)
# 同时获取三种向量
output = model.encode(
texts,
return_dense=True,
return_sparse=True,
return_colbert_vecs=True
)
# output["dense_vecs"] 稠密向量
# output["lexical_weights"] 稀疏(类 BM25)
# output["colbert_vecs"] token-level 精排
实战经验:HNSW 是 99% 场景的最佳选择 不要默认 M=16 太低 至少 32;Hybrid Search 必上 纯向量对精确关键词无能 + BM25 解决精确匹配;BGE-M3 是 2024 黑马 一个模型搞定 dense + sparse + colbert 工程简化;IVF_PQ 只有 10 亿级才考虑 压缩带来召回损失;ef 参数线上动态调 按延迟预算决定 不要固化。我们加 hybrid search 后 律师搜 "民法典第 703 条"召回率从 50% 涨到 95%。
[mermaid]
flowchart TD
A[用户 query] --> B{是否含精确关键词}
B -->|是| C[BM25 优先]
B -->|否| D[Embedding 编码]
D --> E[向量库 Top 100 粗排]
E --> F[BM25 Top 100]
C --> F
F --> G[RRF 融合]
G --> H[Rerank 模型 Top 10]
H --> I[Metadata 过滤]
I --> J{结果数 >= 5}
J -->|否| K[扩大召回 Top 200 重排]
K --> H
J -->|是| L[父文档 expand]
L --> M[传给 LLM]
M --> N[生成答案 + 引用]
五 Rerank 重排:粗排+精排两阶段
粗排召回 top 100 必须用 rerank 模型精排到 top 10 否则 LLM 拿到的上下文质量太差。直接让 LLM 排序又慢又贵。
# 1 Cohere Rerank(托管 简单)
import cohere
co = cohere.Client("api-key")
def cohere_rerank(query, docs, top_k=10):
results = co.rerank(
query=query,
documents=docs,
top_n=top_k,
model="rerank-multilingual-v3.0"
)
return [(r.index, r.relevance_score) for r in results.results]
# 2 BGE Reranker(自部署 中文 SOTA)
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("BAAI/bge-reranker-large", max_length=512)
def bge_rerank(query, docs, top_k=10):
pairs = [[query, doc] for doc in docs]
scores = reranker.predict(pairs)
ranked = sorted(zip(range(len(docs)), scores), key=lambda x: -x[1])
return ranked[:top_k]
# 3 完整两阶段检索 pipeline
def two_stage_retrieve(query, top_k_recall=100, top_k_final=10):
# 阶段 1 粗排 向量库召回 top 100
query_emb = embed_model.encode([query])[0]
candidates = vector_db.search(
collection_name="cases",
data=[query_emb],
limit=top_k_recall,
search_params={"params": {"ef": 200}},
output_fields=["text", "metadata"]
)
# 阶段 2 精排 Rerank
docs = [c["text"] for c in candidates[0]]
reranked = bge_rerank(query, docs, top_k=top_k_final)
# 返回精排后的结果
return [
{
"text": docs[idx],
"score": float(score),
"metadata": candidates[0][idx]["metadata"]
}
for idx, score in reranked
]
# 4 Rerank 性能优化
class BatchedReranker:
"""大批量请求合并 reduce GPU 利用率"""
def __init__(self, model, batch_size=32):
self.model = model
self.batch_size = batch_size
self.queue = []
self.lock = threading.Lock()
async def rerank(self, query, docs):
future = asyncio.Future()
with self.lock:
self.queue.append((query, docs, future))
if len(self.queue) >= self.batch_size:
self.flush()
return await future
def flush(self):
batch = self.queue[:self.batch_size]
self.queue = self.queue[self.batch_size:]
all_pairs = []
boundaries = [0]
for q, ds, _ in batch:
all_pairs.extend([[q, d] for d in ds])
boundaries.append(len(all_pairs))
scores = self.model.predict(all_pairs)
for i, (_, _, future) in enumerate(batch):
start, end = boundaries[i], boundaries[i+1]
future.set_result(scores[start:end])
# 5 Rerank 模型对比
"""
模型 推理速度 准确率 部署
cohere-rerank-v3 远程 API 高 无运维(贵)
bge-reranker-large 本地 GPU 很高 自部署(免费)
bge-reranker-base 本地 GPU 高 自部署 更快
jina-reranker-v2-base-multilingual 本地 GPU 高 自部署
mxbai-rerank-large-v1 本地 GPU 高 自部署 多语言
ColBERT(BGE-M3) 本地 GPU 极高 自部署 复杂
推荐
中文场景 bge-reranker-large 自部署
多语言场景 mxbai-rerank-large-v1 / jina-reranker-v2
省事场景 cohere-rerank-v3
"""
# 6 监控 rerank 效果
class RerankMonitor:
def __init__(self):
self.recall_before = []
self.recall_after = []
def log(self, query, candidates, reranked, ground_truth):
# 粗排召回率
cand_ids = [c["id"] for c in candidates]
recall_b = len(set(cand_ids) & set(ground_truth)) / len(ground_truth)
# 精排后 top-k 命中率
rerank_ids = [r["id"] for r in reranked]
recall_a = len(set(rerank_ids) & set(ground_truth)) / len(ground_truth)
self.recall_before.append(recall_b)
self.recall_after.append(recall_a)
def report(self):
return {
"avg_recall_before": np.mean(self.recall_before),
"avg_recall_after": np.mean(self.recall_after),
"improvement": np.mean(self.recall_after) - np.mean(self.recall_before)
}
实战经验:rerank 必上 否则 LLM 拿到一堆相关但不精的文档 答案质量差;cohere-rerank 省事但贵 月 200 美元起 自部署 bge-reranker-large 性能相当且免费;粗排召回 100 + 精排 10 是黄金组合 召回 50 不够 召回 200 浪费;rerank GPU 推理 batched 处理 单 A10 能跑 100 QPS;监控粗排与精排召回率差 持续优化。我们加 rerank 后 LLM 答案准确率从 75% 涨到 92%。
六 持续评估与生产化运维
RAG 不评估等于盲调 离线 benchmark + 在线 A/B + 用户反馈三层评估闭环是迭代基础。生产化还要考虑增量更新 / 数据漂移 / 成本治理。
# 1 离线评估 召回率与端到端
def eval_retrieval(test_set, retriever):
metrics = {"recall@5": [], "recall@10": [], "mrr": [], "ndcg": []}
for case in test_set:
results = retriever.retrieve(case["query"], top_k=10)
result_ids = [r["id"] for r in results]
relevant_ids = case["relevant_ids"]
# Recall@k
metrics["recall@5"].append(
len(set(result_ids[:5]) & set(relevant_ids)) / len(relevant_ids)
)
metrics["recall@10"].append(
len(set(result_ids[:10]) & set(relevant_ids)) / len(relevant_ids)
)
# MRR
for i, rid in enumerate(result_ids):
if rid in relevant_ids:
metrics["mrr"].append(1 / (i + 1))
break
else:
metrics["mrr"].append(0)
# NDCG@10
from sklearn.metrics import ndcg_score
y_true = [[1 if r in relevant_ids else 0 for r in result_ids]]
y_score = [[10 - i for i in range(10)]]
metrics["ndcg"].append(ndcg_score(y_true, y_score))
return {k: np.mean(v) for k, v in metrics.items()}
# 2 端到端 RAG 评估(RAGAs 框架)
from ragas import evaluate
from ragas.metrics import (
faithfulness, # 答案是否忠于检索内容
answer_relevancy, # 答案是否回答问题
context_precision, # 检索内容精度
context_recall, # 检索内容召回
answer_similarity, # 答案语义相似度
)
results = evaluate(
dataset=test_dataset, # {question, answer, contexts, ground_truth}
metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
)
# 3 增量更新 新文档实时入库
def incremental_index(new_docs):
# 切 chunk
chunks = []
for doc in new_docs:
for chunk in chunker.chunk(doc["text"]):
chunks.append({
**chunk,
"doc_id": doc["id"],
"indexed_at": time.time()
})
# 批量 embed
texts = [c["text"] for c in chunks]
embeddings = embed_model.encode(texts, batch_size=64, show_progress_bar=True)
# 入库
items = [
{**chunk, "embedding": emb}
for chunk, emb in zip(chunks, embeddings)
]
vector_db.insert(collection_name="cases", data=items)
# 4 旧文档删除与更新
def update_document(doc_id, new_text):
# 先删旧 chunk
vector_db.delete(
collection_name="cases",
filter=f'doc_id == "{doc_id}"'
)
# 再插新 chunk
incremental_index([{"id": doc_id, "text": new_text}])
增量更新搞定了"新数据进来"的问题,但 RAG 系统真正难的不是"加数据",而是判断旧索引是不是还有效。query 分布在变,业务术语在变,法律条文在更新,半年前 92% 召回的系统现在可能悄悄掉到 80% 你都不知道。下面这套是我们在生产上跑的漂移监测 + 成本治理 + 缓存层组合,把"看似稳定其实在退化"的盲区彻底点亮。
# 5 数据漂移监测
class DriftMonitor:
def __init__(self, baseline_queries):
self.baseline_emb = embed_model.encode(baseline_queries)
def check(self, recent_queries):
recent_emb = embed_model.encode(recent_queries)
# 与 baseline 分布对比
from scipy.spatial.distance import jensenshannon
# 简化 用 cosine sim 平均
sims = [self.baseline_emb @ qe for qe in recent_emb]
avg_sim = np.mean([np.mean(s) for s in sims])
if avg_sim < 0.6:
alert("query 分布显著漂移 考虑重新评估或微调")
return avg_sim
# 6 成本治理
class CostTracker:
def __init__(self):
self.costs = defaultdict(float)
def track_embedding(self, n_tokens, model="bge-large-zh-v1.5"):
if model.startswith("text-embedding"):
cost_per_1m = 0.13 if "large" in model else 0.02
self.costs["embedding"] += n_tokens / 1e6 * cost_per_1m
# 自部署模型成本算 GPU hour
def track_rerank(self, n_pairs, model="cohere-rerank-v3"):
if model.startswith("cohere"):
self.costs["rerank"] += n_pairs / 1000 * 1.0 # $1/1k pairs
def report(self):
return dict(self.costs)
# 7 缓存层 减少重复 embedding
from cachetools import TTLCache
import hashlib
embed_cache = TTLCache(maxsize=10000, ttl=3600)
def cached_embed(text):
key = hashlib.md5(text.encode()).hexdigest()
if key in embed_cache:
return embed_cache[key]
emb = embed_model.encode([text])[0]
embed_cache[key] = emb
return emb
# 高频重复 query 缓存命中率 50%+ embed API 调用量减半
实战经验:离线评估必须 recall + MRR + NDCG 多指标 单指标有盲区;RAGAs 框架做端到端评估 答案质量 + 检索质量都管;增量更新必须支持 不能每次全量重 embed;数据漂移监测每周跑 query 分布偏移 6+ 个月就该重新评估模型;成本治理 embed 与 rerank 大头 缓存 + batch 能省 50%;父文档检索 + 缓存 + hybrid search 是生产 RAG 三件套。我们一套体系下来 5 万判例 RAG 月成本从 800 美元降到 120 美元 召回率 92% 律师满意度 95%。
关键概念速查
| 概念 | 关键参数/工具 | 推荐 | 备注 |
|---|---|---|---|
| Embedding 模型 | 自家数据 benchmark | 必做 | 中文 bge-large-zh SOTA |
| Chunk 切分 | 父子文档 child300 parent1500 | 推荐 | 固定切分必死 |
| 向量库选型 | 百万级 Milvus 小数据 PgVector | 按规模 | 避免锁定 |
| HNSW 参数 | M=32 efConstruction=200 ef=200 | 必调 | 默认太低 |
| Hybrid Search | Dense + BM25 + RRF | 必上 | 关键词 + 语义 |
| Rerank | bge-reranker-large 自部署 | 必上 | top100 → top10 |
| 元数据索引 | Trie / STL_SORT | 必建 | 否则 HNSW 失效 |
| Partition | 按高频过滤维度 | 推荐 | 提速 10x |
| RAGAs 评估 | faithfulness + relevancy | 必跑 | 端到端质量 |
| Embedding 缓存 | TTLCache 1h | 必做 | 省 50% API 调用 |
避坑清单
- 不要默认 OpenAI ada-002 中文场景必换 bge-large-zh-v1.5。
- 不要固定长度切 chunk 必须 RecursiveCharacterTextSplitter 起步。
- 不要小数据用 Milvus 杀鸡用牛刀 PgVector 已经够用。
- 不要默认 HNSW M=16 生产至少 M=32 efConstruction=200。
- 不要纯向量检索 必须 hybrid (向量 + BM25) 否则关键词无能。
- 不要直接拿向量召回喂 LLM 必须 rerank 精排到 top 10。
- 不要不建标量索引就 metadata 过滤 直接全表扫崩。
- 不要每次 query 都 embed 必须缓存层省 50% 成本。
- 不要不做端到端 RAGAs 评估 单看召回率有盲区。
- 不要切换 embedding 模型不重新 embed 维度对不上直接报错。
总结
把 embedding 与向量检索这套从我们踩过的所有坑里反过来看 你会发现真正影响 RAG 准确率的不是 LLM 模型大小 而是检索系统的工程化深度。同样一个 GPT-4 + 5 万判例 用 ada-002 + Pinecone + 固定切分 + 纯向量 召回 70% 答案质量差 律师骂街;换 bge-large-zh + Milvus + 父子切分 + hybrid + rerank + 父文档展开 召回 92% 答案精准 律师满意度飙升。RAG 不是"喂数据给 LLM"的活儿 它是一个 embedding 选型 + chunk 切分 + 向量库工程化 + 索引调参 + 混合检索 + 重排 + 元数据过滤 + 持续评估的完整系统工程。
另一个常见的认知误区是把 RAG 当一次性工程 觉得入库一次就完事。但事实是 RAG 是一个持续演进的循环 数据会增 模型会优化 query 分布会漂移 不持续评估和调参 三个月前的"高准确率"系统现在已经悄悄退化。RAG 工程化的核心是 把检索当一个独立的子系统 建立数据-索引-检索-评估-反馈的完整闭环 用工程化手段保证质量不退步。
打个比方 RAG 检索系统像一个专业图书馆。Embedding 模型是图书管理员的专业素养(普通管理员只会按书名查 专业管理员能按主题/作者/年代灵活查)Chunk 切分是分类编目规则(按章节切还是按页切影响检索精度)向量库是书架与索引系统(开架还是闭架 索引粒度多细)HNSW 参数是查找算法的速度精度权衡(随便翻还是精确定位)Hybrid Search 是关键词检索 + 主题分类双管齐下(读者只记得书名片段也能查到)Rerank 是经验丰富的管理员二次推荐(粗筛 100 本精选 10 本)元数据过滤是楼层分馆筛选(只查法学院分馆)持续评估是读者满意度调查(知道哪些查询效果差)。哪一环没做 这个图书馆可能能借出书 但读者经常找不到想要的 要么相关性差 要么遗漏关键文献 要么响应慢。
所以下一次再有人跟你说"做 RAG 就 embedding + 向量库"你可以反问他 模型 benchmark 了吗 chunk 怎么切 hybrid 上了吗 rerank 上了吗 metadata 索引建了吗 partition 划了吗 RAGAs 跑了吗 缓存做了吗 这些工作没做完 RAG 只是一个能跑通 demo 的玩具 不是一个能在客户那里稳定服务的智能检索系统。从踩坑到投产 中间隔着一整套检索工程方法论 这条路没有捷径 但走完之后 你的 RAG 会从"返回的东西全是似是而非"变成"律师/医生/分析师真正离不开的工作助手" 从每月成本几千美元变成几百美元 从准确率 60% 变成 92% 投产即用。
—— 别看了 · 2026