AI Agent 可观测性深度实战:当黑箱遇见全链路追踪——从 OTel GenAI 语义规范到 LoongSuite Pilot 端侧采集、Python 零代码插桩与安全审计的生产级完全指南
一、引言:为什么 AI Agent 的可观测性不再是可选项
2025 年以来,AI Agent 从实验室走向生产环境。Claude Code 深夜改了你的核心配置文件,你不知道;智能客服 Agent 自作主张取消了用户的订单,决策逻辑无从复盘;多智能体协同任务跑到一半挂了,哪个节点出的问题说不清楚。
这不是个案,这是所有规模化使用 AI Agent 的团队都会遇到的通病:Agent 跑起来了,但你看不见它在干什么。
传统的可观测三板斧——Metrics、Log、Trace——面对 AI Agent 这种新型计算范式,基本等于瞎子摸象。一个包含 10 轮 ReAct 推理的 Agent 任务,传统监控只能看到 10 条独立的 HTTP 请求,完全无法还原 Agent 的决策流程、工具调用顺序、Token 消耗路径。
本文将深入剖析 AI Agent 可观测性的核心难题,基于阿里云 LoongSuite 开源方案(包含 LoongSuite Pilot、LoongSuite Python Agent、LoongSuite GenAI 语义规范),从架构原理到代码实战,给出一套生产级可落地的完整方案。
二、AI Agent 可观测性的三大核心难题
2.1 执行过程黑盒化
传统微服务的调用链是确定性的:A 调 B,B 调 C,路径可预测。但 AI Agent 的执行路径是动态生成的——LLM 决定下一步调用什么工具,工具返回结果后 LLM 再决定下一步。一个 ReAct 循环可能跑 3 轮,也可能跑 30 轮,每次的分支都不同。
# 传统微服务调用链:确定性的
def traditional_flow(user_id):
user = user_service.get(user_id) # 调用1:获取用户
orders = order_service.list(user_id) # 调用2:获取订单
result = payment_service.check(user_id) # 调用3:检查支付
return {"user": user, "orders": orders, "payment": result}
# AI Agent 调用链:动态生成的,不可预测
async def agent_flow(user_query):
# 第1轮 ReAct:LLM 决定先搜索
search_result = await llm_call(user_query) # LLM调用1
tool_result = await tool_search(search_result) # 工具调用1
# 第2轮 ReAct:LLM 根据搜索结果决定读取文件
llm_result = await llm_call(tool_result) # LLM调用2
file_content = await tool_read_file(llm_result) # 工具调用2
# 第3轮 ReAct:LLM 决定还要执行命令
llm_result2 = await llm_call(file_content) # LLM调用3
cmd_result = await tool_bash(llm_result2) # 工具调用3
# ... 可能还有 N 轮,每轮的分支都不同
传统 Trace 只能看到一堆独立的 HTTP 请求,无法还原"第 2 轮推理触发了文件读取"这种因果关系。
2.2 行为轨迹难追溯
AI Agent 有很高的自主操作权限——读写文件、执行系统命令、调用第三方 API。在缺少审计能力的情况下,你无法回答:
- Agent 过去 24 小时修改了哪些文件?
- 那次
rm -rf是谁触发的?什么意图? - Agent 是否把敏感数据外传到了第三方 API?
2.3 成本难度量
大模型 Token 消耗是 Agent 的主要成本来源。一个复杂任务可能涉及数十次 LLM 调用,每次调用的 Token 量差异巨大。没有按 Agent、用户、任务维度的精细化成本拆分,企业根本无法做预算管控。
# 一次简单的 Agent 任务,Token 消耗可能惊人
task = "帮我分析这个项目的代码质量"
# LLM调用1:理解任务 → 2000 tokens
# 工具调用1:读取目录结构
# LLM调用2:决定分析哪些文件 → 3000 tokens
# 工具调用2-10:读取9个文件
# LLM调用3:分析代码 → 15000 tokens(输入大量代码)
# LLM调用4:生成报告 → 5000 tokens
# 总计:25000 tokens,按 GPT-4 定价约 $0.75
# 但如果 Agent 犯错重试,Token 可能翻 3-5 倍
三、OpenTelemetry GenAI 语义规范:基础与不足
3.1 OTel GenAI 规范现状
OpenTelemetry 从 2024 年初开始推动 GenAI 语义规范,目标是建立统一的可观测数据语言。目前已定义的核心属性包括:
| 属性 | 说明 |
|---|---|
gen_ai.operation.name | 操作类型:chat、embeddings、execute_tool 等 |
gen_ai.span.kind | Span 类型:LLM、CHAIN、AGENT、TOOL、RETRIEVER 等 |
gen_ai.request.model | 请求模型 |
gen_ai.response.model | 实际响应模型 |
gen_ai.usage.input_tokens | 输入 Token 数 |
gen_ai.usage.output_tokens | 输出 Token 数 |
gen_ai.input.messages | 输入消息 |
gen_ai.output.messages | 输出消息 |
gen_ai.response.finish_reasons | 停止原因 |
3.2 社区标准的三个关键缺口
缺口一:缺少层级化语义。 当 Agent 执行长程任务时,一个 Trace 中可能包含上百个 Span。原生标准无法区分哪些 Span 属于同一轮 ReAct,哪些是入口调用。调用链是一坨扁平的列表,读起来像看一本没有目录的书。
缺口二:缺少业务功能域抽象。 在实际业务中,Agent 的能力通常按 Skill(技能)组织——"点奶茶"涉及闪购 Skill、支付 Skill、配送 Skill。现有规范没有 Skill 这个抽象层,无法回答"哪个 Skill 错误率最高"。
缺口三:缺少成本与安全语义。 Token 消耗只有数量没有成本,没有缓存命中字段,没有安全审计属性(如敏感文件访问、命令执行风险等级)。
四、LoongSuite GenAI 语义规范:补齐语义空白
阿里云基于 OTel GenAI 社区标准,结合内部海量实战经验,推出了 LoongSuite GenAI 可观测语义规范(已开源:github.com/alibaba/loongsuite-semantic-conventions-genai)。这不是另起炉灶,而是在社区标准上的厂商增强扩展。
4.1 扩展一:Entry Span 与 Step Span
Entry Span(gen_ai.span.kind = ENTRY)在 Agent 调用的入口处创建,用于记录最原始的用户输入和系统输出。为什么需要 Entry?因为下游的 LLM 调用会被 System Prompt、框架 Prompt 反复"污染",你需要一个干净的入口记录。
Step Span(gen_ai.operation.name = react)代表每次 ReAct 循环的层次化表达,通过 gen_ai.react.round 标识轮次。
# Step Span 的层次化结构示意
Trace: trace_id_abc123
├── ENTRY: "帮我重构这个函数" # 用户原始输入
│ ├── AGENT: code-refactor-agent
│ │ ├── STEP (round=1): # 第1轮 ReAct
│ │ │ ├── LLM: 决定先读取文件
│ │ │ └── TOOL: read_file("src/main.py")
│ │ ├── STEP (round=2): # 第2轮 ReAct
│ │ │ ├── LLM: 分析代码结构
│ │ │ └── TOOL: read_file("src/utils.py")
│ │ └── STEP (round=3): # 第3轮 ReAct
│ │ ├── LLM: 生成重构方案
│ │ └── TOOL: write_file("src/main.py", ...)
没有 Step Span,你看到的是一堆扁平的 LLM 和 TOOL 调用,根本无法区分"第 2 轮的 LLM 调用对应哪个工具调用"。有了 Step Span,Top-down 排查效率提升一个数量级。
4.2 扩展二:Skill 语义
在电商购物助手等场景中,用户指令由 Agent 路由到对应的 Skill 执行。LoongSuite 新增 gen_ai.skill.* 属性族:
# Skill 语义示例
span.set_attributes({
"gen_ai.skill.name": "flash-shopping", # 技能名称
"gen_ai.skill.version": "2.1.0", # 技能版本
"gen_ai.skill.invocation_id": "inv_12345", # 调用唯一标识
"gen_ai.skill.parent_skill": "qwen-assistant", # 父技能
})
这样你就能回答:哪个 Skill 错误率最高?新版本 Skill 上线后延迟是否劣化?LLM 调用占 Skill 总耗时的比例是多少?
4.3 扩展三:成本与安全语义
# 成本追踪扩展
span.set_attributes({
"gen_ai.usage.input_cost": 0.015, # 输入成本(美元)
"gen_ai.usage.output_cost": 0.045, # 输出成本(美元)
"gen_ai.usage.total_cost": 0.060, # 总成本
"gen_ai.usage.cache_read.input_tokens": 5000, # 缓存命中Token
"gen_ai.usage.cache_creation.input_tokens": 200, # 缓存写入Token
})
# 安全审计扩展
span.set_attributes({
"gen_ai.tool.risk_level": "high", # 工具风险等级
"gen_ai.tool.category": "command_execution", # 工具类别
"gen_ai.security.prompt_injection": True, # 是否触发注入检测
"gen_ai.security.sensitive_file_access": True, # 是否访问敏感文件
})
五、LoongSuite Pilot:Coding Agent 的端侧数据采集
5.1 为什么 Coding Agent 需要端侧采集
Claude Code、Cursor、Codex 这些 Coding Agent 运行在开发者本地设备,所有代码编辑、文件操作、终端命令都发生在本地。传统服务端探针完全无法感知。
你不可能让每个开发者手动记录 Agent 做了什么。需要一个自动化的、无感知的采集方案。
5.2 Pilot 的核心架构
┌─────────────────────────────────────────────────┐
│ LoongSuite Pilot (守护进程) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Claude │ │ Cursor │ │ Codex │ │
│ │ Code │ │ Plugin │ │ Plugin │ │
│ │ Plugin │ │ │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ ┌────▼──────────────▼──────────────▼─────┐ │
│ │ 采集基类 (BaseCollector) │ │
│ │ - 2-3个抽象方法即可接入新 Agent │ │
│ └─────────────────┬──────────────────────┘ │
│ │ │
│ ┌─────────────────▼──────────────────────┐ │
│ │ 数据处理与上传层 │ │
│ │ - 断点续采 │ │
│ │ - 采集粒度配置 │ │
│ │ - 数据去重 │ │
│ └────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
5.3 核心设计:插件化架构
Pilot 的插件化架构让接入新 Agent 只需实现 2-3 个抽象方法。不同 Agent 的数据形态差异很大——Claude Code 用 Hook 日志,Cursor 用 IDE 快照,Codex 用 SQLite 数据库——但采集基类已经把这些差异封装好了。
# 接入新 Agent 只需实现这些方法
class BaseCollector(ABC):
@abstractmethod
def detect_installation(self) -> bool:
"""检测 Agent 是否已安装"""
pass
@abstractmethod
def collect_events(self) -> List[AgentEvent]:
"""采集 Agent 事件"""
pass
@abstractmethod
def get_agent_info(self) -> AgentInfo:
"""获取 Agent 元信息"""
pass
# 接入 Cursor 只需这样
class CursorCollector(BaseCollector):
def detect_installation(self) -> bool:
return os.path.exists(os.path.expanduser("~/.cursor/extensions"))
def collect_events(self) -> List[AgentEvent]:
# Cursor 的数据在 IDE 快照中
snapshots = self._read_ide_snapshots()
return [self._parse_snapshot(s) for s in snapshots]
def get_agent_info(self) -> AgentInfo:
return AgentInfo(
name="cursor",
version=self._get_cursor_version(),
data_source="ide_snapshot"
)
5.4 采集粒度的安全平衡
不同团队对数据安全的要求不同。Pilot 支持按 Agent 类型灵活配置:
# pilot-config.yaml - 完整审计模式
collectors:
claude-code:
granularity: full # 采集消息内容、工具参数等详细信息
include_messages: true
include_tool_args: true
include_file_content: true
# pilot-config.yaml - 敏感数据模式
collectors:
claude-code:
granularity: metadata_only # 仅上报元数据
include_messages: false
include_tool_args: false
include_file_content: false
# 仍然上报:模型名、Token消耗、耗时、工具名
5.5 断点续采机制
本地设备面临网络波动、设备重启、终端关闭等不稳定场景。Pilot 内置断点续采:
class CheckpointManager:
"""断点续采管理器"""
def __init__(self, checkpoint_dir: str):
self.checkpoint_dir = checkpoint_dir
self.checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.json")
def save_checkpoint(self, agent_name: str, last_event_id: str,
last_timestamp: int):
"""保存采集进度"""
checkpoints = self._load_checkpoints()
checkpoints[agent_name] = {
"last_event_id": last_event_id,
"last_timestamp": last_timestamp,
"updated_at": time.time()
}
with open(self.checkpoint_file, 'w') as f:
json.dump(checkpoints, f)
def get_checkpoint(self, agent_name: str) -> Optional[dict]:
"""获取上次采集进度"""
checkpoints = self._load_checkpoints()
return checkpoints.get(agent_name)
def should_resume(self, agent_name: str) -> bool:
"""判断是否需要续采"""
checkpoint = self.get_checkpoint(agent_name)
if not checkpoint:
return False
# 如果距上次采集不超过24小时,启动续采
return (time.time() - checkpoint["updated_at"]) < 86400
六、LoongSuite Python Agent:零代码探针插桩
6.1 从零到一:三行命令接入可观测
针对 LangChain、AgentScope、Dify 等框架开发的 Agent 应用,LoongSuite Python Agent 提供零代码自动插桩:
# 1. 安装
pip install loongsuite-distro
# 2. 自动检测并安装所需的插桩库
loongsuite-bootstrap
# 输出示例:
# Detected: langchain >= 0.1.0 → installing loongsuite-instrumentation-langchain
# Detected: dashscope >= 1.0.0 → installing loongsuite-instrumentation-dashscope
# Detected: mcp >= 0.1.0 → installing loongsuite-instrumentation-mcp
# 3. 一行命令启动
loongsuite-instrument \
--traces_exporter otlp \
--service_name my-agent-app \
python my_agent_app.py
6.2 自动识别的 Span 类型
探针自动识别并生成 11 种 GenAI Span 类型,覆盖 Agent 全生命周期:
| Span 类型 | 说明 | 关键属性 |
|---|---|---|
| ENTRY | 请求入口 | 用户原始输入、输出 |
| AGENT | Agent 执行单元 | Agent 名称、配置 |
| STEP | ReAct 推理步骤 | gen_ai.react.round |
| LLM | 大模型调用 | 模型名、Token消耗、消息 |
| TOOL | 工具调用 | 工具名、参数、结果 |
| MCP | MCP 协议调用 | 服务名、方法、参数 |
| CHAIN | 链式调用编排 | 链名、步骤序号 |
| RETRIEVER | 检索操作 | 查询、文档数 |
| EMBEDDING | 向量化操作 | 模型、维度、Token |
| RERANKER | 重排序操作 | 模型、文档数、分数 |
| WORKFLOW | 工作流编排 | 流程名、节点 |
6.3 实战:构建可观测的 LangChain Agent
下面是一个完整的可观测 LangChain Agent 示例:
# my_agent_app.py
import os
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from langchain import hub
# ===== 工具定义 =====
@tool
def search_codebase(query: str) -> str:
"""搜索代码库中的相关文件"""
# 模拟搜索逻辑
results = [
{"file": "src/main.py", "line": 42, "content": "def process_data(data):"},
{"file": "src/utils.py", "line": 15, "content": "def validate(input):"},
]
return str(results)
@tool
def read_file(filepath: str) -> str:
"""读取指定文件内容"""
try:
with open(filepath, 'r') as f:
return f.read()
except FileNotFoundError:
return f"Error: File {filepath} not found"
@tool
def run_command(cmd: str) -> str:
"""执行系统命令"""
import subprocess
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout or result.stderr
# ===== Agent 构建 =====
tools = [search_codebase, read_file, run_command]
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
base_url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
)
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=10,
handle_parsing_errors=True,
)
# ===== 运行 =====
if __name__ == "__main__":
result = agent_executor.invoke({
"input": "分析 src/main.py 中的 process_data 函数," \
"检查是否有性能问题,然后运行测试验证"
})
print(result["output"])
用 loongsuite-instrument 启动后,在 OTel Collector 中你会看到完整的 Trace 树:
Trace (trace_id: abc123)
├── ENTRY: "分析 src/main.py 中的 process_data 函数..."
│ └── AGENT: ReAct Agent
│ ├── STEP (round=1)
│ │ ├── LLM: "我需要先搜索 process_data 函数的位置"
│ │ └── TOOL: search_codebase("process_data")
│ ├── STEP (round=2)
│ │ ├── LLM: "找到了,让我读取文件内容"
│ │ └── TOOL: read_file("src/main.py")
│ ├── STEP (round=3)
│ │ ├── LLM: "我发现了一个 O(n²) 的性能问题"
│ │ └── TOOL: run_command("pytest tests/test_main.py")
│ └── STEP (round=4)
│ └── LLM: "测试通过,以下是分析结果..."
6.4 框架覆盖范围
LoongSuite Python Agent 已覆盖 17 个插桩库:
| 类别 | 框架 | 插桩库 |
|---|---|---|
| Agent 框架 | LangChain | loongsuite-instrumentation-langchain |
| Agent 框架 | LangGraph | loongsuite-instrumentation-langgraph |
| Agent 框架 | AgentScope | loongsuite-instrumentation-agentscope |
| 低代码平台 | Dify | loongsuite-instrumentation-dify |
| 模型调用 | OpenAI | loongsuite-instrumentation-openai |
| 模型调用 | DashScope | loongsuite-instrumentation-dashscope |
| 协议 | MCP | loongsuite-instrumentation-mcp |
| 向量存储 | Chroma | loongsuite-instrumentation-chroma |
| 向量存储 | Milvus | loongsuite-instrumentation-milvus |
| ... | 还有 8 个框架 | ... |
七、个人通用助理的链路追踪
7.1 扁平打点 vs 完整链路
以 OpenClaw 为例,其内置的 diagnostics-otel 扩展采用事件驱动架构,每个事件独立创建 Span,彼此之间没有父子关系和 Trace Context 传播。这本质上是"独立打点",不是"链路追踪"。
# OpenClaw 内置观测(扁平打点)
Span 1: llm_call (trace_id: aaa) # 独立
Span 2: tool_exec (trace_id: bbb) # 独立
Span 3: llm_call (trace_id: ccc) # 独立
# 无法知道这些调用属于同一个请求
# LoongSuite 插件(完整链路追踪)
Trace (trace_id: xxx) # 共享同一个 traceId
├── ENTRY: 用户输入
│ └── AGENT: openclaw-agent
│ ├── STEP (round=1)
│ │ ├── LLM: 决定执行工具
│ │ └── TOOL: 执行工具
│ └── STEP (round=2)
│ └── LLM: 生成回复
7.2 Span 语义模型
# LoongSuite OpenClaw 插件的核心 Span 构建逻辑
class OpenClawTracer:
def trace_request(self, request: UserRequest):
# 创建 Entry Span - 记录原始输入
with self.tracer.start_as_current_span(
"entry",
attributes={"gen_ai.span.kind": "ENTRY"}
) as entry_span:
entry_span.set_attribute("gen_ai.input.messages",
request.raw_input)
# 创建 Agent Span
with self.tracer.start_as_current_span(
"agent",
attributes={"gen_ai.span.kind": "AGENT"}
) as agent_span:
for round_num, step in enumerate(request.react_steps, 1):
# 创建 Step Span
with self.tracer.start_as_current_span(
f"step-{round_num}",
attributes={
"gen_ai.span.kind": "STEP",
"gen_ai.react.round": round_num,
}
) as step_span:
# LLM 调用
llm_result = self._trace_llm_call(step.llm_call)
# 工具调用
if step.tool_call:
self._trace_tool_call(step.tool_call)
7.3 三个关键差异
| 维度 | 内置观测 | LoongSuite 插件 |
|---|---|---|
| 链路完整性 | 扁平独立打点 | ENTRY→AGENT→STEP→LLM/TOOL 完整调用树 |
| 数据丰富度 | 仅模型用量指标 | 完整消息、工具参数、系统指令、工具结果 |
| 上下文传播 | 无 | OTel Context 传播,跨进程可关联 |
八、Token 消耗与成本追踪实战
8.1 成本数据模型
# 成本追踪的数据模型
class TokenUsage:
input_tokens: int # 输入Token数
output_tokens: int # 输出Token数
total_tokens: int # 总Token数
input_cost: float # 输入成本(美元)
output_cost: float # 输出成本(美元)
total_cost: float # 总成本(美元)
cache_read_tokens: int # 缓存命中Token数
cache_creation_tokens: int # 缓存写入Token数
# 按维度聚合
class CostAggregation:
by_agent: Dict[str, float] # 按Agent聚合
by_user: Dict[str, float] # 按用户聚合
by_model: Dict[str, float] # 按模型聚合
by_time: Dict[str, float] # 按时间段聚合
by_skill: Dict[str, float] # 按技能聚合
8.2 缓存策略有效性评估
def evaluate_cache_strategy(traces: List[Trace]) -> CacheReport:
"""评估缓存策略有效性"""
report = CacheReport()
for trace in traces:
for span in trace.spans:
if span.kind == "LLM":
total_input = span.get("gen_ai.usage.input_tokens", 0)
cache_hit = span.get("gen_ai.usage.cache_read.input_tokens", 0)
# 缓存命中率
hit_rate = cache_hit / total_input if total_input > 0 else 0
report.cache_hit_rates.append(hit_rate)
# 缓存节省的成本
# 命中的Token按缓存定价(通常便宜50%)
saved_cost = (cache_hit / 1000) * 0.005 # 假设缓存价格 $0.005/1K
report.total_saved += saved_cost
report.avg_hit_rate = sum(report.cache_hit_rates) / len(report.cache_hit_rates)
return report
8.3 Token 黑洞检测
def detect_token_blackholes(traces: List[Trace]) -> List[TokenBlackhole]:
"""检测Token消耗异常的会话"""
blackholes = []
# 按 Session 聚合 Token 消耗
session_costs = defaultdict(float)
for trace in traces:
session_id = trace.get_attribute("gen_ai.session.id")
for span in trace.spans:
if span.kind == "LLM":
session_costs[session_id] += span.get(
"gen_ai.usage.total_cost", 0
)
# 找出消耗异常高的 Session(超过P95的2倍)
costs = list(session_costs.values())
p95 = sorted(costs)[int(len(costs) * 0.95)] if costs else 0
for session_id, cost in session_costs.items():
if cost > p95 * 2:
blackholes.append(TokenBlackhole(
session_id=session_id,
total_cost=cost,
reason="cost_exceeds_2x_p95",
suggestion="检查是否存在无限循环或重复调用"
))
return blackholes
九、安全审计:从风险识别到威胁响应
9.1 安全审计的四个维度
┌────────────────────────────────────────────┐
│ 安全审计总览 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 高危命令 │ │ 网页外发 │ │ 敏感文件 │ │
│ │ 执行 │ │ 请求 │ │ 访问 │ │
│ │ 127次 │ │ 43次 │ │ 19次 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ ⚠️ 注入后高危操作:3个会话 │ │
│ │ 这是威胁置信度最高的信号! │ │
│ └──────────────────────────────────────┘ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 命令行 │ │ 提示词 │ │ 环比变化 │ │
│ │ 外发 │ │ 注入 │ │ │ │
│ │ 8次 │ │ 5次 │ │ ↑12% │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────────────────────────┘
9.2 提示词注入检测
提示词注入后的高危操作是整个审计体系中威胁置信度最高的信号。普通高危操作可能是任务本身的合理需求,但注入后触发的高危行为意味着恶意指令已经驱动 Agent 付诸执行。
class PromptInjectionDetector:
"""提示词注入检测器"""
# 已知的注入模式
INJECTION_PATTERNS = [
r"ignore\s+(previous|all|above)\s+instructions",
r"you\s+are\s+now\s+",
r"system\s*:\s*",
r"forget\s+(everything|all)\s*",
r"new\s+instructions?\s*:",
]
def __init__(self):
self.patterns = [re.compile(p, re.IGNORECASE)
for p in self.INJECTION_PATTERNS]
def check_injection(self, messages: List[dict]) -> InjectionResult:
"""检查消息中是否存在注入尝试"""
for msg in messages:
content = msg.get("content", "")
for pattern in self.patterns:
if pattern.search(content):
return InjectionResult(
is_injection=True,
pattern=pattern.pattern,
matched_text=content[:200],
risk_level="high"
)
return InjectionResult(is_injection=False)
def check_post_injection_actions(self, trace: Trace) -> List[dict]:
"""检查注入后是否有高危操作"""
results = []
injection_found = False
for span in trace.spans:
# 检测注入
if span.kind == "ENTRY" or span.kind == "LLM":
messages = span.get("gen_ai.input.messages", "")
injection = self.check_injection(
[{"content": messages}] if isinstance(messages, str)
else messages
)
if injection.is_injection:
injection_found = True
injection_span = span
# 注入后的高危操作
if injection_found and span.kind == "TOOL":
tool_name = span.get("gen_ai.tool.name", "")
if tool_name in ["bash", "exec", "write_file", "delete_file"]:
results.append({
"injection_at": injection_span.span_id,
"dangerous_action": {
"tool": tool_name,
"args": span.get("gen_ai.tool.call.arguments", ""),
"timestamp": span.start_time,
},
"threat_level": "critical",
"recommendation": "立即人工复核此会话"
})
return results
9.3 高风险会话追溯
class RiskScorer:
"""会话风险评分器"""
WEIGHTS = {
"injection_hits": 10.0, # 注入命中数,权重最高
"dangerous_commands": 5.0, # 高危命令数
"sensitive_file_access": 3.0, # 敏感文件访问数
"data_exfiltration": 8.0, # 数据外发数
"post_injection_actions": 20.0, # 注入后高危操作,极端权重
}
def score_session(self, session_events: List[dict]) -> RiskScore:
"""计算会话的综合风险评分"""
counts = defaultdict(int)
for event in session_events:
event_type = event.get("type")
if event_type in self.WEIGHTS:
counts[event_type] += 1
total_score = sum(
counts[t] * w for t, w in self.WEIGHTS.items()
)
return RiskScore(
session_id=session_events[0]["session_id"],
total_score=total_score,
breakdown=dict(counts),
level=self._classify_level(total_score),
needs_human_review=total_score >= 15.0
)
def _classify_level(self, score: float) -> str:
if score >= 20:
return "CRITICAL"
elif score >= 10:
return "HIGH"
elif score >= 5:
return "MEDIUM"
return "LOW"
十、GenAI Utils:语义规范的工程化落地
10.1 架构设计
语义规范如果只停留在文档层面,价值为零。LoongSuite 通过 GenAI Utils 实现了规范的工程化落地:
┌─────────────────────────────────────────────┐
│ 各框架插桩库 │
│ LangChain DashScope MCP OpenAI ... │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Invocation 数据对象 │ │
│ │ LLMInvocation │ │
│ │ ExecuteToolInvocation │ │
│ │ EmbeddingInvocation │ │
│ │ ... │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ ┌──────────────▼──────────────────────┐ │
│ │ GenAI Utils │ │
│ │ ExtendedTelemetryHandler │ │
│ │ - Span 创建与属性挂载 │ │
│ │ - Metrics 记录 │ │
│ │ - Event 发送 │ │
│ │ - Context 管理 │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ ┌──────────────▼──────────────────────┐ │
│ │ LoongSuite SemConv │ │
│ │ (语义规范定义) │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
关键设计决策:插桩层只做数据提取,所有遥测输出统一由 GenAI Utils 收口。当语义规范新增字段或调整结构时,只需修改 GenAI Utils,所有下游插桩库自动生效。
10.2 Invocation 类型覆盖
# GenAI Utils 支持的 Invocation 类型
class InvocationType(Enum):
LLM = "llm" # 大模型调用
INVOKE_AGENT = "invoke_agent" # Agent 调用
CREATE_AGENT = "create_agent" # Agent 创建
EXECUTE_TOOL = "execute_tool" # 工具执行
EMBEDDING = "embedding" # 向量化
RETRIEVE = "retrieve" # 检索
RERANK = "rerank" # 重排序
MEMORY = "memory" # 记忆操作
10.3 多语言支持
GenAI Utils 已有 Python、Node.js、Go 三个版本(均已开源),Java 版本即将发布。
// Node.js 版本的 GenAI Utils 示例
const { ExtendedTelemetryHandler, LLMInvocation } = require('@loongsuite/genai-utils');
const handler = new ExtendedTelemetryHandler(tracer, meter);
// 创建 LLM Invocation
const invocation = new LLMInvocation({
modelName: 'gpt-4o',
operationName: 'chat',
inputMessages: [{ role: 'user', content: 'Hello' }],
});
// 统一由 handler 处理遥测输出
handler.handleInvocation(invocation, (span) => {
// span 已自动创建,属性已挂载
// 业务逻辑执行...
invocation.setOutputMessages([{ role: 'assistant', content: 'Hi!' }]);
invocation.setUsage({ inputTokens: 10, outputTokens: 5 });
});
十一、完整实战:从零搭建可观测 Agent 系统
11.1 环境准备
# 安装 LoongSuite Python Agent
pip install loongsuite-distro
loongsuite-bootstrap
# 安装 OTel Collector(接收和转发遥测数据)
# 使用 Docker 快速启动
docker run -d --name otel-collector \
-p 4317:4317 \
-p 4318:4318 \
-v ./otel-config.yaml:/etc/otelcol/config.yaml \
otel/opentelemetry-collector-contrib:latest
11.2 OTel Collector 配置
# otel-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 5s
send_batch_size: 1024
exporters:
# 导出到 Jaeger(本地可视化)
otlp/jaeger:
endpoint: "jaeger:4317"
tls:
insecure: true
# 导出到阿里云 SLS(生产级存储)
alibabacloud_sls:
endpoint: "cn-hangzhou.log.aliyuncs.com"
project: "my-agent-observability"
logstore: "agent-traces"
access_key_id: "${SLS_AK}"
access_key_secret: "${SLS_SK}"
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp/jaeger, alibabacloud_sls]
11.3 可观测 Agent 完整示例
# observable_agent.py
import os
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from langchain import hub
# ===== 1. 工具定义(带风险标记) =====
@tool
def search_database(query: str) -> str:
"""搜索数据库(只读操作,低风险)"""
# 实际项目中这里连接数据库
return f"Search results for: {query}"
@tool
def execute_command(cmd: str) -> str:
"""执行系统命令(高风险,需审计)"""
import subprocess
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout or result.stderr
@tool
def read_file(path: str) -> str:
"""读取文件(风险取决于文件路径)"""
with open(path, 'r') as f:
return f.read()
@tool
def write_file(path: str, content: str) -> str:
"""写入文件(中高风险,需审计)"""
with open(path, 'w') as f:
f.write(content)
return f"Written to {path}"
# ===== 2. Agent 构建 =====
tools = [search_database, execute_command, read_file, write_file]
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
)
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=10,
)
# ===== 3. 启动(通过 loongsuite-instrument 自动注入探针) =====
if __name__ == "__main__":
# 使用 loongsuite-instrument 启动:
# loongsuite-instrument \
# --traces_exporter otlp \
# --service_name observable-agent \
# --endpoint http://localhost:4317 \
# python observable_agent.py
result = agent_executor.invoke({
"input": "搜索数据库中最近的错误日志,分析原因,然后写一份分析报告"
})
print(result["output"])
11.4 在 Jaeger 中查看 Trace
启动后,在 Jaeger UI 中你会看到:
- 服务列表:
observable-agent - 搜索 Trace:按
gen_ai.session.id搜索特定会话 - Trace 详情:完整的调用树,从 ENTRY 到每个 STEP、LLM、TOOL
- 属性查看:点击任何 Span 查看完整属性(消息、Token、成本等)
11.5 自定义告警规则
# Prometheus 告警规则
groups:
- name: agent_observability
rules:
# 单次请求 Token 消耗超过 100K
- alert: HighTokenUsage
expr: >
sum by (service_name) (
attribute_gen_ai_usage_total_tokens
) > 100000
for: 1m
labels:
severity: warning
annotations:
summary: "Agent 单次请求 Token 消耗异常"
# 检测到提示词注入
- alert: PromptInjectionDetected
expr: >
count by (service_name) (
attribute_gen_ai_security_prompt_injection == true
) > 0
for: 0m
labels:
severity: critical
annotations:
summary: "检测到提示词注入攻击"
# Agent 执行耗时超过 60 秒
- alert: SlowAgentExecution
expr: >
histogram_quantile(0.95,
sum by (le, service_name) (
rate(duration_seconds_bucket[5m])
)
) > 60
for: 5m
labels:
severity: warning
annotations:
summary: "Agent 执行耗时 P95 超过 60 秒"
十二、性能考量与优化
12.1 采集性能开销
可观测不是免费的。数据采集会引入额外开销,关键是在"看得清"和"跑得快"之间找到平衡。
# Pilot 的性能优化:异步批量上报
class AsyncBatchReporter:
"""异步批量上报器"""
def __init__(self, endpoint: str, batch_size: int = 100,
flush_interval: float = 5.0):
self.endpoint = endpoint
self.batch_size = batch_size
self.flush_interval = flush_interval
self.queue = asyncio.Queue()
self._running = False
async def start(self):
self._running = True
asyncio.create_task(self._flush_loop())
async def report(self, event: AgentEvent):
"""非阻塞上报"""
await self.queue.put(event)
async def _flush_loop(self):
"""定时批量刷新"""
batch = []
while self._running:
try:
event = await asyncio.wait_for(
self.queue.get(), timeout=self.flush_interval
)
batch.append(event)
if len(batch) >= self.batch_size:
await self._send_batch(batch)
batch = []
except asyncio.TimeoutError:
if batch:
await self._send_batch(batch)
batch = []
async def _send_batch(self, batch: List[AgentEvent]):
"""批量发送,失败重试1次"""
try:
async with aiohttp.ClientSession() as session:
await session.post(
self.endpoint,
json=[e.to_dict() for e in batch],
timeout=aiohttp.ClientTimeout(total=10)
)
except Exception as e:
# 重试1次
try:
await asyncio.sleep(1)
async with aiohttp.ClientSession() as session:
await session.post(
self.endpoint,
json=[e.to_dict() for e in batch],
timeout=aiohttp.ClientTimeout(total=10)
)
except Exception:
pass # 丢弃,避免内存泄漏
12.2 存储成本优化
# 采样策略:不是所有 Trace 都需要完整存储
class TraceSampler:
"""智能采样器"""
def __init__(self,
error_rate: float = 1.0, # 错误请求 100% 保留
slow_rate: float = 1.0, # 慢请求 100% 保留
normal_rate: float = 0.1, # 正常请求 10% 保留
injection_rate: float = 1.0): # 注入相关 100% 保留
self.error_rate = error_rate
self.slow_rate = slow_rate
self.normal_rate = normal_rate
self.injection_rate = injection_rate
def should_keep(self, trace: Trace) -> bool:
"""决定是否保留此 Trace"""
# 错误请求,100%保留
if trace.status_code != "OK":
return True
# 注入相关,100%保留
if any(span.get("gen_ai.security.prompt_injection")
for span in trace.spans):
return True
# 慢请求(超过P95阈值),100%保留
if trace.duration_ms > 30000: # 30秒
return True
# 正常请求,按概率采样
import random
return random.random() < self.normal_rate
十三、与业界方案的对比
| 维度 | 传统 APM | Langfuse/Phoenix | LoongSuite |
|---|---|---|---|
| 覆盖范围 | 服务端应用 | LLM 应用 | 三类 Agent 全覆盖 |
| Coding Agent 支持 | 无 | 有限 | 原生支持(Pilot) |
| 语义标准 | OTel 通用 | 自定义协议 | OTel GenAI + 扩展 |
| 链路完整性 | 扁平 | 部分层级 | 完整调用树 |
| 安全审计 | 基础 | 无 | 提示词注入检测+高危操作审计 |
| 成本追踪 | 无 | Token 级 | Token + 成本 + 缓存 |
| 零代码接入 | Java Agent | SDK | Python/Node.js/Go 自动插桩 |
| 开源状态 | 部分 | 是 | 语义规范 + Python Agent 开源 |
十四、总结与展望
AI Agent 的可观测性不是一个锦上添花的功能,而是生产级部署的必要条件。当 Agent 有了文件读写、命令执行、API 调用的权限,你需要的不仅仅是"它跑没跑"的浅层感知,而是"它干了什么、为什么这么干、花了多少钱、有没有安全风险"的深度透视。
LoongSuite 的核心价值在于三个层面:
全覆盖:从 Coding Agent(Pilot)到个人助理(专用插件)到框架应用(Python Agent),三类 Agent 形态一个不落。
语义标准化:基于 OTel GenAI 的扩展语义规范,填补了社区标准在层级化、业务域、成本安全方面的空白,且已开源贡献。
工程化闭环:从数据采集到语义建模到可视化分析到安全审计,GenAI Utils 确保规范不是一纸文档,而是可落地的工程能力。
开源地址:
- 语义规范:github.com/alibaba/loongsuite-semantic-conventions-genai
- Python Agent:github.com/alibaba/loongsuite-python-agent
未来的方向很明确:随着 Agent 从单体走向多智能体协同,可观测性需要从单进程 Trace 演进到跨 Agent 分布式 Trace;随着 Agent 从辅助工具走向自主决策,安全审计需要从事后追溯演进到实时拦截。这不是可选的演进方向,这是必经之路。