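AI Application Observability Engineering 2026: A Full-Stack Guide to LLM Call Tracing, Evaluation, and Cost Monitoring
Preface
Your AI application is live and users are on it, but do you actually know what it is doing?
The traditional observability stack (metrics, logs, traces) runs into new problems with LLM applications: a single LLM call can take 3 to 30 seconds and cost anywhere from $0.01 to $1, and "correctness" cannot be read off a status code in a log. Worse, LLM output is non-deterministic: the same input can produce different outputs, so the traditional "alert on any error" logic is no longer sufficient on its own.
This article walks through a complete observability stack for AI applications in 2026, from tracing to evaluation and from cost monitoring to quality alerting, with the goal of an LLM monitoring platform you can actually operate.
1. What Makes AI Observability Different
1.1 Fundamental Differences from Traditional Applications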
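| Dimension | Traditional applications | LLM applications |
|---|---|---|
| Latency | Milliseconds, P99 < 1 s | Seconds, P99 can exceed 30 s |
| Cost | Fixed infrastructure cost | Billed per token, highly variable |
| Error definition | HTTP status codes / exceptions | Semantic errors (hallucination, malformed output, faulty reasoning) |
| Quality evaluation | Exact success / failure | Fuzzy good / bad, needs human or LLM evaluation |
| Debugging | Deterministic, reproducible | Non-deterministic, the same input may give different output |
Traditional monitoring is binary: 200 OK means success and 500 means failure. LLM output is a probability distribution and its quality sits on a continuous spectrum, so the boundary of what counts as "observable" has to be redrawn.
1.2 The Four-Layer Pyramid of AI Observability Needs
Observability for an LLM application has to cover four layers: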
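┌─────────────────────────────────────────────────────────────┐
│ Business: task success rate, user satisfaction              │
├─────────────────────────────────────────────────────────────┤
│ Quality: correctness, hallucination rate, format compliance │
├─────────────────────────────────────────────────────────────┤
│ Cost: token usage, API spend, cache hit rate                │
├─────────────────────────────────────────────────────────────┤
│ Foundation: latency, error rate, concurrency, retry count   │
└─────────────────────────────────────────────────────────────┘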
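Each layer has its own data collection methods and alerting strategy. We take them one at a time.
2. Tracing: From a Single Call to the Full Request Path
2.1 OpenTelemetry for LLMs: The De Facto Industry Standard
OpenTelemetry has become the de facto standard for LLM tracing. Its GenAI semantic conventions define a standard gen_ai.* attribute namespace so that trace data from different LLM providers shares one format. The conventions are still evolving and some attribute names have changed between releases, so pin the version you target: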
import openai
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# ── Initialize the tracer provider ──
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-app")
client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment

# Reference GPT-4o prices in USD per 1M tokens; verify against current pricing
INPUT_PRICE_PER_M = 2.5
OUTPUT_PRICE_PER_M = 10.0

def traced_llm_call(prompt: str, model: str = "gpt-4o", temperature: float = 0.7,
                    max_tokens: int = 2048) -> str:
    with tracer.start_as_current_span("llm.chat") as span:
        # Attributes in the gen_ai.* namespace (GenAI semantic conventions)
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.operation.name", "chat")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.request.temperature", temperature)
        span.set_attribute("gen_ai.request.max_tokens", max_tokens)
        # Record the input, truncated to 1000 characters to keep the span small.
        # Truncation does not remove PII: redact before this point.
        span.set_attribute("gen_ai.prompt", prompt[:1000])
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens,  # pass the same limit that the span reports
        )
        # Record output and usage statistics
        usage = response.usage
        span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens)
        span.set_attribute("gen_ai.usage.total_tokens", usage.total_tokens)
        span.set_attribute("gen_ai.response.model", response.model)
        content = response.choices[0].message.content or ""
        span.set_attribute("gen_ai.completion", content[:1000])
        # Per-call cost, stored in a custom attribute (not part of the conventions)
        input_cost = usage.prompt_tokens * INPUT_PRICE_PER_M / 1_000_000
        output_cost = usage.completion_tokens * OUTPUT_PRICE_PER_M / 1_000_000
        span.set_attribute("gen_ai.cost.usd", round(input_cost + output_cost, 6))
        return content
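Key design principle: keep gen_ai.* attributes aligned with the OpenTelemetry GenAI semantic conventions for cross-platform compatibility, and treat anything the conventions do not define (such as gen_ai.cost.usd above) as a custom extension. Never write a full prompt that contains users' private data into span attributes; redact and truncate first.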
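The redact-then-truncate rule can be sketched as a small helper. This is a minimal illustration, not a complete PII solution: the function name `redact_for_span` and both regular expressions are assumptions made for this example, and a production system would need a vetted PII detector.

```python
import re

# Hypothetical patterns for illustration only; real PII detection needs far more coverage.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"\+?\d[\d\s-]{7,}\d")


def redact_for_span(text: str, max_chars: int = 1000) -> str:
    """Mask obvious PII first, then truncate so the span attribute stays small."""
    masked = EMAIL_RE.sub("[EMAIL]", text)
    masked = PHONE_RE.sub("[PHONE]", masked)
    # Truncate after masking, so a cut never leaves half of an address or number behind.
    return masked[:max_chars]


print(redact_for_span("Contact alice@example.com or +1 415-555-0100 about the refund."))
# prints: Contact [EMAIL] or [PHONE] about the refund.
```

The helper would be applied to both the prompt and the completion before calling `span.set_attribute`.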
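2.2 Multi-Level Span Architecture: Tracing a Full RAG Request
In production, an LLM call is usually one step in a RAG (retrieval-augmented generation) pipeline, and each sub-step should be traced with nested spans: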
import time
from opentelemetry import trace

class RAGTracer:
    def __init__(self, service_name: str, vector_store, llm, query_rewriter,
                 post_processor, reranker=None):
        # Collaborators are injected. Their interfaces (similarity_search, call,
        # rewrite, rerank, clean, is_valid_json) are assumed by this example.
        self.tracer = trace.get_tracer(service_name)
        self.vector_store = vector_store
        self.llm = llm
        self.query_rewriter = query_rewriter
        self.post_processor = post_processor
        self.reranker = reranker

    def trace_retrieval(self, query: str, top_k: int = 5):
        with self.tracer.start_as_current_span("rag.retrieval") as span:
            span.set_attribute("rag.query.length", len(query))
            span.set_attribute("rag.top_k", top_k)
            span.set_attribute("rag.retriever.type", "dense+sparse-hybrid")
            start = time.perf_counter()
            chunks = self.vector_store.similarity_search(query, k=top_k)
            retrieval_ms = (time.perf_counter() - start) * 1000
            span.set_attribute("rag.retrieval.duration_ms", round(retrieval_ms, 2))
            span.set_attribute("rag.chunks.retrieved", len(chunks))
            if chunks:  # an empty result would otherwise divide by zero
                span.set_attribute("rag.chunks.avg_length",
                                   sum(len(c.page_content) for c in chunks) // len(chunks))
                # Retrieval quality scores
                scores = [c.metadata.get("score", 0.0) for c in chunks]
                span.set_attribute("rag.relevance.avg_score", round(sum(scores) / len(scores), 3))
                span.set_attribute("rag.relevance.min_score", round(min(scores), 3))
            return chunks

    def trace_generation(self, prompt: str, model: str):
        with self.tracer.start_as_current_span("rag.generation") as span:
            span.set_attribute("gen_ai.system", "openai")
            span.set_attribute("gen_ai.request.model", model)
            span.set_attribute("gen_ai.prompt.length", len(prompt))
            start = time.perf_counter()
            answer = self.llm.call(prompt)
            generation_ms = (time.perf_counter() - start) * 1000
            span.set_attribute("gen_ai.usage.total_tokens", answer.usage.total_tokens)
            span.set_attribute("gen_ai.generation.duration_ms", round(generation_ms, 2))
            span.set_attribute("gen_ai.completion.length", len(answer.content))
            return answer

    def trace_full_rag(self, query: str):
        """
        Full RAG path: query rewriting → retrieval → reranking → generation → post-processing.
        Retrieval, generation, and post-processing run in child spans;
        the parent span aggregates pipeline-level attributes.
        """
        with self.tracer.start_as_current_span("rag.full_pipeline") as parent_span:
            parent_span.set_attribute("rag.query", query[:500])
            # Step 1: query rewriting (optional, improves recall)
            rewritten = self.query_rewriter.rewrite(query)
            parent_span.set_attribute("rag.query.rewritten", rewritten[:200])
            parent_span.set_attribute("rag.query.rewritten.enabled", rewritten != query)
            # Step 2: vector retrieval
            chunks = self.trace_retrieval(rewritten)
            # Step 3: reranking (optional, with a cross-encoder)
            if self.reranker:
                reranked = self.reranker.rerank(query, chunks)
                parent_span.set_attribute("rag.reranking.enabled", True)
                parent_span.set_attribute("rag.reranking.top", len(reranked))
            else:
                reranked = chunks
                parent_span.set_attribute("rag.reranking.enabled", False)
            # Step 4: build the prompt and generate
            context = "\n\n".join(c.page_content for c in reranked)
            prompt = f"Answer the question using the context below.\n\nContext: {context}\n\nQuestion: {query}"
            answer = self.trace_generation(prompt, model="gpt-4o")
            # Step 5: post-processing (format validation, sensitive-word filtering, etc.)
            with self.tracer.start_as_current_span("rag.postprocess") as post_span:
                cleaned = self.post_processor.clean(answer.content)
                post_span.set_attribute("rag.postprocess.format_valid",
                                        self.post_processor.is_valid_json(cleaned))
            parent_span.set_attribute("rag.chunks.used", len(reranked))
            parent_span.set_attribute("rag.answer.length", len(cleaned))
            return cleaned
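With nested spans like these, a tracing backend such as Jaeger or Tempo shows the waterfall for a complete RAG request, which makes it much easier to tell whether retrieval or generation is the weak step.
2.3 Multi-Agent Tracing: Propagating Context Across Agents
In a multi-agent system every agent makes its own LLM calls, but all of them must share one trace ID so the full collaboration can be reconstructed: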
import time
import uuid
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional

# ── ContextVars carry the trace context across coroutines ──
current_trace_id: ContextVar[str] = ContextVar("trace_id", default="")
current_span_id: ContextVar[str] = ContextVar("span_id", default="")

@dataclass
class AgentSpan:
    trace_id: str
    span_id: str
    parent_span_id: str
    agent_name: str
    agent_role: str
    input_summary: str
    output_summary: str
    model: str
    start_time: float
    end_time: Optional[float] = None
    input_tokens: int = 0
    output_tokens: int = 0
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time) * 1000

class AgentTracer:
    """
    Lightweight multi-agent tracer that:
    - generates one trace ID and shares it across all agents
    - records each agent's LLM call as its own span
    - exports OTLP-style JSON for an OTLP backend or a self-built platform
    """
    def __init__(self, trace_id: Optional[str] = None):
        # OTLP trace IDs are 32 hex characters, so use the UUID hex form (no hyphens)
        self.trace_id = trace_id or uuid.uuid4().hex
        self.spans: list[AgentSpan] = []
        current_trace_id.set(self.trace_id)

    def start_span(self, agent_name: str, agent_role: str, input_summary: str, model: str) -> AgentSpan:
        span = AgentSpan(
            trace_id=self.trace_id,
            span_id=uuid.uuid4().hex[:16],  # OTLP span IDs are 16 hex characters
            parent_span_id=current_span_id.get(),
            agent_name=agent_name,
            agent_role=agent_role,
            input_summary=input_summary[:200],
            output_summary="",
            model=model,
            start_time=time.time(),
        )
        self.spans.append(span)
        current_span_id.set(span.span_id)
        return span

    def end_span(self, span: AgentSpan, output_summary: str = "",
                 error: Optional[str] = None, **usage_kwargs):
        span.end_time = time.time()
        span.output_summary = output_summary[:200]
        span.error = error
        span.input_tokens = usage_kwargs.get("input_tokens", 0)
        span.output_tokens = usage_kwargs.get("output_tokens", 0)
        # Restore the parent span ID so nesting works ("" at the root)
        current_span_id.set(span.parent_span_id)

    @staticmethod
    def _attributes(s: AgentSpan) -> list[dict]:
        attrs = [
            {"key": "agent.name", "value": {"stringValue": s.agent_name}},
            {"key": "agent.role", "value": {"stringValue": s.agent_role}},
            {"key": "gen_ai.request.model", "value": {"stringValue": s.model}},
            {"key": "gen_ai.usage.input_tokens", "value": {"intValue": s.input_tokens}},
            {"key": "gen_ai.usage.output_tokens", "value": {"intValue": s.output_tokens}},
        ]
        if s.error:  # only add the attribute when there is an error (never emit None)
            attrs.append({"key": "error", "value": {"stringValue": s.error}})
        return attrs

    def export_otlp(self) -> dict:
        """Export spans in OTLP JSON shape for an OTLP-compatible backend."""
        return {
            "resourceSpans": [{
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": "multi-agent-system"}},
                        {"key": "multiagent.trace_id", "value": {"stringValue": self.trace_id}},
                    ]
                },
                "scopeSpans": [{
                    "spans": [
                        {
                            "traceId": self.trace_id,
                            "spanId": s.span_id,
                            "parentSpanId": s.parent_span_id,  # empty string marks a root span
                            "name": f"agent.{s.agent_name}",
                            "kind": 1,  # SPAN_KIND_INTERNAL
                            "startTimeUnixNano": int(s.start_time * 1e9),
                            "endTimeUnixNano": int(s.end_time * 1e9) if s.end_time else 0,
                            "attributes": self._attributes(s),
                        }
                        for s in self.spans
                    ]
                }]
            }]
        }

    def get_topology(self) -> dict:
        """Build agent call-graph data for visualization."""
        finished = [s for s in self.spans if s.end_time is not None]
        # Wall-clock span of the trace; summing durations would double-count nested spans
        wall_ms = (max(s.end_time for s in finished) - min(s.start_time for s in finished)) * 1000 if finished else 0.0
        return {
            "trace_id": self.trace_id,
            "nodes": [
                {"id": s.span_id, "label": s.agent_name, "role": s.agent_role,
                 "duration_ms": round(s.duration_ms, 1)}
                for s in self.spans
            ],
            "edges": [
                {"source": s.parent_span_id, "target": s.span_id}
                for s in self.spans if s.parent_span_id
            ],
            "summary": {
                "total_agents": len({s.agent_name for s in self.spans}),
                "total_calls": len(self.spans),
                "total_duration_ms": round(wall_ms, 1),
                "total_tokens": sum(s.input_tokens + s.output_tokens for s in self.spans),
            },
        }
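ContextVars only share the trace ID inside one process. When agents run as separate services, the ID has to travel with the request. The sketch below shows one way to do that with the W3C Trace Context `traceparent` header format (`00-<32 hex trace id>-<16 hex span id>-<2 hex flags>`); the helper names are assumptions made for this example, and in practice the OpenTelemetry propagators handle this for you.

```python
import re
import secrets
from typing import Optional

# traceparent layout: version "00", 32-hex trace ID, 16-hex parent span ID, 2-hex flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def new_trace_id() -> str:
    return secrets.token_hex(16)  # 16 random bytes -> 32 hex characters


def new_span_id() -> str:
    return secrets.token_hex(8)  # 8 random bytes -> 16 hex characters


def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Format the header that the calling agent attaches to its outgoing request."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header: str) -> Optional[dict]:
    """Return the trace context from an incoming header, or None if it is invalid."""
    match = TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None  # all-zero IDs are invalid
    return {
        "trace_id": trace_id,
        "parent_span_id": span_id,
        "sampled": bool(int(flags, 16) & 1),
    }


header = build_traceparent(new_trace_id(), new_span_id())
context = parse_traceparent(header)
print(header, context["sampled"])
```

The receiving agent would start its own span with `context["trace_id"]` as the trace ID and `context["parent_span_id"]` as the parent.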
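2.4 Langfuse Integration: A Purpose-Built LLM Tracing Platform
For teams that do not want to build their own tracing infrastructure, Langfuse is one of the more mature open-source LLM observability platforms: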
# Assumes the decorator API of the Langfuse Python SDK v2 (langfuse.decorators);
# newer SDK versions expose a different import path. Retries use tenacity.
# vector_store, build_rag_prompt, and llm are application objects defined elsewhere.
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

client = OpenAI()

langfuse_context.configure(
    public_key="your-public-key",
    secret_key="your-secret-key",
    host="https://cloud.langfuse.com",  # or your self-hosted URL
)

# Nested observations are created by calling one @observe-decorated function from another.
# ── Retrieval stage ──
@observe(name="retrieval", capture_output=False)
def retrieve(question: str, k: int = 5):
    chunks = vector_store.similarity_search(question, k=k)
    langfuse_context.update_current_observation(
        output={"num_chunks": len(chunks)},
        metadata={
            "retriever": "pg_vector",
            "avg_chunk_length": sum(len(c.page_content) for c in chunks) // max(len(chunks), 1),
        },
    )
    return chunks

# ── Generation stage ──
@observe(name="generation", as_type="generation", capture_input=False, capture_output=False)
def generate(question: str, chunks):
    prompt = build_rag_prompt(question, chunks)
    langfuse_context.update_current_observation(
        model="gpt-4o",
        input={"prompt_length": len(prompt), "num_context_chunks": len(chunks)},
    )
    answer = llm.call(prompt)
    langfuse_context.update_current_observation(
        output={"answer_length": len(answer.content), "finish_reason": answer.finish_reason},
    )
    return answer

@observe(name="rag-pipeline", capture_input=False)
def rag_answer(question: str, user_id: str = None):
    """Trace the full RAG flow; Langfuse handles trace collection and storage."""
    # Annotate the top-level observation with the input
    langfuse_context.update_current_observation(
        input={"question": question, "user_id": user_id},
        metadata={"source": "api", "env": "production", "pipeline_version": "v2.1.0"},
    )
    chunks = retrieve(question)
    answer = generate(question, chunks)
    # ── Update the top-level trace ──
    langfuse_context.update_current_trace(
        output={"answer": answer.content},
        tags=["rag", "production", "v2.1"],
        user_id=user_id,
    )
    return answer.content

# ── Automatic retries, each attempt traced ──
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1.0), reraise=True)
@observe(name="llm-call", as_type="generation")
def llm_call_with_retry(prompt: str, model: str):
    return client.chat.completions.create(model=model, messages=[{"role": "user", "content": prompt}])
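3. Evaluation: Measuring Output Quality
3.1 Where Traditional Metrics Break Down
The classic "error rate" metric says very little in an LLM setting. A call that returns 200 may still contain fully hallucinated, incorrect information. New evaluation dimensions are needed:
| Dimension | Definition | How it is collected |
|---|---|---|
| Format compliance rate | Whether the output matches the expected structure (JSON / Markdown / plain text) | Regex / JSON Schema validation |
| Hallucination rate | How often the answer is inconsistent with the retrieved context | RAG retrieval recall + LLM judge |
| Relevance score | Semantic match between the answer and the user's question | Embedding cosine similarity / LLM judge |
| Safety compliance rate | Whether the output contains sensitive terms or jailbreak content | Keyword matching + classifier model |
| Task completion rate | Whether the user's intent was understood and handled correctly | Human / automated scoring |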
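Of the dimensions above, format compliance is the cheapest to automate because it needs no model call. The sketch below is a minimal illustration for JSON outputs; the function names and the required-key check are assumptions made for this example, and a real pipeline would more likely validate against a full JSON Schema.

```python
import json


def is_format_compliant(output: str, required_keys: set[str]) -> bool:
    """True if the output parses as a JSON object containing every required key."""
    try:
        parsed = json.loads(output)
    except json.JSONDecodeError:
        return False
    return isinstance(parsed, dict) and required_keys.issubset(parsed)


def format_compliance_rate(outputs: list[str], required_keys: set[str]) -> float:
    """Share of outputs that pass the format check (0.0 for an empty batch)."""
    if not outputs:
        return 0.0
    return sum(is_format_compliant(o, required_keys) for o in outputs) / len(outputs)


samples = [
    '{"answer": "42", "sources": []}',        # compliant
    'Sure! Here is the JSON you asked for.',  # not JSON at all
    '{"answer": "no sources field"}',         # valid JSON, missing a key
]
print(f"{format_compliance_rate(samples, {'answer', 'sources'}):.2%}")
# prints: 33.33%
```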
3.2 LLM-as-Judge: Evaluating a Model with a Model
A widely used approach to automated evaluation is the LLM judge: a strong model scores the output of the target model:
import asyncio
import json
import time
from dataclasses import dataclass
from typing import Literal

from openai import OpenAI
from pydantic import BaseModel, field_validator

client = OpenAI()

class EvaluationResult(BaseModel):
    relevance_score: float   # 0-10
    factual_accuracy: float  # 0-10
    hallucination_detected: bool
    format_compliant: bool
    overall_quality: Literal["excellent", "good", "acceptable", "poor"]
    reasoning: str
    issues: list[str]

    @field_validator("relevance_score", "factual_accuracy")
    @classmethod
    def must_be_in_range(cls, v):
        if not 0 <= v <= 10:
            raise ValueError(f"Score {v} out of range [0, 10]")
        return v

def llm_judge_evaluate(question: str, answer: str, context: str = "",
                       judge_model: str = "gpt-4o") -> EvaluationResult:
    """
    Score the target model's output with a judge model (GPT-4o by default).
    Dimensions: relevance, factual accuracy, hallucination detection, format compliance.
    """
    judge_prompt = f"""You are a strict evaluator of AI output quality.
Evaluate the AI answer below on four dimensions and give scores with detailed reasoning.
[User question]
{question}
[AI answer]
{answer}
[Reference context (optional)]
{context}
Return the evaluation as a JSON object with exactly these fields:
{{
  "relevance_score": <number 0-10, relevance of the answer to the question>,
  "factual_accuracy": <number 0-10, factual accuracy>,
  "hallucination_detected": <true or false>,
  "format_compliant": <true or false>,
  "overall_quality": "excellent|good|acceptable|poor",
  "reasoning": "<detailed reasoning>",
  "issues": ["<issue 1>", "<issue 2>"]
}}
"""
    response = client.chat.completions.create(
        model=judge_model,
        messages=[{"role": "user", "content": judge_prompt}],
        response_format={"type": "json_object"},
        temperature=0.1,  # low temperature for more consistent scoring
    )
    raw = json.loads(response.choices[0].message.content)
    return EvaluationResult(**raw)  # raises a validation error on malformed judge output

# ── Production evaluation pipeline: asynchronous batch evaluation ──
@dataclass
class EvaluationRecord:
    trace_id: str
    question: str
    answer: str
    context: str
    result: EvaluationResult
    evaluated_at: float

class EvaluationPipeline:
    def __init__(self, judge_model: str = "gpt-4o", batch_size: int = 10):
        self.judge_model = judge_model
        self.batch_size = batch_size  # maximum number of concurrent judge calls
        self.results: list[EvaluationRecord] = []
        self.errors: list[Exception] = []

    async def evaluate_async(self, trace_id: str, question: str, answer: str, context: str = ""):
        """Evaluate one answer without blocking the event loop."""
        # The OpenAI client call is blocking, so run it in a worker thread
        result = await asyncio.to_thread(
            llm_judge_evaluate, question, answer, context, self.judge_model
        )
        record = EvaluationRecord(
            trace_id=trace_id,
            question=question,
            answer=answer,
            context=context,
            result=result,
            evaluated_at=time.time(),
        )
        self.results.append(record)
        return record

    async def run_batch(self, items: list[dict]) -> list[EvaluationRecord]:
        """Evaluate a batch, for example as a daily offline job."""
        semaphore = asyncio.Semaphore(self.batch_size)

        async def bounded(item: dict):
            async with semaphore:
                return await self.evaluate_async(
                    trace_id=item["trace_id"],
                    question=item["question"],
                    answer=item["answer"],
                    context=item.get("context", ""),
                )

        outcomes = await asyncio.gather(*(bounded(i) for i in items), return_exceptions=True)
        # Failed evaluations come back as exceptions; keep them instead of dropping them silently
        self.errors.extend(o for o in outcomes if isinstance(o, Exception))
        return [o for o in outcomes if isinstance(o, EvaluationRecord)]

    def get_summary(self) -> dict:
        """Build an evaluation summary report."""
        valid = self.results
        if not valid:
            return {}
        return {
            "total_evaluated": len(valid),
            "failed_evaluations": len(self.errors),
            "avg_relevance": sum(r.result.relevance_score for r in valid) / len(valid),
            "avg_accuracy": sum(r.result.factual_accuracy for r in valid) / len(valid),
            "hallucination_rate": sum(1 for r in valid if r.result.hallucination_detected) / len(valid),
            "format_compliance": sum(1 for r in valid if r.result.format_compliant) / len(valid),
            "quality_distribution": {
                q: sum(1 for r in valid if r.result.overall_quality == q)
                for q in ["excellent", "good", "acceptable", "poor"]
            },
        }
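3.3 Statistical Evaluation: Monitoring the P50 / P90 / P99 Distribution
Beyond per-response evaluation, quality trends also need to be monitored in aggregate: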
import time
from collections import defaultdict

import numpy as np

# EvaluationResult is the pydantic model defined in section 3.2

class QualityMonitor:
    """
    Quality trend monitor: tracks the score distribution and flags degraded quality.
    Scores are kept in memory without expiry, so create or reset one monitor per
    reporting window (window_hours is only used as the report label).
    """
    def __init__(self, window_hours: int = 24):
        self.window_hours = window_hours
        self.scores: dict[str, list[float]] = defaultdict(list)
        self.hallucination_events: list[dict] = []
        self.format_violations: list[dict] = []

    def record(self, evaluation: "EvaluationResult", trace_id: str, model: str, timestamp: float = None):
        ts = timestamp or time.time()
        self.scores[f"{model}_relevance"].append(evaluation.relevance_score)
        self.scores[f"{model}_accuracy"].append(evaluation.factual_accuracy)
        event = {"trace_id": trace_id, "model": model, "timestamp": ts}
        if evaluation.hallucination_detected:
            self.hallucination_events.append(event)
        if not evaluation.format_compliant:
            self.format_violations.append(event)

    def get_distribution(self, metric: str, model: str) -> dict:
        """Percentile distribution of one metric for one model."""
        data = np.array(self.scores[f"{model}_{metric}"])
        if len(data) == 0:
            return {}
        return {
            "p50": round(float(np.percentile(data, 50)), 2),
            "p90": round(float(np.percentile(data, 90)), 2),
            "p99": round(float(np.percentile(data, 99)), 2),
            "mean": round(float(np.mean(data)), 2),
            "std": round(float(np.std(data)), 2),
            "count": len(data),
        }

    def detect_quality_drift(self, metric: str, model: str, min_p90: float = 7.0) -> bool:
        """
        Flag degraded quality: returns True when the P90 score is below min_p90.
        This is an absolute floor, not a relative comparison against an earlier
        window; a true drift test needs a baseline distribution to compare with.
        """
        dist = self.get_distribution(metric, model)
        if not dist:  # no samples yet, nothing to flag
            return False
        return dist["p90"] < min_p90

    def generate_report(self, model: str) -> str:
        relevance = self.get_distribution("relevance", model)
        accuracy = self.get_distribution("accuracy", model)
        total = max(len(self.scores[f"{model}_relevance"]), 1)
        # Count only this model's events, since the rates are per model
        hallucinations = sum(1 for e in self.hallucination_events if e["model"] == model)
        violations = sum(1 for e in self.format_violations if e["model"] == model)
        hallucination_rate = hallucinations / total
        format_compliance = 1 - violations / total
        return f"""## LLM Quality Report (past {self.window_hours} hours)
### Model: {model}
**Relevance score**
- P50: {relevance.get('p50')} | P90: {relevance.get('p90')} | P99: {relevance.get('p99')}
- Mean: {relevance.get('mean')} | Std dev: {relevance.get('std')}
**Factual accuracy score**
- P50: {accuracy.get('p50')} | P90: {accuracy.get('p90')} | P99: {accuracy.get('p99')}
- Mean: {accuracy.get('mean')}
**Summary**
- Hallucination rate: {hallucination_rate:.2%}
- Format compliance rate: {format_compliance:.2%}
- Sample size: {relevance.get('count', 0)}
"""
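4. Cost Monitoring: Fine-Grained Governance of Token Billing
4.1 Why LLM Cost Monitoring Is a Hard Engineering Problem
Monitoring LLM cost is considerably harder than monitoring traditional API usage:
- Input and output are billed separately: GPT-4o, for example, is priced at $2.5 per 1M input tokens and $10 per 1M output tokens, and the input/output mix differs on every request
- Context length dominates cost: input cost grows in proportion to prompt tokens, so a 128K-token prompt costs roughly 60 times as much as a 2,048-token prompt
- Caching lowers cost indirectly: provider-side prompt caching (offered by both OpenAI and Anthropic) bills cached input tokens at a discount, but the savings only show up if cached tokens are metered correctly
- Mixed-model cost: production systems usually call several models at once, which complicates the cost structure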
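The arithmetic behind these bullets can be checked with a few lines. This is a sketch under stated assumptions: the function name `call_cost_usd` is made up for this example, the prices are the GPT-4o figures quoted above, and the 50% cached-token discount is only a placeholder because discounts differ by provider and model.

```python
def call_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_m: float,
    output_price_per_m: float,
    cached_input_tokens: int = 0,
    cached_discount: float = 0.0,
) -> float:
    """Cost of one call; only the cached part of the input gets the discount."""
    if cached_input_tokens > input_tokens:
        raise ValueError("cached_input_tokens cannot exceed input_tokens")
    uncached = input_tokens - cached_input_tokens
    billable_input = uncached + cached_input_tokens * (1 - cached_discount)
    input_cost = billable_input * input_price_per_m / 1_000_000
    output_cost = output_tokens * output_price_per_m / 1_000_000
    return input_cost + output_cost


# Input cost is linear in prompt length: 128,000 / 2,048 = 62.5
small = call_cost_usd(2_048, 0, 2.5, 10.0)
large = call_cost_usd(128_000, 0, 2.5, 10.0)
print(round(large / small, 1))  # 62.5

# 10,000 input tokens, 8,000 of them cached at an assumed 50% discount, 500 output tokens
print(round(call_cost_usd(10_000, 500, 2.5, 10.0, cached_input_tokens=8_000, cached_discount=0.5), 4))  # 0.02
```

The second example shows why cached tokens need their own counter: billing the whole prompt at the discounted rate would understate the real cost.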
4.2 A Production-Grade Cost Tracking Architecture
import time
import uuid
import warnings
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum

class CostAlertLevel(Enum):
    NORMAL = "normal"
    WARNING = "warning"    # single call > $0.05
    CRITICAL = "critical"  # single call > $0.50

@dataclass
class LLMCallRecord:
    """Complete record of one LLM call."""
    call_id: str
    trace_id: str
    model: str
    operation: str
    # Token usage
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    # Cost in USD
    input_cost: float
    output_cost: float
    total_cost: float
    # Efficiency metrics
    tokens_per_second: float       # throughput
    first_token_latency_ms: float  # time to first token (streaming)
    # Cache state
    cache_hit: bool = False
    cache_discount: float = 0.0    # fraction of input cost saved by caching
    cache_savings: float = 0.0     # USD saved on this call
    # Metadata
    timestamp: float = field(default_factory=time.time)
    duration_ms: float = 0.0
    status: str = "success"        # success | error | timeout

class CostTracker:
    """
    Production-style LLM cost tracker that supports:
    - per-call cost calculation
    - running totals and windowed trends
    - automatic alerts for abnormal calls
    - cache savings statistics
    """
    # Illustrative prices in USD per 1M tokens; load real values from provider pricing pages
    PRICING = {
        "gpt-4o": {"input": 2.5, "output": 10.0},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.0, "output": 15.0},
        "claude-opus-4": {"input": 15.0, "output": 75.0},
        "gemini-2.5-pro": {"input": 1.25, "output": 5.0},
        "deepseek-v3": {"input": 0.27, "output": 1.10},
    }
    # Alert thresholds in USD
    ALERT_THRESHOLDS = {
        CostAlertLevel.WARNING.value: 0.05,
        CostAlertLevel.CRITICAL.value: 0.50,
    }
    # Discount on cached input tokens. The rate depends on provider and model,
    # so treat 0.9 as a placeholder to configure.
    CACHE_DISCOUNT = 0.9

    def __init__(self):
        self.records: list[LLMCallRecord] = []
        self.cost_by_model: dict[str, float] = defaultdict(float)      # lifetime totals
        self.cost_by_operation: dict[str, float] = defaultdict(float)  # lifetime totals
        self.cache_savings: float = 0.0
        self.alerts: list[dict] = []

    def record(self, trace_id: str, model: str, operation: str, usage: dict,
               duration_ms: float, cache_hit: bool = False) -> LLMCallRecord:
        pricing = self.PRICING.get(model)
        if pricing is None:
            # An unknown model must not silently count as free
            warnings.warn(f"No pricing configured for model {model!r}; cost recorded as 0")
            pricing = {"input": 0.0, "output": 0.0}
        full_input_cost = usage["prompt_tokens"] * pricing["input"] / 1_000_000
        output_cost = usage["completion_tokens"] * pricing["output"] / 1_000_000
        # Simplification: a cache hit discounts the whole prompt. Provider APIs report
        # cached token counts separately; use those for exact accounting.
        cache_discount = self.CACHE_DISCOUNT if cache_hit else 0.0
        input_cost = full_input_cost * (1 - cache_discount)
        savings = full_input_cost - input_cost
        total_cost = input_cost + output_cost
        record = LLMCallRecord(
            call_id=str(uuid.uuid4()),
            trace_id=trace_id,
            model=model,
            operation=operation,
            prompt_tokens=usage["prompt_tokens"],
            completion_tokens=usage["completion_tokens"],
            total_tokens=usage["total_tokens"],
            input_cost=round(input_cost, 6),
            output_cost=round(output_cost, 6),
            total_cost=round(total_cost, 6),
            tokens_per_second=round(usage["completion_tokens"] / max(duration_ms / 1000, 0.001), 1),
            first_token_latency_ms=0.0,
            cache_hit=cache_hit,
            cache_discount=cache_discount,
            cache_savings=round(savings, 6),
            duration_ms=duration_ms,
        )
        self.records.append(record)
        self.cost_by_model[model] += total_cost
        self.cost_by_operation[operation] += total_cost
        self.cache_savings += savings
        # Alert check
        for level in (CostAlertLevel.CRITICAL, CostAlertLevel.WARNING):
            threshold = self.ALERT_THRESHOLDS[level.value]
            if total_cost >= threshold:
                self.alerts.append({
                    "level": level.value,
                    "trace_id": trace_id,
                    "model": model,
                    "cost": total_cost,
                    "reason": f"Single-call cost ${total_cost:.4f} exceeds the {level.value} threshold ${threshold}",
                })
                break  # report only the highest level reached
        return record

    def get_dashboard(self, window_hours: int = 24) -> dict:
        """Build cost dashboard data for the given time window."""
        cutoff = time.time() - window_hours * 3600
        recent = [r for r in self.records if r.timestamp >= cutoff]
        total_cost = sum(r.total_cost for r in recent)
        total_savings = sum(r.cache_savings for r in recent)
        # Break cost down by model and operation, using only records inside the window
        cost_breakdown: dict[str, dict] = {}
        cost_by_operation: dict[str, float] = defaultdict(float)
        for r in recent:
            entry = cost_breakdown.setdefault(
                r.model, {"total_cost": 0.0, "call_count": 0, "total_tokens": 0, "cache_savings": 0.0}
            )
            entry["total_cost"] += r.total_cost
            entry["call_count"] += 1
            entry["total_tokens"] += r.total_tokens
            entry["cache_savings"] += r.cache_savings
            cost_by_operation[r.operation] += r.total_cost
        for entry in cost_breakdown.values():
            entry["avg_cost_per_call"] = round(entry["total_cost"] / entry["call_count"], 4)
            entry["total_cost"] = round(entry["total_cost"], 4)
            entry["cache_savings"] = round(entry["cache_savings"], 4)
        return {
            "window_hours": window_hours,
            "total_cost_usd": round(total_cost, 4),
            "total_calls": len(recent),
            "avg_cost_per_call_usd": round(total_cost / max(len(recent), 1), 4),
            "total_tokens": sum(r.total_tokens for r in recent),
            "cache_savings_usd": round(total_savings, 4),
            "cache_savings_rate": round(total_savings / max(total_cost + total_savings, 0.001), 4),
            "cost_by_model": cost_breakdown,
            "cost_by_operation": dict(sorted(cost_by_operation.items(), key=lambda x: -x[1])[:10]),
            "active_alerts": self.alerts[-10:],  # 10 most recent alerts
        }
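4.3 Cost Optimization: Caching and Prompt Compression
The point of cost monitoring is optimization. Two of the most effective cost levers are sketched below: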
import re
from typing import Optional

import numpy as np

# ── Strategy 1: semantic cache ──
class SemanticCache:
    """
    Semantic cache based on embedding similarity.
    When a new request is at least `threshold` (default 0.95) similar to a cached
    request, the cached response is returned and the LLM call is skipped entirely
    (only the embedding lookup is paid for).
    """
    def __init__(self, embedding_model, threshold: float = 0.95, max_entries: int = 10000):
        # embedding_model is assumed to expose encode(text) -> np.ndarray
        self.embedding_model = embedding_model
        self.threshold = threshold
        self.max_entries = max_entries
        self.cache: list[tuple[np.ndarray, dict]] = []  # (embedding, {prompt, response, usage})
        self.stats = {"hits": 0, "misses": 0}

    def _embed(self, text: str) -> np.ndarray:
        return self.embedding_model.encode(text)

    def get(self, prompt: str) -> Optional[dict]:
        query_emb = self._embed(prompt)
        # Linear scan; use a vector index once the cache grows large
        for cached_emb, cached_data in self.cache:
            similarity = float(np.dot(query_emb, cached_emb)
                               / (np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)))
            if similarity >= self.threshold:
                self.stats["hits"] += 1
                return cached_data
        self.stats["misses"] += 1
        return None

    def put(self, prompt: str, response: dict, usage: dict):
        entry = (self._embed(prompt), {"prompt": prompt, "response": response, "usage": usage})
        self.cache.append(entry)
        if len(self.cache) > self.max_entries:
            self.cache.pop(0)  # evict the oldest entry (FIFO)

    def hit_rate(self) -> float:
        total = self.stats["hits"] + self.stats["misses"]
        return self.stats["hits"] / max(total, 1)

# ── Strategy 2: prompt compression ──
def compress_prompt(prompt: str, max_chars: int = 8000) -> str:
    """
    Simple prompt compression:
    1. Collapse redundant whitespace (line breaks are kept so structure survives)
    2. Cut the middle out of inputs longer than max_chars
    A summarization step for oversized inputs is a natural extension but is not implemented here.
    """
    cleaned = re.sub(r"[ \t]+", " ", prompt)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned).strip()
    if len(cleaned) <= max_chars:
        return cleaned
    # Aggressive compression: keep the first 60% and the last 40% of the budget
    keep_front = int(max_chars * 0.6)
    keep_back = max_chars - keep_front
    marker = f"\n... [compressed, original length {len(cleaned)} characters] ...\n"
    return cleaned[:keep_front] + marker + cleaned[-keep_back:]
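5. Alerting: Three-in-One Intelligent Alerts
5.1 Tiered Alert Design
Alerts for LLM applications have to be tiered; the traditional "alert when error rate > 1%" policy is not enough: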
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

class AlertSeverity(Enum):
    P0_CRITICAL = "critical"  # act immediately: service down or abnormal cost
    P1_HIGH = "high"          # act within 2 hours: severe quality drop
    P2_MEDIUM = "medium"      # act within 24 hours: abnormal trend
    P3_LOW = "low"            # routine attention: statistical deviation

# Enum members are not orderable, so rank them explicitly (lower = more severe)
SEVERITY_RANK = {sev: rank for rank, sev in enumerate(AlertSeverity)}

@dataclass
class Alert:
    id: str
    severity: AlertSeverity
    title: str
    description: str
    metric: str
    current_value: float
    threshold: float
    affected_traces: list[str]
    suggested_action: str
    created_at: float

class LLMAlertManager:
    """
    Alert manager for AI applications.
    Combines rules across layers and limits notifications to reduce alert storms.
    """
    # ── Alert rule configuration ──
    RULES = [
        # P0: abnormal cost
        {"severity": AlertSeverity.P0_CRITICAL, "metric": "single_call_cost", "op": "gt", "threshold": 0.50,
         "window": "instant", "cooldown_minutes": 5,
         "title": "Abnormal single-call LLM cost",
         "action": "Check immediately for an oversized prompt or a call loop"},
        # P0: error rate spike
        {"severity": AlertSeverity.P0_CRITICAL, "metric": "error_rate", "op": "gt", "threshold": 0.05,
         "window": 300, "cooldown_minutes": 5,
         "title": "LLM error rate above 5%",
         "action": "Check LLM API service status and model availability"},
        # P1: severe quality drop
        {"severity": AlertSeverity.P1_HIGH, "metric": "avg_relevance_p90", "op": "lt", "threshold": 7.0,
         "window": 3600, "cooldown_minutes": 30,
         "title": "Answer relevance P90 below 7.0",
         "action": "Check RAG retrieval quality and prompt effectiveness"},
        # P1: rising hallucination rate
        {"severity": AlertSeverity.P1_HIGH, "metric": "hallucination_rate", "op": "gt", "threshold": 0.15,
         "window": 3600, "cooldown_minutes": 60,
         "title": "Hallucination rate above 15%",
         "action": "Check retrieved-context quality and the model temperature setting"},
        # P2: rising cost trend (threshold None = dynamic, 1.5x the baseline)
        {"severity": AlertSeverity.P2_MEDIUM, "metric": "cost_per_hour", "op": "gt", "threshold": None,
         "window": 3600, "cooldown_minutes": 120,
         "title": "Hourly cost up 50% versus baseline",
         "action": "Analyze the distribution of expensive requests; consider enabling caching"},
        # P2: rising latency
        {"severity": AlertSeverity.P2_MEDIUM, "metric": "avg_latency_p95", "op": "gt", "threshold": 15000,
         "window": 1800, "cooldown_minutes": 30,
         "title": "LLM P95 latency above 15 seconds",
         "action": "Check for provider rate limiting; consider degrading or throttling"},
        # P3: falling token efficiency
        {"severity": AlertSeverity.P3_LOW, "metric": "output_token_ratio", "op": "lt", "threshold": 0.3,
         "window": 7200, "cooldown_minutes": 240,
         "title": "Output token share below 30% (prompt bloat)",
         "action": "Review prompt design and remove redundant context"},
    ]

    def __init__(self, notifier: Optional[Callable[[Alert], None]] = None):
        self.notifier = notifier or self._default_notifier
        self.cooldowns: dict[str, float] = {}
        self.active_alerts: list[Alert] = []

    def _default_notifier(self, alert: Alert):
        print(f"[{alert.severity.value.upper()}] {alert.title}: {alert.description}")

    def check_rules(self, metrics: dict) -> list[Alert]:
        """Check every rule and return the alerts that fired."""
        triggered = []
        now = time.time()
        for rule in self.RULES:
            # Respect the cooldown period
            rule_id = f"{rule['metric']}_{rule['op']}"
            if now - self.cooldowns.get(rule_id, 0) < rule.get("cooldown_minutes", 30) * 60:
                continue
            current = metrics.get(rule["metric"])
            if current is None:
                continue  # metric not reported; defaulting to 0 would trip every "lt" rule
            threshold = rule["threshold"]
            if threshold is None:
                # Dynamic threshold: 1.5x the baseline, skipped when no baseline exists
                baseline = metrics.get(f"{rule['metric']}_baseline")
                if not baseline:
                    continue
                threshold = baseline * 1.5
            fired = (rule["op"] == "gt" and current > threshold) or \
                    (rule["op"] == "lt" and current < threshold)
            if fired:
                alert = Alert(
                    id=str(uuid.uuid4()),
                    severity=rule["severity"],
                    title=rule["title"],
                    description=f"{rule['metric']} = {current:.4f}, threshold = {threshold}",
                    metric=rule["metric"],
                    current_value=current,
                    threshold=threshold,
                    affected_traces=metrics.get("recent_error_traces", [])[:5],
                    suggested_action=rule["action"],
                    created_at=now,
                )
                triggered.append(alert)
                self.cooldowns[rule_id] = now
                self.active_alerts.append(alert)
        # Sort by severity and notify only the highest-priority alerts
        triggered.sort(key=lambda a: SEVERITY_RANK[a.severity])
        for alert in triggered[:3]:  # at most 3 notifications per check
            self.notifier(alert)
        return triggered
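5.2 Alert Suppression: Avoiding Alert Storms
In an LLM application, one model failure can produce dozens of related alerts for the same trace ID. Grouping by trace ID suppresses the noise: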
def group_alerts_by_trace(alerts: list[Alert]) -> list[dict]:
    """Group alerts by trace ID to reduce alert storms."""
    from collections import defaultdict

    # Enum members cannot be compared with min(); rank them by declaration order (P0 first)
    order = list(AlertSeverity)
    by_trace = defaultdict(list)
    for alert in alerts:
        # Note: alerts without affected traces do not appear in any group
        for trace_id in alert.affected_traces:
            by_trace[trace_id].append(alert)
    grouped = []
    for trace_id, trace_alerts in by_trace.items():
        # Merge alerts of one trace into a single entry and keep the highest severity
        most_severe = min(trace_alerts, key=lambda a: order.index(a.severity))
        grouped.append({
            "trace_id": trace_id,
            "severity": most_severe.severity,
            "alert_count": len(trace_alerts),
            "titles": [a.title for a in trace_alerts],
            "suggested_action": most_severe.suggested_action,  # action of the most severe alert
        })
    return grouped
6. In Practice: Building the Full Observability Data Pipeline
6.1 Architecture Overview
┌───────────────────────────────────────────────────────────────────┐
│                       Your LLM Application                        │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐   │
│  │ Langfuse   │  │ Prometheus │  │ Loki       │  │ Jaeger     │   │
│  │ (LLM-only) │  │ (Metrics)  │  │ (Logs)     │  │ (Traces)   │   │
│  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘   │
└────────┼───────────────┼───────────────┼───────────────┼──────────┘
         │               │               │               │
         ▼               ▼               ▼               ▼
┌───────────────────────────────────────────────────────────────────┐
│                      OpenTelemetry Collector                      │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Span Enricher     │ │ Tail Sampler      │ │ Cost Calculator   │ │
│ │ (add biz tags)    │ │ (tail sampling)   │ │ (compute cost)    │ │
│ └───────────────────┘ └───────────────────┘ └───────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
       │                │                          │
       ▼                ▼                          ▼
┌─────────────┐ ┌──────────────┐ ┌──────────────────────────────────┐
│ Grafana     │ │ Langfuse     │ │ Self-hosted store (ClickHouse)   │
│ (Metrics)   │ │ (LLM Traces) │ │ (low-cost historical analysis)   │
└─────────────┘ └──────────────┘ └──────────────────────────────────┘
6.2 OTel Collector Configuration: Tail Sampling + Cost Calculation
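# otel-collector-config.yaml (trimmed to the core settings)
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Memory protection: once usage passes the limit, new data is refused
  # until memory drops again
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 100

  # Tail sampling: buffer all spans first, decide once the whole trace is visible.
  # A trace is kept if any policy matches.
  tail_sampling:
    decision_wait: 10s
    policies:
      # Keep every trace with an error
      - name: errors-policy
        type: status_code
        status_code: { status_codes: [ERROR] }
      # Keep slow traces (over 5 seconds)
      - name: slow-traces-policy
        type: latency
        latency: { threshold_ms: 5000 }
      # Keep traces with specific business tags (for example high-value users)
      - name: business-context-policy
        type: string_attribute
        string_attribute: { key: "biz.user_tier", values: ["premium", "enterprise"] }
      # Sample 1% of normal traces (saves storage cost)
      - name: probabilistic-policy
        type: probabilistic
        probabilistic: { sampling_percentage: 1 }

  # Custom: compute LLM call cost. The prices are hardcoded GPT-4o rates, so this
  # is only correct for that model. Double() keeps the arithmetic in one numeric
  # type; verify the statement against the OTTL version you run.
  transform:
    error_mode: ignore
    trace_statements:
      - context: span
        statements:
          - >-
            set(attributes["llm.cost.usd"],
            Double(attributes["gen_ai.usage.input_tokens"]) * 2.5 / 1000000.0 +
            Double(attributes["gen_ai.usage.output_tokens"]) * 10.0 / 1000000.0)
            where attributes["gen_ai.system"] != nil
            and attributes["gen_ai.usage.input_tokens"] != nil
            and attributes["gen_ai.usage.output_tokens"] != nil

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: "llm_app"
    const_labels:
      service: "your-llm-service"
  otlp/tempo:
    endpoint: "tempo:4317"
    tls:
      insecure: true
  # Check that your Collector distribution still ships this exporter; newer
  # setups commonly send logs to Loki's OTLP endpoint instead.
  loki:
    endpoint: "http://loki:3100/loki/api/v1/push"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, transform]
      exporters: [otlp/tempo]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [prometheus]
    logs:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [loki]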
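To make the sampling policies above concrete, the decision they describe can be mimicked in a few lines of Python. This is only a sketch of the logic (keep a trace if any policy matches), not the Collector's implementation: `TraceSummary`, `keep_trace`, and the hash-based bucket are assumptions made for this example.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class TraceSummary:
    """What the sampler knows once all spans of a trace have arrived."""
    trace_id: str
    has_error: bool
    duration_ms: float
    user_tier: str = ""


def keep_trace(
    trace: TraceSummary,
    slow_ms: float = 5000,
    keep_tiers: tuple = ("premium", "enterprise"),
    sample_pct: float = 1.0,
) -> bool:
    """Return True if any policy says the trace should be kept."""
    if trace.has_error:                # errors policy
        return True
    if trace.duration_ms > slow_ms:    # slow-traces policy
        return True
    if trace.user_tier in keep_tiers:  # business-context policy
        return True
    # Probabilistic policy: hash the trace ID into one of 10,000 buckets so the
    # decision is deterministic for a given trace.
    bucket = int(hashlib.sha256(trace.trace_id.encode()).hexdigest(), 16) % 10_000
    return bucket < sample_pct * 100


traces = [
    TraceSummary("a" * 32, has_error=True, duration_ms=800),
    TraceSummary("b" * 32, has_error=False, duration_ms=12_000),
    TraceSummary("c" * 32, has_error=False, duration_ms=900, user_tier="enterprise"),
]
print([keep_trace(t) for t in traces])  # [True, True, True]
```

Because the decision needs the error status and total duration, it can only be made after the trace is complete, which is why the Collector buffers spans for `decision_wait` before sampling.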
6.3 Grafana Dashboard JSON Configuration
{
"title": "LLM Application Observability Dashboard",
"panels": [
{
"title": "Live Call Volume & Cost",
"type": "timeseries",
"targets": [
{
"expr": "sum(rate(llm_app_calls_total[5m])) by (model)",
"legendFormat": "{{model}} QPS"
},
{
"expr": "sum(rate(llm_app_cost_usd_total[5m])) by (model)",
"legendFormat": "{{model}} cost $/s"
}
],
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8}
},
{
"title": "Token Efficiency Distribution",
"type": "histogram",
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
"legendFormat": "P50 - {{model}}"
},
{
"expr": "histogram_quantile(0.95, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
"legendFormat": "P95 - {{model}}"
},
{
"expr": "histogram_quantile(0.99, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
"legendFormat": "P99 - {{model}}"
}
],
"gridPos": {"x": 12, "y": 0, "w": 12, "h": 8}
},
{
"title": "Quality Trend (LLM Judge Scores)",
"type": "timeseries",
"targets": [
{
"expr": "avg(llm_app_quality_relevance_p90{service=~\"$service\"})",
"legendFormat": "Relevance P90"
},
{
"expr": "avg(llm_app_quality_accuracy_p90{service=~\"$service\"})",
"legendFormat": "Accuracy P90"
},
{
"expr": "llm_app_hallucination_rate{service=~\"$service\"}",
"legendFormat": "Hallucination rate"
}
],
"gridPos": {"x": 0, "y": 8, "w": 12, "h": 8}
},
{
"title": "Cost Breakdown by Business Tag",
"type": "piechart",
"targets": [
{
"expr": "sum(increase(llm_app_cost_usd_total[24h])) by (operation)",
"legendFormat": "{{operation}}"
}
],
"gridPos": {"x": 12, "y": 8, "w": 12, "h": 8}
}
]
}
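7. Summary: The Five Pillars of AI Observability Engineering
7.1 Technology Selection
| Scenario | Recommended stack |
|---|---|
| Quick start / small team | Langfuse Cloud + Grafana + Prometheus |
| Medium scale / existing OTel infrastructure | Self-hosted OTel Collector + Tempo + Langfuse |
| Large scale / multi-tenant / enterprise | Fully self-built: ClickHouse + Kafka + Grafana + in-house evaluation engine |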
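7.2 Implementation Roadmap
Phase 1 (1-2 weeks): basic tracing
- Adopt OpenTelemetry and attach gen_ai.* attributes automatically
- Configure tail-based sampling that keeps all error and slow requests
- Visualize the full call path in Jaeger
Phase 2 (2-3 weeks): cost visibility
- Implement CostTracker to calculate the cost of every LLM call
- Build a cost dashboard broken down by model / operation / user
- Add a semantic cache, targeting a cache hit rate above 30%
Phase 3 (3-4 weeks): quality evaluation
- Add an LLM judge and build an automated evaluation pipeline
- Monitor the three core quality metrics: hallucination rate, relevance, and format compliance rate
- Configure quality alerts at P1 and above
Phase 4 (ongoing): intelligent operations
- Build quality trend analysis to detect drift in model capability
- Introduce an A/B testing framework to compare prompts and models on real traffic
- Build root-cause analysis: from cost anomaly → trace → logs → root cause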
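The quality trend analysis in phase 4 can start from a simple relative comparison between a baseline window and the current window. The sketch below is one hedged way to do it with the standard library; the function names, the 10% threshold, and the minimum sample size are assumptions made for this example rather than recommended values.

```python
from statistics import quantiles


def p90(scores: list[float]) -> float:
    # quantiles(n=10) returns 9 cut points; the last one is the 90th percentile
    return quantiles(scores, n=10, method="inclusive")[8]


def relative_drop(baseline: list[float], current: list[float]) -> float:
    """Fractional drop of the current P90 relative to the baseline P90."""
    base = p90(baseline)
    if base == 0:
        return 0.0
    return (base - p90(current)) / base


def has_drifted(
    baseline: list[float],
    current: list[float],
    threshold_pct: float = 0.1,
    min_samples: int = 20,
) -> bool:
    """True when the P90 score fell by more than threshold_pct versus the baseline."""
    if len(baseline) < min_samples or len(current) < min_samples:
        return False  # too few samples for a stable percentile
    return relative_drop(baseline, current) > threshold_pct


last_week = [8.0] * 50 + [9.0] * 50  # judge scores from the baseline window
today = [7.0] * 100                  # judge scores from the current window
print(has_drifted(last_week, today))      # True
print(has_drifted(last_week, last_week))  # False
```

Comparing against a baseline catches gradual degradation that a fixed score floor would miss, at the price of having to store timestamped scores.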
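7.3 Key Metrics Checklist
✅ Foundation layer (required)
- Call volume in QPS (by model / operation)
- P50 / P95 / P99 latency
- Error rate (API level)
- Per-call / cumulative cost (by model)
✅ Cost layer (strongly recommended)
- Cache hit rate & amount saved
- Token efficiency (output / input ratio)
- Average cost per call
✅ Quality layer (advanced)
- LLM judge relevance score distribution
- Hallucination rate (hourly)
- Format compliance rate
✅ Business layer (most advanced)
- End-to-end task success rate (user intent achieved)
- User feedback linked to traces (thumbs up/down → trace ID)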
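Closing Thoughts
Observability for LLM applications is more than "adding a few metrics". It requires redrawing the boundary of monitoring: from binary right and wrong to probabilistic evaluation, from isolated alerts to end-to-end tracing, and from reactive response to proactive governance of quality and cost.
In the AI engineering race of 2026, observability will directly determine whether a team can find the best balance between cost, quality, and reliability. The observability infrastructure you build now is the moat that protects your AI product's competitiveness later.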