编程 AI应用可观测性工程2026:LLM调用追踪、评估体系与成本监控全栈实践

2026-06-18 18:33:15 +0800 CST views 11

AI 应用可观测性工程 2026:LLM 调用追踪、评估体系与成本监控全栈实践

前言

你的 AI 应用上线了,用户在用,但你真的知道它在做什么吗?

传统应用的可观测性体系(Metrics / Logs / Traces)在 LLM 应用上遭遇了全新挑战:一个 LLM 调用可能耗时 3-30 秒,成本从 0.01 美元到 1 美元不等,而「正确性」这个指标根本无法用日志里的状态码来衡量。更棘手的是,LLM 的输出是非确定性的——相同输入可能产生不同输出,这让传统「有错误就告警」的逻辑完全失效。

本文系统介绍 2026 年 AI 应用可观测性工程的完整体系,从 Tracing 到评估,从成本监控到质量告警,构建真正可运营的 LLM 监控平台。


一、AI 可观测性的特殊挑战

1.1 与传统应用的根本差异

维度传统应用LLM 应用
延迟ms 级,P99 < 1s秒级,P99 可能 30s+
成本固定基础设施成本按 Token 计费,高度可变
错误定义HTTP 状态码 / 异常语义错误(幻觉、格式错误、逻辑谬误)
质量评估精确的成功 / 失败模糊的好 / 差,需人工或 LLM 评估
调试确定性,可复现非确定性,相同输入可能不同输出

传统应用的监控哲学是「对错二分」——200 OK 就成功,500 就失败。但 LLM 的世界是概率分布,输出质量是一个连续光谱,必须重新定义「可观测」的边界。

1.2 AI 可观测性的四层需求金字塔

LLM 应用的可观测性需要覆盖四个层次:

┌─────────────────────────────────────────┐
│         业务层:任务成功率、用户满意度     │
├─────────────────────────────────────────┤
│    质量层:输出正确性、幻觉率、格式合规率   │
├─────────────────────────────────────────┤
│      成本层:Token 消耗、API 费用、缓存命中率 │
├─────────────────────────────────────────┤
│      基础层:延迟、错误率、并发、重试次数    │
└─────────────────────────────────────────┘

每一层都有独特的数据采集方式和告警策略,我们逐一拆解。


二、追踪体系:从单次调用到完整链路

2.1 OpenTelemetry for LLM:行业事实标准

OpenTelemetry 已成为 LLM 追踪的事实标准。2026 年发布的 GenAI 语义约定(Semantic Conventions)定义了标准属性命名空间,让所有 LLM 提供商的追踪数据格式统一:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# ── 初始化追踪 Provider ──
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-app")

def traced_llm_call(prompt: str, model: str = "gpt-4o", temperature: float = 0.7):
    with tracer.start_as_current_span("llm.chat") as span:
        # 遵循 GenAI 语义约定(gen_ai.* 命名空间)
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.operation.name", "chat")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.request.temperature", temperature)
        span.set_attribute("gen_ai.request.max_tokens", 2048)

        # 记录输入(注意 PII 处理,只截取前 1000 字符避免 Span 过大)
        span.set_attribute("gen_ai.prompt", prompt[:1000])

        import openai
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
        )

        # 记录输出和用量统计
        usage = response.usage
        span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens)
        span.set_attribute("gen_ai.usage.total_tokens", usage.total_tokens)
        span.set_attribute("gen_ai.response.model", response.model)

        content = response.choices[0].message.content
        span.set_attribute("gen_ai.completion", content[:1000])

        # 计算单次调用成本(以 GPT-4o 2026 年定价为参考)
        input_cost = usage.prompt_tokens * 2.5 / 1_000_000  # $2.5 / 1M tokens
        output_cost = usage.completion_tokens * 10 / 1_000_000  # $10 / 1M tokens
        span.set_attribute("gen_ai.cost.usd", round(input_cost + output_cost, 6))

        return content

关键设计原则:所有 gen_ai.* 属性严格遵循 OpenTelemetry GenAI 语义约定,保证跨平台兼容性。禁止将完整 prompt(含用户隐私数据)记录到 Span 属性中。

2.2 多层 Span 架构:RAG 完整追踪

真实生产环境中的 LLM 调用往往是 RAG(检索增强生成)链路的一部分,需要用 Span 嵌套追踪每个子步骤:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
import time

class RAGTracer:
    def __init__(self, service_name: str):
        self.tracer = trace.get_tracer(service_name)
        self.spans = {}

    def trace_retrieval(self, query: str, top_k: int = 5):
        with self.tracer.start_as_current_span("rag.retrieval") as span:
            span.set_attribute("rag.query.length", len(query))
            span.set_attribute("rag.top_k", top_k)
            span.set_attribute("rag.retriever.type", "dense+sparse-hybrid")

            start = time.time()
            chunks = self.vector_store.similarity_search(query, k=top_k)
            retrieval_ms = (time.time() - start) * 1000

            span.set_attribute("rag.retrieval.duration_ms", round(retrieval_ms, 2))
            span.set_attribute("rag.chunks.retrieved", len(chunks))
            span.set_attribute("rag.chunks.avg_length", sum(len(c.page_content) for c in chunks) // len(chunks))

            # 计算召回质量分数
            relevance_scores = [c.metadata.get("score", 0.0) for c in chunks]
            span.set_attribute("rag.relevance.avg_score", round(sum(relevance_scores) / len(relevance_scores), 3))
            span.set_attribute("rag.relevance.min_score", round(min(relevance_scores), 3))

            return chunks

    def trace_generation(self, prompt: str, model: str):
        with self.tracer.start_as_current_span("rag.generation") as span:
            span.set_attribute("gen_ai.system", "openai")
            span.set_attribute("gen_ai.request.model", model)
            span.set_attribute("gen_ai.prompt.length", len(prompt))

            start = time.time()
            answer = self.llm.call(prompt)
            generation_ms = (time.time() - start) * 1000

            span.set_attribute("gen_ai.usage.total_tokens", answer.usage.total_tokens)
            span.set_attribute("gen_ai.generation.duration_ms", round(generation_ms, 2))
            span.set_attribute("gen_ai.completion.length", len(answer.content))

            return answer

    def trace_full_rag(self, query: str):
        """
        完整 RAG 链路:从查询解析 → 检索 → 重排序 → 生成 → 评估
        每个步骤都是一个子 Span,父 Span 聚合全链路指标
        """
        with self.tracer.start_as_current_span("rag.full_pipeline") as parent_span:
            parent_span.set_attribute("rag.query", query[:500])

            # Step 1: 查询改写(可选,优化召回)
            rewritten = self.query_rewriter.rewrite(query)
            parent_span.set_attribute("rag.query.rewritten", rewritten[:200])
            parent_span.set_attribute("rag.query.rewritten.enabled", rewritten != query)

            # Step 2: 向量检索
            chunks = self.trace_retrieval(rewritten)

            # Step 3: 重排序(可选,使用 Cross-Encoder)
            if self.reranker:
                reranked = self.reranker.rerank(query, chunks)
                parent_span.set_attribute("rag.reranking.enabled", True)
                parent_span.set_attribute("rag.reranking.top", len(reranked))
            else:
                reranked = chunks
                parent_span.set_attribute("rag.reranking.enabled", False)

            # Step 4: 构建 Prompt 并生成
            context = "\n\n".join(c.page_content for c in reranked)
            prompt = f"基于以下上下文回答问题。\n\n上下文:{context}\n\n问题:{query}"
            answer = self.trace_generation(prompt, model="gpt-4o")

            # Step 5: 后处理(格式校验、敏感词过滤等)
            with self.tracer.start_as_current_span("rag.postprocess") as post_span:
                cleaned = self.post_processor.clean(answer.content)
                post_span.set_attribute("rag.postprocess.format_valid", self.post_processor.is_valid_json(cleaned))

            parent_span.set_attribute("rag.chunks.used", len(reranked))
            parent_span.set_attribute("rag.answer.length", len(cleaned))

            return cleaned

通过这种嵌套 Span 结构,可以在 Jaeger / Tempo 等追踪平台中看到一个完整 RAG 请求的瀑布图,精准定位是检索质量差还是生成质量差。

2.3 多 Agent 链路追踪:跨 Agent 上下文传播

多 Agent 系统中,每个 Agent 都有独立的 LLM 调用,但需要共享同一个 TraceID 才能还原完整协作链路:

import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time

# ── 用 ContextVar 实现 Trace 上下文跨协程传播 ──
current_trace_id: ContextVar[str] = ContextVar('trace_id', default='')
current_span_id: ContextVar[str] = ContextVar('span_id', default='')

@dataclass
class AgentSpan:
    trace_id: str
    span_id: str
    parent_span_id: str
    agent_name: str
    agent_role: str
    input_summary: str
    output_summary: str
    model: str
    start_time: float
    end_time: Optional[float] = None
    input_tokens: int = 0
    output_tokens: int = 0
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time) * 1000

class AgentTracer:
    """
    轻量级多 Agent 追踪器,支持:
    - 自动生成 TraceID 并在所有 Agent 间共享
    - 每个 Agent 的 LLM 调用记录为独立 Span
    - 最终导出为标准 OTLP 格式或 JSON 供自建平台消费
    """

    def __init__(self, trace_id: str = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.spans: list[AgentSpan] = []
        current_trace_id.set(self.trace_id)

    def start_span(self, agent_name: str, agent_role: str, input_summary: str, model: str) -> AgentSpan:
        span = AgentSpan(
            trace_id=self.trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=current_span_id.get(''),
            agent_name=agent_name,
            agent_role=agent_role,
            input_summary=input_summary[:200],
            model=model,
            start_time=time.time(),
            output_summary='',
        )
        self.spans.append(span)
        current_span_id.set(span.span_id)
        return span

    def end_span(self, span: AgentSpan, output_summary: str = '', **usage_kwargs):
        span.end_time = time.time()
        span.output_summary = output_summary[:200]
        span.input_tokens = usage_kwargs.get('input_tokens', 0)
        span.output_tokens = usage_kwargs.get('output_tokens', 0)
        # 恢复父级 SpanID(支持嵌套)
        parent = next((s for s in self.spans if s.span_id == span.parent_span_id), None)
        if parent:
            current_span_id.set(parent.span_id)
        else:
            current_span_id.set('')

    def export_otlp(self) -> dict:
        """导出为 OTLP Span 格式,可发送给任何 OTLP 兼容后端"""
        return {
            "resourceSpans": [{
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": "multi-agent-system"}},
                        {"key": "multiagent.trace_id", "value": {"stringValue": self.trace_id}},
                    ]
                },
                "scopeSpans": [{
                    "spans": [
                        {
                            "traceId": self.trace_id,
                            "spanId": s.span_id,
                            "parentSpanId": s.parent_span_id or "0000000000000000",
                            "name": f"agent.{s.agent_name}",
                            "kind": 1,  # SPAN_KIND_INTERNAL
                            "startTimeUnixNano": int(s.start_time * 1e9),
                            "endTimeUnixNano": int(s.end_time * 1e9) if s.end_time else 0,
                            "attributes": [
                                {"key": "agent.name", "value": {"stringValue": s.agent_name}},
                                {"key": "agent.role", "value": {"stringValue": s.agent_role}},
                                {"key": "gen_ai.system", "value": {"stringValue": s.model}},
                                {"key": "gen_ai.usage.input_tokens", "value": {"intValue": s.input_tokens}},
                                {"key": "gen_ai.usage.output_tokens", "value": {"intValue": s.output_tokens}},
                                {"key": "error", "value": {"stringValue": s.error}} if s.error else None,
                            ],
                        }
                        for s in self.spans
                    ]
                }]
            }]
        }

    def get_topology(self) -> dict:
        """生成 Agent 调用拓扑图数据,用于可视化"""
        return {
            "trace_id": self.trace_id,
            "nodes": [
                {"id": s.span_id, "label": s.agent_name, "role": s.agent_role, "duration_ms": round(s.duration_ms, 1)}
                for s in self.spans
            ],
            "edges": [
                {"source": s.parent_span_id, "target": s.span_id}
                for s in self.spans if s.parent_span_id
            ],
            "summary": {
                "total_agents": len(set(s.agent_name for s in self.spans)),
                "total_calls": len(self.spans),
                "total_duration_ms": sum(s.duration_ms for s in self.spans),
                "total_tokens": sum(s.input_tokens + s.output_tokens for s in self.spans),
            }
        }

2.4 LangFuse 集成:专用 LLM 追踪平台

对于不想自建追踪基础设施的团队,LangFuse 是目前最成熟的开源 LLM 可观测性平台:

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from langfuse.decorators import retry

langfuse = Langfuse(
    public_key="your-public-key",
    secret_key="your-secret-key",
    host="https://cloud.langfuse.com"  # 或自部署
)

@observe(name="rag-pipeline", metadata={"pipeline_version": "v2.1.0"})
def rag_answer(question: str, user_id: str = None):
    """
    完整 RAG 流程追踪:LangFuse 自动处理追踪、采样和存储
    """
    # 顶层标注输入
    langfuse_context.update_current_observation(
        input={"question": question, "user_id": user_id},
        metadata={"source": "api", "env": "production"},
    )

    # ── 检索阶段 ──
    with langfuse_context.observe_span(name="retrieval", metadata={"retriever": "pg_vector"}):
        chunks = vector_store.similarity_search(question, k=5)
        langfuse_context.update_current_observation(
            output={"num_chunks": len(chunks)},
            metadata={"avg_chunk_length": sum(len(c.page_content) for c in chunks) // max(len(chunks), 1)},
        )

    # ── 生成阶段 ──
    with langfuse_context.observe_span(name="generation", metadata={"model": "gpt-4o"}):
        prompt = build_rag_prompt(question, chunks)
        langfuse_context.update_current_observation(
            input={"prompt_length": len(prompt), "num_context_chunks": len(chunks)},
        )
        answer = llm.call(prompt)
        langfuse_context.update_current_observation(
            output={"answer_length": len(answer.content), "finish_reason": answer.finish_reason},
        )

    # ── 更新顶层 Trace ──
    langfuse_context.update_current_trace(
        output={"answer": answer.content},
        tags=["rag", "production", "v2.1"],
        user_id=user_id,
    )

    return answer.content

# ── 自动重试带追踪 ──
@retry(max_attempts=3, delay=1.0, exponential_backoff=True)
@observe(name="llm-call")
def llm_call_with_retry(prompt: str, model: str):
    return client.chat.completions.create(model=model, messages=[{"role": "user", "content": prompt}])

三、评估体系:衡量输出质量

3.1 传统指标的失效

传统应用的「错误率」指标在 LLM 场景下几乎毫无意义。一个返回 200 的 LLM 调用,可能输出了完全 hallucinate(幻觉)的错误信息。必须建立新的评估维度:

评估维度定义采集方式
格式合规率输出是否符合预期的结构(JSON/ Markdown / 纯文本)正则 / JSON Schema 校验
幻觉率回答与检索上下文的一致程度RAG 召回率 + LLM Judge
相关性得分回答与用户问题的语义匹配度Embedding Cosine 相似度 / LLM Judge
安全合规率是否包含敏感词 / 越狱内容关键词匹配 + 分类模型
任务完成率用户意图是否被正确理解和处理人工 / 自动化评分

3.2 LLM-as-Judge:用模型评估模型

2026 年最主流的自动化评估方案是 LLM Judge——用一个强模型评估目标模型的输出质量:

from openai import OpenAI
from pydantic import BaseModel, field_validator
from typing import Literal

client = OpenAI()

class EvaluationResult(BaseModel):
    relevance_score: float  # 0-10
    factual_accuracy: float  # 0-10
    hallucination_detected: bool
    format_compliant: bool
    overall_quality: Literal["excellent", "good", "acceptable", "poor"]
    reasoning: str
    issues: list[str]

    @field_validator("relevance_score", "factual_accuracy")
    @classmethod
    def must_be_in_range(cls, v):
        if not 0 <= v <= 10:
            raise ValueError(f"Score {v} out of range [0, 10]")
        return v

def llm_judge_evaluate(question: str, answer: str, context: str = "") -> EvaluationResult:
    """
    用 GPT-4o 评估目标模型输出的质量。
    评估维度:相关性、事实准确性、幻觉检测、格式合规。
    """
    judge_prompt = f"""你是一个严格的 AI 输出质量评估专家。
请评估以下 AI 回答在四个维度的质量,并给出评分和详细理由。

【用户问题】
{question}

【AI 回答】
{answer}

【参考上下文(可选)】
{context}

请以 JSON 格式返回评估结果:
{{
  "relevance_score": 0-10,          # 回答与问题的相关性
  "factual_accuracy": 0-10,         # 事实准确性
  "hallucination_detected": true/false,  # 是否检测到幻觉
  "format_compliant": true/false,  # 格式是否合规
  "overall_quality": "excellent|good|acceptable|poor",
  "reasoning": "详细评估理由",
  "issues": ["问题1", "问题2"]
}}
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": judge_prompt}],
        response_format={"type": "json_object"},
        temperature=0.1,  # 低温度保证评估一致性
    )

    import json
    raw = json.loads(response.choices[0].message.content)
    return EvaluationResult(**raw)

# ── 生产级评估管道:异步批量评估 ──
import asyncio
from dataclasses import dataclass, asdict

@dataclass
class EvaluationRecord:
    trace_id: str
    question: str
    answer: str
    context: str
    result: EvaluationResult
    evaluated_at: float

class EvaluationPipeline:
    def __init__(self, judge_model: str = "gpt-4o", batch_size: int = 10):
        self.client = OpenAI()
        self.judge_model = judge_model
        self.batch_size = batch_size
        self.results: list[EvaluationRecord] = []

    async def evaluate_async(self, trace_id: str, question: str, answer: str, context: str = ""):
        """异步评估单个回答"""
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,
            lambda: llm_judge_evaluate(question, answer, context)
        )
        record = EvaluationRecord(
            trace_id=trace_id,
            question=question,
            answer=answer,
            context=context,
            result=result,
            evaluated_at=time.time(),
        )
        self.results.append(record)
        return record

    async def run_batch(self, items: list[dict]):
        """批量评估,可用于每日离线批处理"""
        tasks = [
            self.evaluate_async(
                trace_id=item["trace_id"],
                question=item["question"],
                answer=item["answer"],
                context=item.get("context", ""),
            )
            for item in items
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    def get_summary(self) -> dict:
        """生成评估汇总报告"""
        if not self.results:
            return {}
        valid = [r for r in self.results if isinstance(r.result, EvaluationResult)]
        return {
            "total_evaluated": len(valid),
            "avg_relevance": sum(r.result.relevance_score for r in valid) / len(valid),
            "avg_accuracy": sum(r.result.factual_accuracy for r in valid) / len(valid),
            "hallucination_rate": sum(1 for r in valid if r.result.hallucination_detected) / len(valid),
            "format_compliance": sum(1 for r in valid if r.result.format_compliant) / len(valid),
            "quality_distribution": {
                q: sum(1 for r in valid if r.result.overall_quality == q)
                for q in ["excellent", "good", "acceptable", "poor"]
            }
        }

3.3 统计级评估:P50 / P90 / P99 分布监控

除了逐条评估,还需要宏观层面的质量趋势监控:

import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta

class QualityMonitor:
    """
    质量趋势监控器:统计质量得分分布,检测质量漂移
    """

    def __init__(self, window_hours: int = 24):
        self.window_hours = window_hours
        self.scores: dict[str, list[float]] = defaultdict(list)
        self.hallucination_events: list[dict] = []
        self.format_violations: list[dict] = []

    def record(self, evaluation: EvaluationResult, trace_id: str, model: str, timestamp: float = None):
        ts = timestamp or time.time()
        self.scores[f"{model}_relevance"].append(evaluation.relevance_score)
        self.scores[f"{model}_accuracy"].append(evaluation.factual_accuracy)
        if evaluation.hallucination_detected:
            self.hallucination_events.append({"trace_id": trace_id, "timestamp": ts})
        if not evaluation.format_compliant:
            self.format_violations.append({"trace_id": trace_id, "timestamp": ts})

    def get_distribution(self, metric: str, model: str) -> dict:
        """计算指定指标的分位数分布"""
        key = f"{model}_{metric}"
        data = np.array(self.scores[key])
        if len(data) == 0:
            return {}
        return {
            "p50": round(float(np.percentile(data, 50)), 2),
            "p90": round(float(np.percentile(data, 90)), 2),
            "p99": round(float(np.percentile(data, 99)), 2),
            "mean": round(float(np.mean(data)), 2),
            "std": round(float(np.std(data)), 2),
            "count": len(data),
        }

    def detect_quality_drift(self, metric: str, model: str, threshold_pct: float = 0.1) -> bool:
        """
        检测质量漂移:如果 p90 得分在过去 window 内下降超过 threshold_pct,触发告警
        """
        dist = self.get_distribution(metric, model)
        if dist["p90"] < 7.0:  # 硬编码阈值,实际应用中应可配置
            return True
        return False

    def generate_report(self, model: str) -> str:
        relevance = self.get_distribution("relevance", model)
        accuracy = self.get_distribution("accuracy", model)
        hallucination_rate = len(self.hallucination_events) / max(len(self.scores[f"{model}_relevance"]), 1)
        format_compliance = 1 - (len(self.format_violations) / max(len(self.scores[f"{model}_relevance"]), 1))

        return f"""## LLM 质量监控报告(过去 {self.window_hours} 小时)

### 模型:{model}

**相关性得分**
- P50: {relevance.get('p50')} | P90: {relevance.get('p90')} | P99: {relevance.get('p99')}
- 均值: {relevance.get('mean')} | 标准差: {relevance.get('std')}

**事实准确性得分**
- P50: {accuracy.get('p50')} | P90: {accuracy.get('p90')} | P99: {accuracy.get('p99')}
- 均值: {accuracy.get('mean')}

**综合指标**
- 幻觉率: {hallucination_rate:.2%}
- 格式合规率: {format_compliance:.2%}
- 样本量: {relevance.get('count', 0)}
"""

四、成本监控:Token 计费的精细化治理

4.1 为什么 LLM 成本监控是工程难题

LLM 成本监控远比传统 API 调用复杂:

  1. 输入输出分离计费:GPT-4o 输入 $2.5/M tokens,输出 $10/M tokens,比例不固定
  2. 上下文长度非线性影响:2048 token 的 prompt 和 128K token 的 prompt 价格差 60 倍
  3. 缓存命中间接降本:OpenAI 的 Cache API 和 Anthropic 的 Thinking Budget 都能节省成本,但需要正确计量
  4. 多模型混合成本:生产系统往往同时调用多个模型,成本结构复杂

4.2 生产级成本追踪架构

from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import time

class CostAlertLevel(Enum):
    NORMAL = "normal"
    WARNING = "warning"   # 单次调用 > $0.05
    CRITICAL = "critical" # 单次调用 > $0.50

@dataclass
class LLMCallRecord:
    """完整的 LLM 调用记录"""
    call_id: str
    trace_id: str
    model: str
    operation: str

    # Token 用量
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

    # 成本(以美元计)
    input_cost: float
    output_cost: float
    total_cost: float

    # 效率指标
    tokens_per_second: float  # 吞吐量
    first_token_latency_ms: float  # 首 token 延迟(流式)

    # 缓存状态
    cache_hit: bool = False
    cache_discount: float = 0.0  # 缓存节省的比例

    # 元数据
    timestamp: float = field(default_factory=time.time)
    duration_ms: float = 0.0
    status: str = "success"  # success | error | timeout

class CostTracker:
    """
    生产级 LLM 成本追踪器,支持:
    - 单次调用成本精确计算
    - 实时累计成本和趋势
    - 异常调用自动告警
    - 缓存节省统计
    """

    # 2026 年主流模型定价($/M tokens)
    PRICING = {
        "gpt-4o": {"input": 2.5, "output": 10.0},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.0, "output": 15.0},
        "claude-opus-4": {"input": 15.0, "output": 75.0},
        "gemini-2.5-pro": {"input": 1.25, "output": 5.0},
        "deepseek-v3": {"input": 0.27, "output": 1.10},
    }

    # 告警阈值(美元)
    ALERT_THRESHOLDS = {
        "warning": 0.05,
        "critical": 0.50,
    }

    def __init__(self):
        self.records: list[LLMCallRecord] = []
        self.cost_by_model: dict[str, float] = defaultdict(float)
        self.cost_by_operation: dict[str, float] = defaultdict(float)
        self.cache_savings: float = 0.0
        self.alerts: list[dict] = []

    def record(
        self,
        trace_id: str,
        model: str,
        operation: str,
        usage: dict,
        duration_ms: float,
        cache_hit: bool = False,
    ) -> LLMCallRecord:
        pricing = self.PRICING.get(model, {"input": 0, "output": 0})
        input_cost = usage["prompt_tokens"] * pricing["input"] / 1_000_000
        output_cost = usage["completion_tokens"] * pricing["output"] / 1_000_000

        # 缓存折扣(OpenAI Cached Tokens 享受 90% 折扣)
        cache_discount = 0.9 if cache_hit else 0.0
        cached_input_cost = input_cost * (1 - cache_discount)
        total_cost = cached_input_cost + output_cost

        record = LLMCallRecord(
            call_id=str(uuid.uuid4()),
            trace_id=trace_id,
            model=model,
            operation=operation,
            prompt_tokens=usage["prompt_tokens"],
            completion_tokens=usage["completion_tokens"],
            total_tokens=usage["total_tokens"],
            input_cost=round(cached_input_cost, 6),
            output_cost=round(output_cost, 6),
            total_cost=round(total_cost, 6),
            tokens_per_second=round(usage["completion_tokens"] / max(duration_ms / 1000, 0.001), 1),
            first_token_latency_ms=0.0,
            cache_hit=cache_hit,
            cache_discount=cache_discount,
            duration_ms=duration_ms,
        )

        self.records.append(record)
        self.cost_by_model[model] += total_cost
        self.cost_by_operation[operation] += total_cost

        if cache_hit:
            self.cache_savings += input_cost * cache_discount

        # 触发告警检查
        if total_cost >= self.ALERT_THRESHOLDS["critical"]:
            self.alerts.append({
                "level": "critical",
                "trace_id": trace_id,
                "model": model,
                "cost": total_cost,
                "reason": f"单次调用成本 ${total_cost:.4f} 超过 critical 阈值 ${self.ALERT_THRESHOLDS['critical']}",
            })
        elif total_cost >= self.ALERT_THRESHOLDS["warning"]:
            self.alerts.append({
                "level": "warning",
                "trace_id": trace_id,
                "model": model,
                "cost": total_cost,
            })

        return record

    def get_dashboard(self, window_hours: int = 24) -> dict:
        """生成成本监控大盘数据"""
        cutoff = time.time() - window_hours * 3600
        recent = [r for r in self.records if r.timestamp >= cutoff]

        total_cost = sum(r.total_cost for r in recent)
        total_tokens = sum(r.total_tokens for r in recent)
        avg_cost_per_call = total_cost / max(len(recent), 1)

        # 按模型分解成本
        cost_breakdown = {}
        for model, cost in self.cost_by_model.items():
            model_records = [r for r in recent if r.model == model]
            if model_records:
                cost_breakdown[model] = {
                    "total_cost": round(cost, 4),
                    "call_count": len(model_records),
                    "avg_cost_per_call": round(cost / len(model_records), 4),
                    "total_tokens": sum(r.total_tokens for r in model_records),
                    "cache_savings": round(sum(r.input_cost * r.cache_discount for r in model_records), 4),
                }

        return {
            "window_hours": window_hours,
            "total_cost_usd": round(total_cost, 4),
            "total_calls": len(recent),
            "avg_cost_per_call_usd": round(avg_cost_per_call, 4),
            "total_tokens": total_tokens,
            "cache_savings_usd": round(self.cache_savings, 4),
            "cache_savings_rate": round(self.cache_savings / max(total_cost + self.cache_savings, 0.001), 4),
            "cost_by_model": cost_breakdown,
            "cost_by_operation": dict(sorted(self.cost_by_operation.items(), key=lambda x: -x[1])[:10]),
            "active_alerts": self.alerts[-10:],  # 最近 10 条告警
        }

4.3 成本优化:缓存与 Prompt 压缩

成本监控的最终目的是优化。三个最有效的降本手段:

# ── 策略 1:语义缓存(Semantic Cache)──
class SemanticCache:
    """
    基于 Embedding 相似度的语义缓存。
    当新请求与历史请求的语义相似度 > 0.95 时,直接返回缓存结果。
    节省 100% 的 LLM 调用成本。
    """

    def __init__(self, threshold: float = 0.95, max_entries: int = 10000):
        self.threshold = threshold
        self.cache: list[tuple[np.ndarray, dict]] = []  # (embedding, {prompt, response, usage})
        self.stats = {"hits": 0, "misses": 0}

    def _embed(self, text: str) -> np.ndarray:
        return self.embedding_model.encode(text)

    def get(self, prompt: str) -> Optional[dict]:
        query_emb = self._embed(prompt)
        for cached_emb, cached_data in self.cache:
            similarity = float(np.dot(query_emb, cached_emb) / (np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)))
            if similarity >= self.threshold:
                self.stats["hits"] += 1
                return cached_data
        self.stats["misses"] += 1
        return None

    def put(self, prompt: str, response: dict, usage: dict):
        entry = (self._embed(prompt), {"prompt": prompt, "response": response, "usage": usage})
        self.cache.append(entry)
        if len(self.cache) > self.max_entries:
            self.cache.pop(0)

    def hit_rate(self) -> float:
        total = self.stats["hits"] + self.stats["misses"]
        return self.stats["hits"] / max(total, 1)


# ── 策略 2:Prompt 压缩 ──
def compress_prompt(prompt: str, max_chars: int = 8000) -> str:
    """
    简单版 Prompt 压缩:
    1. 移除冗余空白
    2. 截断过长的上下文引用
    3. 对于超过 max_chars 的输入,启用 summary/abstract 中间步骤
    """
    import re
    cleaned = re.sub(r'\s+', ' ', prompt).strip()
    if len(cleaned) <= max_chars:
        return cleaned
    # 激进压缩:保留前 60% + 后 40%
    keep_front = int(max_chars * 0.6)
    keep_back = max_chars - keep_front
    return cleaned[:keep_front] + f"\n... [压缩内容,原始长度 {len(cleaned)} 字符] ...\n" + cleaned[-keep_back:]

五、告警体系:三位一体的智能告警

5.1 告警分层设计

LLM 应用告警必须分层,不能用传统应用的「错误率 > 1% 就告警」策略:

from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio

class AlertSeverity(Enum):
    P0_CRITICAL = "critical"   # 立即处理:服务不可用或成本异常
    P1_HIGH = "high"           # 2小时内处理:质量严重下降
    P2_MEDIUM = "medium"       # 24小时内处理:趋势异常
    P3_LOW = "low"             # 例行关注:统计偏差

@dataclass
class Alert:
    id: str
    severity: AlertSeverity
    title: str
    description: str
    metric: str
    current_value: float
    threshold: float
    affected_traces: list[str]
    suggested_action: str
    created_at: float

class LLMLAlertManager:
    """
    AI 应用智能告警管理器。
    支持多层指标联动告警,减少告警风暴。
    """

    # ── 告警规则配置 ──
    RULES = [
        # P0: 成本异常
        {"severity": AlertSeverity.P0_CRITICAL, "metric": "single_call_cost", "op": "gt", "threshold": 0.50,
         "window": "instant", "cooldown_minutes": 5,
         "title": "LLM 单次调用成本异常",
         "action": "立即检查是否 prompt 过长或陷入循环调用"},

        # P0: 错误率飙升
        {"severity": AlertSeverity.P0_CRITICAL, "metric": "error_rate", "op": "gt", "threshold": 0.05,
         "window": 300, "cooldown_minutes": 5,
         "title": "LLM 错误率超过 5%",
         "action": "检查 LLM API 服务状态和模型可用性"},

        # P1: 质量严重下降
        {"severity": AlertSeverity.P1_HIGH, "metric": "avg_relevance_p90", "op": "lt", "threshold": 7.0,
         "window": 3600, "cooldown_minutes": 30,
         "title": "回答相关性 P90 低于 7.0",
         "action": "检查 RAG 检索质量和 Prompt 有效性"},

        # P1: 幻觉率上升
        {"severity": AlertSeverity.P1_HIGH, "metric": "hallucination_rate", "op": "gt", "threshold": 0.15,
         "window": 3600, "cooldown_minutes": 60,
         "title": "幻觉率超过 15%",
         "action": "检查检索上下文质量和模型 temperature 设置"},

        # P2: 成本趋势上升
        {"severity": AlertSeverity.P2_MEDIUM, "metric": "cost_per_hour", "op": "gt", "threshold": None,  # 动态阈值
         "window": 3600, "cooldown_minutes": 120,
         "title": "小时成本环比上升 50%",
         "action": "分析高消耗请求的分布,考虑启用缓存"},

        # P2: 延迟上升
        {"severity": AlertSeverity.P2_MEDIUM, "metric": "avg_latency_p95", "op": "gt", "threshold": 15000,
         "window": 1800, "cooldown_minutes": 30,
         "title": "LLM P95 延迟超过 15 秒",
         "action": "检查是否触及模型限速,考虑降级或限流"},

        # P3: Token 效率下降
        {"severity": AlertSeverity.P3_LOW, "metric": "output_token_ratio", "op": "lt", "threshold": 0.3,
         "window": 7200, "cooldown_minutes": 240,
         "title": "输出 Token 占比低于 30%(Prompt 膨胀)",
         "action": "审查 Prompt 设计,移除冗余上下文"},
    ]

    def __init__(self, notifier: Callable[[Alert], None] = None):
        self.notifier = notifier or self._default_notifier
        self.cooldowns: dict[str, float] = {}
        self.active_alerts: list[Alert] = []

    def _default_notifier(self, alert: Alert):
        print(f"[{alert.severity.value.upper()}] {alert.title}: {alert.description}")

    def check_rules(self, metrics: dict) -> list[Alert]:
        """检查所有告警规则,返回触发的告警列表"""
        triggered = []
        for rule in self.RULES:
            # 检查冷却期
            rule_id = f"{rule['metric']}_{rule['op']}_{rule['threshold']}"
            last_fired = self.cooldowns.get(rule_id, 0)
            cooldown_sec = rule.get("cooldown_minutes", 30) * 60
            if time.time() - last_fired < cooldown_sec:
                continue

            current = metrics.get(rule["metric"], 0)
            threshold = rule["threshold"]

            # 动态阈值(环比)
            if rule["metric"] == "cost_per_hour" and threshold is None:
                threshold = metrics.get("cost_per_hour_baseline", 0) * 1.5

            fired = False
            if rule["op"] == "gt" and current > threshold:
                fired = True
            elif rule["op"] == "lt" and current < threshold:
                fired = True

            if fired:
                alert = Alert(
                    id=str(uuid.uuid4()),
                    severity=rule["severity"],
                    title=rule["title"],
                    description=f"{rule['metric']} = {current:.4f},阈值 = {threshold}",
                    metric=rule["metric"],
                    current_value=current,
                    threshold=threshold,
                    affected_traces=metrics.get("recent_error_traces", [])[:5],
                    suggested_action=rule["action"],
                    created_at=time.time(),
                )
                triggered.append(alert)
                self.cooldowns[rule_id] = time.time()
                self.active_alerts.append(alert)

        # 按严重性排序,只通知最高优先级的
        triggered.sort(key=lambda a: [AlertSeverity.P0_CRITICAL, AlertSeverity.P1_HIGH, AlertSeverity.P2_MEDIUM, AlertSeverity.P3_LOW].index(a.severity))

        for alert in triggered[:3]:  # 每次最多发 3 条告警
            self.notifier(alert)

        return triggered

5.2 告警抑制:避免告警风暴

LLM 应用中,一次模型故障可能导致同一 TraceID 产生几十条相关告警。用 TraceID 聚合来抑制:

def group_alerts_by_trace(alerts: list[Alert]) -> list[dict]:
    """按 TraceID 聚合告警,减少告警风暴"""
    from collections import defaultdict
    by_trace = defaultdict(list)
    for alert in alerts:
        for trace_id in alert.affected_traces:
            by_trace[trace_id].append(alert)

    grouped = []
    for trace_id, trace_alerts in by_trace.items():
        # 同 TraceID 告警合并为一条,severity 取最高
        max_severity = min(a.severity for a in trace_alerts)
        grouped.append({
            "trace_id": trace_id,
            "severity": max_severity,
            "alert_count": len(trace_alerts),
            "titles": [a.title for a in trace_alerts],
            "suggested_action": trace_alerts[0].suggested_action,  # 取最紧急的
        })
    return grouped

六、实战:构建完整的可观测性数据管道

6.1 架构总览

┌─────────────────────────────────────────────────────────────────┐
│                        Your LLM Application                      │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────────┐    │
│   │  LangFuse│  │ Prometheus│  │  Loki   │  │   Jaeger    │    │
│   │(LLM专有) │  │(Metrics)  │  │ (Logs)  │  │  (Traces)   │    │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘  └──────┬───────┘    │
└────────┼─────────────┼─────────────┼────────────────┼───────────┘
         │             │             │                │
         ▼             ▼             ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                   OpenTelemetry Collector                        │
│  ┌──────────────┐  ┌───────────────┐  ┌───────────────────┐   │
│  │ Span Enricher │  │ Tail Sampler  │  │  Cost Calculator  │   │
│  │ (注入业务标签)│  │ (尾部采样)     │  │  (计算真实成本)   │   │
│  └──────────────┘  └───────────────┘  └───────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
         │             │             │                │
         ▼             ▼             ▼                ▼
┌─────────────┐  ┌─────────────┐  ┌──────────────────────────────┐
│  Grafana    │  │  Langfuse   │  │   自建指标存储 (ClickHouse)   │
│  (Metrics)  │  │ (LLM Traces)│  │   (低成本历史分析)            │
└─────────────┘  └─────────────┘  └──────────────────────────────┘

6.2 OTEL Collector 配置:尾部采样 + 成本计算

# otel-collector-config.yaml(精简版核心配置)
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # 内存保护:超过 80% 内存时丢弃低优先级数据
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 100

  # 尾部采样:先缓存所有 Span,决策时看到完整请求
  tail_sampling:
    decision_wait: 10s
    policies:
      # 保留所有错误请求
      - name: errors-policy
        type: status_code
        status_code: { status_codes: [ERROR] }

      # 保留超过 5 秒的慢请求
      - name: slow-traces-policy
        type: latency
        latency: { threshold_ms: 5000 }

      # 保留特定业务标签的请求(如高价值用户)
      - name: business-context-policy
        type: string_attribute
        string_attribute: { key: "biz.user_tier", values: ["premium", "enterprise"] }

      # 采样 1% 的正常请求(节省成本)
      - name: probabilistic-policy
        type: probabilistic
        probabilistic: { sampling_percentage: 1 }

  # 自定义:计算 LLM 调用成本
  transform:
    error_mode: ignore
    trace_statements:
      - context: span
        statements:
          - set(attributes["llm.cost.usd"],
                attributes["gen_ai.usage.input_tokens"] * 2.5 / 1000000 +
                attributes["gen_ai.usage.output_tokens"] * 10.0 / 1000000
              ) where attributes["gen_ai.system"] != nil

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: "llm_app"
    const_labels:
      service: "your-llm-service"

  otlp/tempo:
    endpoint: "tempo:4317"
    tls:
      insecure: true

  loki:
    endpoint: "http://loki:3100/loki/api/v1/push"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, transform]
      exporters: [otlp/tempo]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [prometheus]
    logs:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [loki]

6.3 Grafana Dashboard JSON 配置

{
  "title": "LLM 应用可观测性大盘",
  "panels": [
    {
      "title": "实时调用量 & 成本",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum(rate(llm_app_calls_total[5m])) by (model)",
          "legendFormat": "{{model}} QPS"
        },
        {
          "expr": "sum(rate(llm_app_cost_usd_total[5m])) by (model)",
          "legendFormat": "{{model}} 成本 $/s"
        }
      ],
      "gridPos": {"x": 0, "y": 0, "w": 12, "h": 8}
    },
    {
      "title": "Token 效率分布",
      "type": "histogram",
      "targets": [
        {
          "expr": "histogram_quantile(0.50, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
          "legendFormat": "P50 - {{model}}"
        },
        {
          "expr": "histogram_quantile(0.95, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
          "legendFormat": "P95 - {{model}}"
        },
        {
          "expr": "histogram_quantile(0.99, sum(rate(llm_app_tokens_bucket[5m])) by (le, model))",
          "legendFormat": "P99 - {{model}}"
        }
      ],
      "gridPos": {"x": 12, "y": 0, "w": 12, "h": 8}
    },
    {
      "title": "质量趋势(LLM Judge 评分)",
      "type": "timeseries",
      "targets": [
        {
          "expr": "avg(llm_app_quality_relevance_p90{service=~\"$service\"})",
          "legendFormat": "相关性 P90"
        },
        {
          "expr": "avg(llm_app_quality_accuracy_p90{service=~\"$service\"})",
          "legendFormat": "准确性 P90"
        },
        {
          "expr": "llm_app_hallucination_rate{service=~\"$service\"}",
          "legendFormat": "幻觉率"
        }
      ],
      "gridPos": {"x": 0, "y": 8, "w": 12, "h": 8}
    },
    {
      "title": "按业务标签的成本分解",
      "type": "piechart",
      "targets": [
        {
          "expr": "sum(increase(llm_app_cost_usd_total[24h])) by (operation)",
          "legendFormat": "{{operation}}"
        }
      ],
      "gridPos": {"x": 12, "y": 8, "w": 12, "h": 8}
    }
  ]
}

七、总结:AI 可观测性工程的五大支柱

7.1 技术选型建议

场景推荐方案
快速起步 / 小团队LangFuse Cloud + Grafana + Prometheus
中等规模 / 已有 OTel 基础设施自建 OTel Collector + Tempo + Langfuse
大规模 / 多租户 / 企业级全自建:ClickHouse + Kafka + Grafana + 自研评估引擎

7.2 实施路线图

第一阶段(1-2 周):基础 Tracing

  • 接入 OpenTelemetry,自动注入 gen_ai.* 属性
  • 配置 Tail-Based Sampling,保留所有错误和慢请求
  • 在 Jaeger 中可视化完整调用链路

第二阶段(2-3 周):成本可见性

  • 实现 CostTracker,精确计算每次 LLM 调用成本
  • 构建成本监控大盘,按模型 / 操作 / 用户维度分解
  • 接入语义缓存,目标缓存命中率 > 30%

第三阶段(3-4 周):质量评估

  • 接入 LLM Judge,建立自动化评估管道
  • 监控幻觉率、相关性、格式合规率三大核心质量指标
  • 配置 P1 及以上质量告警

第四阶段(持续):智能化运维

  • 建立质量趋势分析,检测模型能力漂移
  • 引入 A/B Testing 框架,对比不同 Prompt / 模型的真实效果
  • 构建根因分析能力:从成本异常 → Trace → Logs → 定位根因

7.3 关键指标清单

✅ 基础层(必须)
  - 调用量 QPS(按模型 / 操作)
  - P50 / P95 / P99 延迟
  - 错误率(API 层面)
  - 单次 / 累计成本(按模型)

✅ 成本层(强烈建议)
  - 缓存命中率 & 节省金额
  - Token 效率(输出 / 输入比例)
  - 平均每次调用成本

✅ 质量层(进阶)
  - LLM Judge 相关性得分分布
  - 幻觉率(小时级统计)
  - 格式合规率

✅ 业务层(高阶)
  - 端到端任务成功率(用户意图达成)
  - 用户反馈关联(Thumb up/down → TraceID)

结语

LLM 应用的可观测性不是「多加几个 metrics」那么简单。它要求我们重新定义监控的边界:从二元对错走向概率评估,从单点告警走向端到端追踪,从被动响应走向主动的质量与成本治理。

2026 年的 AI 工程化竞争,可观测性能力将直接决定团队能否在成本、质量和可靠性之间找到最优平衡点。现在投入建设的可观测性基础设施,就是明天 AI 产品竞争力的护城河。

推荐文章

网络数据抓取神器 Pipet
2024-11-19 05:43:20 +0800 CST
在Rust项目中使用SQLite数据库
2024-11-19 08:48:00 +0800 CST
Elasticsearch 的索引操作
2024-11-19 03:41:41 +0800 CST
如何在 Vue 3 中使用 TypeScript?
2024-11-18 22:30:18 +0800 CST
一些高质量的Mac软件资源网站
2024-11-19 08:16:01 +0800 CST
程序员茄子在线接单