多 Agent 协作与工具调用编排完全指南:从一次"7 个 tool 塞一个 prompt 用户问账单 Agent 去查工单"看懂为什么 LangChain ReAct 远远不够

2024 年我们给一家 SaaS 公司做 AI 客户成功助手业务场景是客户提一个问题比如我的账单为什么多了 300 美元系统需要调用账单 API 加用量 API 加工单系统加知识库综合给出解释第一版用 LangChain ReAct Agent 一个大 prompt 塞所有 tool description 跑通 demo 老板说 AI 真神奇上生产第二天就崩了第一种最让我傻眼是 ReAct loop 失控一个简单问题 Agent 来回调 7 次工具最后才回答 token 烧爆单次成本 0 4 美元用户体验 30 秒等待第二种最难缠是 tool 选择错乱 7 个 tool 塞在一个 prompt 里 GPT 4 经常选错用户问账单它去查工单用户问用量它去查权限第三种最离谱是 tool 调用参数 hallucinate Agent 给账单 API 传一个不存在的 user id API 返回 404 Agent 又重试 5 次最后说系统出错第四种最致命是错误恢复一个 tool 失败整个 Agent 链中断用户拿不到任何答案而不是 fallback 到部分结果加道歉第五种最莫名其妙是状态管理多轮对话第二轮 Agent 忘了第一轮的上下文用户骂我刚才说的你都不记得第六种最坑是并发与速率限制 200 个用户同时用 OpenAI rate limit 撞墙客户全在等

2024 年我们给一家 SaaS 公司做"AI 客户成功助手" 业务场景是客户提一个问题(比如"我的账单为什么多了 300 美元")系统需要调用账单 API + 用量 API + 工单系统 + 知识库 综合给出解释。第一版用 LangChain ReAct Agent 一个大 prompt 塞所有 tool description 跑通 demo 老板说"AI 真神奇" 上生产第二天就崩了。第一种最让我傻眼是 ReAct loop 失控 一个简单问题 Agent 来回调 7 次工具最后才回答 token 烧爆 单次成本 0.4 美元 用户体验 30 秒等待;第二种最难缠是 tool 选择错乱 7 个 tool 塞在一个 prompt 里 GPT-4 经常选错 用户问"账单"它去查"工单"用户问"用量"它去查"权限";第三种最离谱是 tool 调用参数 hallucinate Agent 给账单 API 传一个不存在的 user_id="xxx-yyy-zzz" API 返回 404 Agent 又重试 5 次最后说"系统出错";第四种最致命是错误恢复 一个 tool 失败整个 Agent 链中断 用户拿不到任何答案 而不是 fallback 到部分结果 + 道歉;第五种最莫名其妙是状态管理 多轮对话第二轮 Agent 忘了第一轮的上下文 用户骂"我刚才说的你都不记得";第六种最坑是并发与速率限制 200 个用户同时用 OpenAI rate limit 撞墙 客户全在等。真正能投产的多 Agent 系统是子 Agent 拆分 + 状态机编排 + tool schema 严格校验 + 错误降级 + 短期/长期记忆分层 + token 与速率治理 + 端到端 trace 的完整工程体系,任何一环失守都可能让你的 AI 助手从"看起来智能"变成"用户骂 AI 还不如人工"。本文从踩坑视角梳理多 Agent 协作的工程要点,Agent 怎么拆 tool 怎么管 状态怎么存 错误怎么降 token 怎么省 监控怎么建,以及一些把 Agent 系统做扎实要避开的工程坑。

问题背景:为什么单 Agent + ReAct 远远不够

很多团队跟着 LangChain 教程做 Agent demo 一个 LLM + 一堆 tool 跑通就上线 但生产化 Agent 远比想象的复杂:

  • Agent 拆分:单 Agent 大 prompt 一定 tool 选错 必须按职责拆 supervisor + specialist。
  • Tool schema:JSON schema 严格校验 + 参数 enum 限制 否则 hallucinate 参数。
  • 状态管理:短期 memory + 长期 vector store 分层 否则用户上下文丢失。
  • 错误降级:tool 失败要 fallback 不要整链断;LLM 失败要重试 + 模型切换。
  • Token 治理:context 压缩 + tool 结果裁剪 + 模型分级 否则成本爆炸。
  • 并发控制:LLM API 速率限制 + 用户 quota + 队列削峰 否则线上撞墙。

一 Agent 拆分:Supervisor + Specialist 模式

单 Agent + 多 tool 在 tool 数超过 5 个后准确率断崖式下降 必须拆成 supervisor 路由 + specialist 专精的多 Agent 架构。

# 1 LangGraph 多 Agent 架构(2024 主流)
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List
import operator

class AgentState(TypedDict):
    messages: Annotated[List, operator.add]
    next_agent: str
    user_id: str
    customer_context: dict
    final_answer: str

# Supervisor 只负责路由 不做业务
def supervisor_node(state: AgentState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    prompt = f"""你是客户成功助手的总调度。根据用户问题决定调用哪个专家。

用户问题:{state['messages'][-1].content}

可用专家:
- billing_agent  账单相关 计费 发票 退款
- usage_agent  用量相关 API 调用量 配额 性能
- support_agent  工单相关 历史问题 已知故障
- knowledge_agent  产品文档 使用方法

只输出专家名字 不解释。"""
    decision = llm.invoke(prompt).content.strip()
    return {"next_agent": decision}

# Specialist Agent 专精一个领域
def billing_agent_node(state: AgentState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    tools = [get_invoice, get_payment_history, calculate_refund]
    agent = create_tool_calling_agent(llm, tools)
    result = agent.invoke({
        "input": state['messages'][-1].content,
        "user_id": state['user_id']
    })
    return {"messages": [AIMessage(content=result['output'])],
            "next_agent": "finalize"}

# 类似定义 usage_agent_node / support_agent_node / knowledge_agent_node ...

# 终结节点 整合输出
def finalize_node(state: AgentState):
    return {"final_answer": state['messages'][-1].content}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("billing_agent", billing_agent_node)
graph.add_node("usage_agent", usage_agent_node)
graph.add_node("support_agent", support_agent_node)
graph.add_node("knowledge_agent", knowledge_agent_node)
graph.add_node("finalize", finalize_node)

graph.set_entry_point("supervisor")

# 条件路由
graph.add_conditional_edges(
    "supervisor",
    lambda s: s["next_agent"],
    {
        "billing_agent": "billing_agent",
        "usage_agent": "usage_agent",
        "support_agent": "support_agent",
        "knowledge_agent": "knowledge_agent",
    }
)
graph.add_edge("billing_agent", "finalize")
graph.add_edge("usage_agent", "finalize")
graph.add_edge("support_agent", "finalize")
graph.add_edge("knowledge_agent", "finalize")
graph.add_edge("finalize", END)

app = graph.compile()

# 2 多 Agent 协作(需多个专家协同)
class MultiExpertState(TypedDict):
    messages: Annotated[List, operator.add]
    plan: List[str]
    completed: List[str]
    results: dict

def planner_node(state: MultiExpertState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    plan = llm.invoke(f"""把问题拆解成子任务序列 每个子任务对应一个专家:
问题:{state['messages'][-1].content}

输出 JSON 数组 例如 ["billing_agent:查询本月账单","usage_agent:对比上月用量"]""").content
    import json
    return {"plan": json.loads(plan)}

def execute_next(state: MultiExpertState):
    pending = [p for p in state['plan'] if p not in state['completed']]
    if not pending:
        return {"next_agent": "synthesizer"}
    next_task = pending[0]
    agent_name = next_task.split(":")[0]
    return {"next_agent": agent_name}

# Supervisor 路由 + planner 拆解 + 各 specialist 执行 + synthesizer 整合

# 3 子 Agent 之间的消息隔离
# 子 Agent 不要直接看到父 Agent 的全部历史
# 只传递必要的 context
def filtered_messages_for_agent(state, agent_name):
    # 只传相关的最近 3 轮 + supervisor 摘要
    summary = summarize_for(state['messages'], agent_name)
    recent = state['messages'][-3:]
    return [SystemMessage(content=summary)] + recent

实战经验:Agent 拆分是大于 5 个 tool 必做 否则 tool 选择准确率从 95% 掉到 60%;LangGraph 比 LangChain Agent 灵活得多 状态机控制流可视化可调试;supervisor 用 gpt-4o-mini 就够 不需要 gpt-4o 路由决策很简单;子 Agent 消息隔离 + 摘要传递 比塞全部历史省 70% token;planner-executor-synthesizer 三段式适合复杂任务 simple 任务 supervisor + specialist 两段就行。我们 Agent 拆分后 tool 选择准确率从 65% 涨到 92% token 消耗减半。

二 Tool Schema 与参数校验

LLM 调 tool 经常 hallucinate 参数 必须用严格的 JSON schema 校验 + enum 限制 + 失败重试 + 参数 typo 检测。

# 1 严格 schema 定义(Pydantic)
from pydantic import BaseModel, Field, validator
from typing import Literal, Optional
from datetime import date

class GetInvoiceInput(BaseModel):
    """获取用户某月发票"""
    user_id: str = Field(
        description="用户 ID 格式 user_xxx",
        pattern=r"^user_[a-z0-9]{8,}$"  # 严格 pattern
    )
    year: int = Field(ge=2020, le=2030, description="年份")
    month: int = Field(ge=1, le=12, description="月份 1-12")
    invoice_type: Literal["regular", "credit", "debit"] = Field(
        default="regular",
        description="发票类型"
    )

    @validator("user_id")
    def validate_user_id(cls, v):
        if not v.startswith("user_"):
            raise ValueError("user_id 必须以 user_ 开头")
        return v

# 2 注册为 LangChain Tool
from langchain_core.tools import tool

@tool(args_schema=GetInvoiceInput)
def get_invoice(user_id: str, year: int, month: int, invoice_type: str = "regular") -> dict:
    """获取用户某月发票详细信息 包含金额 项目 应付日期"""
    return billing_api.get_invoice(user_id, year, month, invoice_type)

# 3 OpenAI function calling 自动转 schema
tools_schema = [convert_to_openai_tool(get_invoice)]

llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools([get_invoice])

# 4 参数 hallucinate 检测与重试
class ToolCallValidator:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries

    async def invoke_with_retry(self, llm, messages):
        for attempt in range(self.max_retries):
            response = await llm.ainvoke(messages)
            if not response.tool_calls:
                return response

            valid_calls = []
            errors = []
            for call in response.tool_calls:
                try:
                    # Pydantic 自动校验
                    tool_schema = get_tool_schema(call['name'])
                    validated = tool_schema(**call['args'])
                    valid_calls.append((call['name'], validated.dict()))
                except ValidationError as e:
                    errors.append(f"tool={call['name']} error={e}")

            if not errors:
                return await execute_tools(valid_calls)

            # 把错误反馈给 LLM 让它修正
            messages.append(response)
            messages.append(SystemMessage(content=(
                f"tool 参数校验失败 错误如下\n{chr(10).join(errors)}\n"
                f"请检查参数格式 重新生成 tool call"
            )))
        raise RuntimeError("tool call validation failed after retries")

# 5 防御 SQL/路径注入
class SafeQueryInput(BaseModel):
    table: Literal["orders", "users", "products"]   # 严格枚举
    filters: dict = Field(description="过滤条件")

    @validator("filters")
    def validate_filters(cls, v):
        for key in v.keys():
            # 字段名只允许字母数字下划线
            if not key.replace("_", "").isalnum():
                raise ValueError(f"非法字段名 {key}")
            # 防 SQL 关键字
            if any(kw in str(v[key]).upper() for kw in ["DROP", "DELETE", "UPDATE", ";"]):
                raise ValueError(f"非法值 {v[key]}")
        return v

# 6 Tool 描述与示例(few-shot 提升准确率)
@tool(args_schema=GetInvoiceInput)
def get_invoice(...) -> dict:
    """获取用户某月发票详细信息。

    使用场景
    - 用户问"我的账单是多少"
    - 用户问"为什么这个月扣了 300 美元"

    不要用于
    - 历史付款记录(用 get_payment_history)
    - 退款(用 calculate_refund)

    示例
    用户问"6 月账单":get_invoice(user_id="user_abc123", year=2024, month=6)
    """
    ...

# 7 Tool 返回结果裁剪(节省 token)
def get_invoice_compact(user_id, year, month):
    result = billing_api.get_invoice(user_id, year, month)
    # 只返回 LLM 真正需要的字段 而不是 200 字段大对象
    return {
        "total": result["total_amount"],
        "currency": result["currency"],
        "due_date": result["due_date"].isoformat(),
        "line_items": [
            {"name": item["name"], "amount": item["amount"]}
            for item in result["items"][:10]   # 只取前 10 行
        ],
        "_note": "如需完整发票 请调 get_invoice_detail"
    }

实战经验:Pydantic schema + 严格 pattern 是 hallucinate 参数的克星;Literal/enum 限制让 LLM 没法瞎填;参数校验失败要把错误反馈给 LLM 让它重新生成 不要直接报错;tool description 必须写"使用场景"和"不要用于" 否则 LLM 选错 tool;tool 返回结果必须裁剪 200 字段大对象会把 context window 撑爆;防御注入 字段名白名单 + 关键字 blacklist 必须做。我们加 schema 校验后 tool 调用错误率从 18% 降到 0.5%。

三 状态与记忆管理

多轮对话必须有短期 memory(最近 N 轮)+ 长期 memory(用户偏好/历史)分层 否则用户上下文丢失。

# 1 短期 memory checkpoint(LangGraph 内置)
from langgraph.checkpoint.postgres import PostgresSaver

checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = graph.compile(checkpointer=checkpointer)

# 每次调用传 thread_id 自动恢复状态
config = {"configurable": {"thread_id": f"user_{user_id}_session_{session_id}"}}
result = app.invoke(
    {"messages": [HumanMessage("我上个月账单为什么这么贵")]},
    config=config
)

# 第二轮 自动恢复第一轮 state
result2 = app.invoke(
    {"messages": [HumanMessage("能退款吗")]},
    config=config
)

# 2 长期 memory(vector store)
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

long_term_memory = Chroma(
    collection_name="user_memory",
    embedding_function=OpenAIEmbeddings(model="text-embedding-3-small"),
    persist_directory="./memory_db"
)

def remember(user_id: str, fact: str):
    """LLM 主动判断"这个值得记" 调此函数"""
    long_term_memory.add_texts(
        texts=[fact],
        metadatas=[{"user_id": user_id, "timestamp": time.time()}]
    )

def recall(user_id: str, query: str, k=5) -> List[str]:
    """检索相关记忆"""
    results = long_term_memory.similarity_search(
        query,
        k=k,
        filter={"user_id": user_id}
    )
    return [r.page_content for r in results]

# 3 自动提炼对话事实(每轮结束跑)
def extract_facts(conversation: str) -> List[str]:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    prompt = f"""从对话中提炼值得长期记忆的用户事实(偏好/习惯/上下文)
不要记 ephemeral 信息(如本轮问题答案)

对话:
{conversation}

输出 JSON 数组 例如:
["用户使用企业版 Pro 套餐","用户公司在欧洲 需要欧元发票","用户对自动续费敏感"]"""
    return json.loads(llm.invoke(prompt).content)

# 在对话结束时
facts = extract_facts(serialize_messages(state['messages']))
for f in facts:
    remember(user_id, f)

# 4 context 窗口管理(滑动 + 摘要)
class ConversationManager:
    def __init__(self, max_recent=10, summary_threshold=20):
        self.max_recent = max_recent
        self.summary_threshold = summary_threshold

    def get_context(self, all_messages, user_id):
        if len(all_messages) <= self.max_recent:
            return all_messages

        # 超过阈值 旧消息摘要
        if len(all_messages) > self.summary_threshold:
            old = all_messages[:-self.max_recent]
            recent = all_messages[-self.max_recent:]
            summary = self.summarize(old)
            relevant_memories = recall(user_id, recent[-1].content)
            return [
                SystemMessage(content=f"对话历史摘要:{summary}"),
                SystemMessage(content=f"相关用户记忆:{'; '.join(relevant_memories)}"),
                *recent
            ]
        return all_messages

    def summarize(self, messages):
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        text = "\n".join([f"{m.type}: {m.content}" for m in messages])
        return llm.invoke(f"用 200 字摘要以下对话 保留关键事实:\n{text}").content

# 5 用户级偏好(结构化存储)
class UserPreference(BaseModel):
    user_id: str
    language: str = "zh-CN"
    timezone: str = "Asia/Shanghai"
    notification: dict = {}
    preferred_response_style: Literal["concise", "detailed"] = "concise"

# 系统 prompt 注入
def build_system_prompt(user_id):
    pref = load_preference(user_id)
    return f"""你是客户成功助手。

用户偏好
- 语言:{pref.language}
- 时区:{pref.timezone}
- 回答风格:{pref.preferred_response_style}"""

# 6 多用户 session 隔离
# 用 thread_id = f"{user_id}:{session_id}" 保证 checkpoint 不串
# 千万别只用 user_id 多 tab 多设备会串台
config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}

实战经验:checkpoint 是多轮对话的基础 用 Postgres saver 不要内存 saver 重启就丢;长期 memory 用 vector store + LLM 主动 remember 不要把全部对话塞进去;事实提炼用 gpt-4o-mini 跑 成本几乎为 0;context 滑动窗口 + 旧消息摘要 是 token 治理的关键;用户偏好结构化存 PG 表 不要全塞 vector store;thread_id 必须 user_id:session_id 否则多设备串台。我们加分层记忆后 用户满意度从 60% 涨到 88%。

四 错误降级与重试策略

tool 失败 / LLM 失败 / 超时都要有 fallback 不能整链断 用户体验是"系统出错了"还是"部分信息 + 道歉"差别巨大。

# 1 Tool 级别 retry 与 fallback
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class APIError(Exception): pass
class TransientError(APIError): pass

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(TransientError)
)
def call_billing_api(user_id, year, month):
    try:
        return billing_api.get_invoice(user_id, year, month)
    except requests.Timeout:
        raise TransientError("billing API timeout")
    except requests.HTTPError as e:
        if e.response.status_code >= 500:
            raise TransientError(f"billing 5xx {e}")
        raise APIError(f"billing 4xx {e}")

# Fallback 链:主 API -> 缓存 -> 部分数据 -> 空结果 + 致歉
def get_invoice_with_fallback(user_id, year, month):
    try:
        return call_billing_api(user_id, year, month)
    except TransientError:
        # 主 API 挂 用 5 分钟缓存
        cached = cache.get(f"invoice:{user_id}:{year}:{month}")
        if cached:
            return {**cached, "_fallback": "cached_5min_ago"}
        # 缓存也没有 返回最近一次发票总额(部分数据)
        last = get_last_known_invoice(user_id)
        if last:
            return {"total": last["total"], "_fallback": "last_known_invoice"}
        # 兜底 空数据 + 标记
        return {"total": None, "_fallback": "service_unavailable",
                "_user_message": "账单系统暂时无法访问 请稍后再试"}

# 2 LLM 级别 fallback(模型降级)
from langchain.chat_models import init_chat_model

models = [
    init_chat_model("gpt-4o", temperature=0),                  # 主
    init_chat_model("claude-3-5-sonnet-20241022", temperature=0),  # 备
    init_chat_model("gpt-4o-mini", temperature=0),             # 降级
]

async def llm_invoke_with_fallback(messages):
    for i, llm in enumerate(models):
        try:
            return await asyncio.wait_for(llm.ainvoke(messages), timeout=15)
        except (asyncio.TimeoutError, openai.RateLimitError, anthropic.RateLimitError) as e:
            log.warning(f"model {i} failed: {e} - trying next")
            continue
    raise RuntimeError("all models failed")

# 3 Agent 链级 fallback(部分回答总比没回答好)
async def safe_agent_execute(state):
    results = {}
    errors = {}
    for agent in state['planned_agents']:
        try:
            results[agent] = await execute_agent(agent, state)
        except Exception as e:
            errors[agent] = str(e)
            log.error(f"agent {agent} failed", exc_info=True)

    if not results:
        return {"final_answer": "抱歉 系统暂时无法处理您的请求 请稍后再试或联系人工客服。",
                "errors": errors}

    # 部分成功 整合可用结果 + 标注失败部分
    synthesized = synthesize(results)
    if errors:
        synthesized += f"\n\n注:部分信息暂时无法获取({list(errors.keys())}) 我们正在修复。"
    return {"final_answer": synthesized, "errors": errors}

# 4 LLM 输出降级(超时切短 prompt)
async def llm_invoke_with_budget(messages, max_seconds=10):
    try:
        return await asyncio.wait_for(llm.ainvoke(messages), timeout=max_seconds)
    except asyncio.TimeoutError:
        # 简化 prompt 重试 去掉 history 只留当前问题
        simplified = [m for m in messages if isinstance(m, (SystemMessage, HumanMessage))][-2:]
        return await llm.ainvoke(simplified)

单次调用的重试与降级只解决"这一次怎么办" 还有更隐蔽的问题:当上游(账单 API / billing 数据库)持续抛错时 如果每次都老老实实重试 3 次再降级 那么 1 秒内 1000 个用户的请求会瞬间砸上去 1 万次重试调用 反过来把已经不堪重负的上游彻底压垮 形成雪崩。生产系统必须引入 circuit breaker(熔断器) 在上游持续失败时主动短路 给上游一个喘息窗口 同时给用户更快的失败响应。

# 5 Circuit breaker(上游持续失败时短路)
class CircuitBreaker:
    def __init__(self, threshold=5, reset_seconds=60):
        self.failures = 0
        self.last_failure = 0
        self.threshold = threshold
        self.reset_seconds = reset_seconds
        self.state = "closed"   # closed / open / half-open

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure > self.reset_seconds:
                self.state = "half-open"
            else:
                raise RuntimeError("circuit open")
        try:
            result = fn(*args, **kwargs)
            self.failures = 0
            self.state = "closed"
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure = time.time()
            if self.failures >= self.threshold:
                self.state = "open"
                log.error(f"circuit breaker OPEN for {fn.__name__}")
            raise

billing_breaker = CircuitBreaker(threshold=5, reset_seconds=60)

# 用法
def get_invoice_protected(...):
    return billing_breaker.call(call_billing_api, ...)

# 6 用户体验降级文案
FALLBACK_MESSAGES = {
    "billing_down": "账单系统暂时繁忙 我已为您记录了请求 1 小时内回复",
    "all_models_down": "AI 服务暂不可用 已为您转接人工客服",
    "tool_timeout": "查询超时 我可以为您查询更简单的信息或转人工",
}

实战经验:tool retry 必须区分 transient(5xx/timeout)与 permanent(4xx)只重试 transient;LLM fallback 多模型必备 OpenAI rate limit 撞墙时切 Claude;agent 部分失败要 graceful degrade 不要整链断 用户体验差天;circuit breaker 防止雪崩 上游挂 60 秒不再打 给上游喘息;降级文案要人话不要"系统错误 errcode 500"。我们错误降级体系上线后 用户感知的"系统出错"率从 8% 降到 0.3%。

[mermaid]
flowchart TD
A[用户提问] --> B[Supervisor 路由]
B --> C{选择专家}
C -->|账单| D[Billing Agent]
C -->|用量| E[Usage Agent]
C -->|工单| F[Support Agent]
C -->|知识| G[Knowledge Agent]
D --> H{Tool 调用}
H -->|成功| I[结果裁剪]
H -->|超时| J[缓存 fallback]
H -->|失败| K[Circuit Breaker]
J --> I
K --> L[降级答案]
I --> M[Synthesizer 整合]
L --> M
M --> N[输出给用户]
N --> O[长期记忆提炼]
O --> P[Checkpoint 保存]

五 Token 与成本治理

Agent 系统跑起来烧 token 很猛 必须从 context 压缩 + tool 结果裁剪 + 模型分级 + 缓存四方面同时治理。

# 1 Tiktoken 精确计算 token
import tiktoken

def count_tokens(text: str, model="gpt-4o") -> int:
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

def estimate_messages_tokens(messages, model="gpt-4o") -> int:
    return sum(count_tokens(m.content, model) for m in messages) + len(messages) * 4

# 2 Context 压缩(超阈值自动摘要)
class ContextCompressor:
    def __init__(self, max_tokens=8000):
        self.max_tokens = max_tokens

    def compress(self, messages):
        current = estimate_messages_tokens(messages)
        if current <= self.max_tokens:
            return messages

        # 保留 system + 最近 5 条 中间压成摘要
        system = [m for m in messages if isinstance(m, SystemMessage)]
        recent = messages[-5:]
        middle = messages[len(system):-5]

        summary = summarize(middle)
        compressed = system + [SystemMessage(f"早期对话摘要:{summary}")] + recent
        log.info(f"context compressed: {current} -> {estimate_messages_tokens(compressed)}")
        return compressed

# 3 Tool 结果裁剪(只给 LLM 真正需要的)
def smart_truncate_tool_result(result, max_tokens=500):
    if isinstance(result, dict):
        # 大对象只保留关键字段
        important_keys = ["id", "name", "status", "amount", "total", "summary"]
        compact = {k: v for k, v in result.items() if k in important_keys}
        if count_tokens(json.dumps(compact)) > max_tokens:
            compact["summary"] = summarize_dict(result)
            return compact
        return compact
    if isinstance(result, list):
        if len(result) > 10:
            return result[:10] + [f"... ({len(result) - 10} more items truncated)"]
    return result

# 4 模型分级路由(简单任务用便宜模型)
MODEL_TIERS = {
    "simple": "gpt-4o-mini",      # 路由 / 摘要 / 简单分类 $0.15/1M
    "medium": "gpt-4o",            # 多 tool 推理 / 复杂回答 $2.5/1M
    "complex": "claude-3-5-sonnet-20241022",  # 长 context 深度推理 $3/1M
}

def classify_task_complexity(question, context_size):
    if context_size > 100_000:
        return "complex"
    word_count = len(question.split())
    if word_count < 20 and "?" in question:
        return "simple"
    return "medium"

# 5 LLM 响应缓存(高频 query 命中)
from functools import lru_cache
import hashlib

class LLMCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def get_key(self, messages, model):
        content = json.dumps([{"role": m.type, "content": m.content} for m in messages])
        return f"llm:{model}:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def invoke(self, llm, messages):
        # 只缓存 temperature=0 且不含时间敏感词
        if not is_cacheable(messages):
            return await llm.ainvoke(messages)
        key = self.get_key(messages, llm.model_name)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        result = await llm.ainvoke(messages)
        self.redis.setex(key, self.ttl, json.dumps({"content": result.content}))
        return result

def is_cacheable(messages):
    content = " ".join(m.content for m in messages).lower()
    # 含 "今天" "现在" "实时" 等不缓存
    return not any(kw in content for kw in ["今天", "现在", "实时", "刚才", "最新"])

# 6 成本监控与限额
class TokenBudget:
    def __init__(self, redis):
        self.redis = redis

    def check_and_consume(self, user_id, tokens, model):
        cost = self.calculate_cost(tokens, model)
        daily_key = f"cost:{user_id}:{date.today()}"
        current = float(self.redis.get(daily_key) or 0)

        if current + cost > self.get_user_limit(user_id):
            raise QuotaExceededError(f"daily limit reached: ${current:.2f}")
        self.redis.incrbyfloat(daily_key, cost)
        self.redis.expire(daily_key, 86400 * 2)

    def calculate_cost(self, tokens, model):
        rates = {"gpt-4o": 2.5, "gpt-4o-mini": 0.15, "claude-3-5-sonnet-20241022": 3.0}
        return tokens / 1_000_000 * rates[model]

# 7 端到端 token 统计(Langfuse 或自建)
class TokenTracker:
    def __init__(self):
        self.usage = []

    def log(self, agent, model, prompt_tokens, completion_tokens):
        self.usage.append({
            "agent": agent,
            "model": model,
            "prompt": prompt_tokens,
            "completion": completion_tokens,
            "cost": (prompt_tokens / 1e6 * input_rate(model) +
                     completion_tokens / 1e6 * output_rate(model)),
            "timestamp": time.time()
        })

    def report(self):
        return {
            "total_cost": sum(u['cost'] for u in self.usage),
            "by_agent": group_by_sum(self.usage, "agent", "cost"),
            "by_model": group_by_sum(self.usage, "model", "cost"),
        }

实战经验:tiktoken 精确算 token 是治理起点 不要拍脑袋估;context 压缩 + tool 结果裁剪是 token 优化的核心 平均能省 60%;模型分级路由 80% 任务 gpt-4o-mini 跑得起 只有 20% 需要 gpt-4o;LLM 响应缓存对客服场景命中率 30%+ 几乎免费;每用户每天限额必上 否则一个恶意/异常用户能烧掉一个月预算;Langfuse / Helicone 是 LLM 可观测性的好工具。我们 token 治理后 单用户月成本从 8 美元降到 1.5 美元。

六 并发、监控与 trace

多用户并发 + LLM API rate limit + 端到端 trace 是生产 Agent 稳定运行的最后一公里。

# 1 并发控制(用户级 + 全局级)
import asyncio
from collections import defaultdict

class RateLimiter:
    def __init__(self, global_qps=50, per_user_qps=2):
        self.global_sem = asyncio.Semaphore(global_qps)
        self.user_sems = defaultdict(lambda: asyncio.Semaphore(per_user_qps))

    async def acquire(self, user_id):
        await self.global_sem.acquire()
        await self.user_sems[user_id].acquire()

    def release(self, user_id):
        self.global_sem.release()
        self.user_sems[user_id].release()

# 用法
@asynccontextmanager
async def rate_limited(user_id):
    await limiter.acquire(user_id)
    try:
        yield
    finally:
        limiter.release(user_id)

async def handle_request(user_id, question):
    async with rate_limited(user_id):
        return await app.ainvoke({"messages": [HumanMessage(question)]}, ...)

# 2 队列削峰(突发流量)
from asyncio import Queue

class RequestQueue:
    def __init__(self, max_workers=20, max_queue=200):
        self.queue = Queue(maxsize=max_queue)
        self.workers = [asyncio.create_task(self.worker()) for _ in range(max_workers)]

    async def submit(self, task):
        try:
            self.queue.put_nowait(task)
        except asyncio.QueueFull:
            raise TooManyRequestsError("队列已满 请稍后再试")

    async def worker(self):
        while True:
            task = await self.queue.get()
            try:
                await task()
            except Exception:
                log.exception("task failed")
            finally:
                self.queue.task_done()

# 3 OpenTelemetry trace(端到端追踪)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="otel-collector:4317"))
)
tracer = trace.get_tracer("ai-assistant")

async def handle_with_trace(user_id, question):
    with tracer.start_as_current_span("user_request") as span:
        span.set_attribute("user_id", user_id)
        span.set_attribute("question", question)
        try:
            result = await app.ainvoke(...)
            span.set_attribute("answer_length", len(result['final_answer']))
            return result
        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            raise

# Agent 内部
async def billing_agent_node_traced(state):
    with tracer.start_as_current_span("billing_agent") as span:
        with tracer.start_as_current_span("llm_call") as llm_span:
            response = await llm.ainvoke(...)
            llm_span.set_attribute("prompt_tokens", response.usage.prompt_tokens)
            llm_span.set_attribute("completion_tokens", response.usage.completion_tokens)
        for call in response.tool_calls:
            with tracer.start_as_current_span(f"tool:{call['name']}"):
                result = await execute_tool(call)
        return {...}

# 4 Langfuse 集成(LLM 专用可观测性)
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

langfuse = Langfuse(public_key="pk-...", secret_key="sk-...")

@observe(as_type="generation")
async def llm_call_traced(messages, model):
    response = await llm.ainvoke(messages)
    langfuse_context.update_current_observation(
        input=messages,
        output=response.content,
        model=model,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        }
    )
    return response

OTEL 解决系统层 trace、Langfuse 解决 LLM prompt/completion 可观测 但这两层都偏"细节" 当你早上一打开仪表盘想知道"昨晚整体怎么样"时 需要的是聚合后的业务指标:billing agent 的请求量在哪个时段最高 P99 延迟是不是周末更慢 哪个 tool 错误率突然飙升 哪个用户在 24 小时内提交了 500 次请求触发限额。下面这套 Prometheus 业务指标 + 用户反馈闭环 + 离线 eval 是把 trace 数据变成"运营级洞察"的关键拼图。

# 5 业务指标(Prometheus)
from prometheus_client import Counter, Histogram

agent_requests = Counter("agent_requests_total", "Total agent requests", ["agent", "status"])
agent_latency = Histogram("agent_latency_seconds", "Agent latency", ["agent"])
tool_calls = Counter("tool_calls_total", "Tool calls", ["tool", "status"])

async def billing_agent_metrics(state):
    start = time.time()
    try:
        result = await billing_agent_logic(state)
        agent_requests.labels(agent="billing", status="success").inc()
        return result
    except Exception:
        agent_requests.labels(agent="billing", status="error").inc()
        raise
    finally:
        agent_latency.labels(agent="billing").observe(time.time() - start)

# 6 用户反馈闭环
@app.post("/feedback")
async def collect_feedback(req: FeedbackRequest):
    db.insert("feedback", {
        "trace_id": req.trace_id,
        "user_id": req.user_id,
        "rating": req.rating,
        "comment": req.comment,
        "timestamp": time.time()
    })
    # 差评自动生成测试 case
    if req.rating <= 2:
        await save_for_review(req)
    return {"ok": True}

# 7 离线评估(eval set 持续跑)
@dataclass
class EvalCase:
    question: str
    expected_agent: str
    expected_tools: List[str]
    expected_answer_contains: List[str]

async def run_evals(cases: List[EvalCase]):
    results = []
    for case in cases:
        actual = await app.ainvoke({"messages": [HumanMessage(case.question)]})
        score = {
            "agent_match": actual['next_agent'] == case.expected_agent,
            "tools_match": set(actual['tools_called']) >= set(case.expected_tools),
            "answer_contains": all(s in actual['final_answer'] for s in case.expected_answer_contains)
        }
        results.append({"case": case.question, "score": score})
    return results

实战经验:用户级 + 全局级双层 rate limit 必上 否则一个用户的 burst 影响所有人;队列削峰应对突发流量 配合 429 返回让前端排队;OpenTelemetry + Langfuse 双轨观测 OTEL 看系统层 Langfuse 看 LLM 层;Prometheus 业务指标必上 P99 延迟 + 错误率分 agent 看;用户反馈闭环 + 差评 case 进 eval set 是模型持续优化的关键;eval 必须每次发版前跑 防止 regression。我们 trace 体系建好后 平均故障定位时间从 30 分钟降到 3 分钟。

关键概念速查

问题 关键工具/参数 推荐 备注
Agent 拆分 LangGraph supervisor + specialist 大于 5 tool 必拆 单 Agent 准确率断崖
Tool schema Pydantic + Literal + pattern 必上 防参数 hallucinate
状态管理 PostgresSaver checkpoint 必上 多轮对话基础
长期记忆 vector store + LLM remember 推荐 用户偏好沉淀
Tool fallback cached -> partial -> apology 必上 不要整链断
LLM fallback 多模型降级链 必上 rate limit 撞墙时
Token 治理 压缩 + 裁剪 + 分级 + 缓存 必做 能省 60%+
模型分级 mini / 4o / sonnet 必做 80% 任务 mini 够
Rate limit 用户级 + 全局级双层 必上 防一人烧爆
Trace OTEL + Langfuse 必上 故障定位 10x 提速

避坑清单

  1. 不要单 Agent + 大 prompt 塞所有 tool 大于 5 个 tool 准确率断崖。
  2. 不要 tool 不写 Pydantic schema LLM 必 hallucinate 参数。
  3. 不要 tool description 只写一句话 必须含使用场景与禁用场景。
  4. 不要 tool 返回结果不裁剪 200 字段大对象撑爆 context。
  5. 不要 checkpoint 用内存 saver 重启就丢必须 PostgresSaver。
  6. 不要 thread_id 只用 user_id 多设备多 tab 必串台。
  7. 不要 tool 失败整链断必须 fallback 链(缓存 -> 部分 -> 致歉)。
  8. 不要不做模型分级 80% 任务 mini 足够别全用 gpt-4o 烧钱。
  9. 不要不做用户级 rate limit 一个恶意用户能烧爆整月预算。
  10. 不要不上 Langfuse / OTEL trace 出问题根本无法定位是哪个 Agent 哪个 tool 哪个 LLM call 错。

总结

把多 Agent 系统从我们踩过的所有坑里反过来看 你会发现真正能投产的 AI 助手不是"LangChain + GPT-4 + 一堆 tool"凑出来的 demo 而是一个 Agent 拆分 + tool schema 严格化 + 状态记忆分层 + 错误降级 + token 治理 + 并发控制 + 端到端 trace 的完整工程体系。同样一个业务场景 同一份 LLM 配置错了 准确率 60% 成本 8 美元/用户/月 用户体验"AI 不如人工";配置对了 准确率 92% 成本 1.5 美元/用户/月 用户每天主动用。Agent 工程化不是"prompt 工程"那么简单 它是把 LLM 这个不确定性极强的组件 用工程手段约束成确定性服务的全套方法论。

另一个常见的认知误区是把 Agent 当"魔法盒子" 觉得"用 GPT-4 就万事大吉" 实际上 GPT-4 给你的是基础能力 但能力强不等于业务可靠 业务可靠需要工程化把不确定性收敛在可接受范围内。真正生产级 Agent 系统 LLM 调用只是一小块 周围的 schema 校验 / 状态管理 / 错误降级 / 监控 trace 才是占 80% 代码量与 80% 价值的部分。

打个比方 多 Agent 系统像一个客服中心。Supervisor 是前台分诊员(根据问题转给账单组/用量组/技术组)Specialist Agent 是各部门客服(只精通自己领域)Tool 是部门可以查的内部系统(账单系统/CRM/工单系统)Schema 是工单填写规范(字段必填+格式校验 错填会被退回)Memory 是客户档案(基础信息 + 历史互动 跨部门共享)错误降级 是"系统繁忙时给个备选方案而不是说话算话"(部分信息+道歉好过完全沉默)Token 治理 是话务员通话时长控制(简单问题 30 秒解决 别拖 10 分钟)Rate limit 是排队叫号系统(防止一个人霸占所有客服)Trace 是通话录音(出问题能复盘到哪一环)。哪一环没做 这个客服中心可能能接电话 但要么答非所问 要么把客户转 5 次还没解决 要么繁忙时全部挂断。

所以下一次再有人跟你说"做个 AI 助手就 LangChain + GPT-4 就行" 你可以反问他 Agent 拆了吗 tool schema 严格吗 checkpoint 用 Postgres 了吗 错误降级链建了吗 模型分级路由了吗 用户级 rate limit 上了吗 Langfuse trace 接了吗 eval set 跑了吗。这些工作没做完 你的 Agent 只是一个能跑通 demo 的玩具 不是一个能在生产服务真实用户的 AI 助手。从踩坑到投产 中间隔着一整套 Agent 工程方法论 这条路没有捷径 但走完之后 你的 AI 助手会从"演示惊艳上线翻车"变成"老板拍桌叫好 用户每天主动用" 从单用户月成本 8 美元变成 1.5 美元 从准确率 60% 变成 92% 真正成为业务的核心生产力。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

Kafka 消费者组与 exactly-once 完全指南:从一次"Pod OOM 重启 rebalance 后 12 万条对账重复消费财务通宵对账"看懂为什么 enable.auto.commit=true 远远不够

2026-5-25 11:25:18

技术教程

Nginx 性能调优与超大并发完全指南:从一次"直播开播 5 分钟 worker_connections 1024 撞墙全站 502"看懂为什么 apt install nginx 远远不够

2026-5-25 11:38:23

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索