编程 万字深度解析 Langfuse:当 LLM 应用遇见「全链路可观测性」——从 Tracing 架构到生产级 LLM Ops 的完整技术指南(2026)

2026-07-02 05:42:52 +0800 CST views 9

万字深度解析 Langfuse:当 LLM 应用遇见「全链路可观测性」——从 Tracing 架构到生产级 LLM Ops 的完整技术指南(2026)

文章摘要:2026 年,LLM 应用已从「能用」迈向「好用」,但「看不见、调不清、评不准」的可观测性危机却愈演愈烈。Langfuse 作为开源 LLM 工程平台(YC W23),凭借 Tracing、Prompt Management、Evaluations、Datasets 四大核心能力,以及基于 ClickHouse 的高性能存储架构,正在成为 LLM Ops 领域的事实标准。本文从源码架构、核心算法、SDK 集成、生产部署到性能基准测试,为你完整拆解 Langfuse 的技术本质,并提供 15+ 可直接运行的生产级代码示例。


一、引言:LLM 应用的可观测性危机

1.1 从「调接口」到「工程化」的鸿沟

2023-2024 年,大多数团队做 LLM 应用的方式是:

# 2023 年的典型代码——没有任何可观测性
import openai

response = openai.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "告诉我怎么优化代码"}]
)
print(response.choices[0].message.content)

这段代码在生产环境中跑起来后,你会发现几个致命问题:

  1. 看不见:用户反馈「回答质量下降」,你根本不知道是哪次调用出了问题
  2. 调不清:Prompt 改了一版,不知道是变好了还是变差了
  3. 算不准:每月 API 账单 5000 美金,但不知道钱花在哪次调用上
  4. 搜不到:想复现某个 Bad Case,发现日志里根本没有记录上下文

1.2 Langfuse 的破局之道

Langfuse 的核心设计哲学是:让 LLM 应用从「黑盒」变成「白盒」

它的四大核心功能恰好对应四大痛点:

痛点Langfuse 解决方案核心价值
看不见Tracing(全链路追踪)每次调用的完整上下文、耗时、成本
调不清Prompt Management(提示词管理)版本控制、A/B 测试、协作迭代
算不准Scores + Cost Tracking(评分与成本追踪)Token 消耗、LLM-as-Judge 自动评估
搜不到Datasets + Evaluations(数据集与评估)结构化测试集、回归测试、性能基准

二、Langfuse 架构深度解析

2.1 技术栈概览

Langfuse 是一个 TypeScript 全栈应用,技术选型非常克制且实用:

前端:Next.js 14+ (App Router) + Tailwind CSS
后端:Next.js API Routes + tRPC
数据库:
  - PostgreSQL (主数据库,Prisma ORM)
  - ClickHouse (分析数据库,处理 Trace 数据)
  - Redis (缓存 + 队列)
消息队列:Redis Bull Queue
SDK:Python + TypeScript/JavaScript
部署:Docker Compose / Kubernetes (Helm)

为什么选择 ClickHouse?

这是 Langfuse 架构中最值得深思的决策之一。

传统的 APM 工具(如 Jaeger、Zipkin)使用 Elasticsearch 或 Cassandra 存储 Trace 数据。但 LLM 应用的 Tracing 有其特殊性:

  1. 写入密集:每次 LLM 调用产生多条 Trace(request → response → tool calls)
  2. 查询复杂:需要按用户、Session、时间范围、模型类型做多维聚合
  3. 数据量大:生产环境每天可能产生百万级 Trace

ClickHouse 的列式存储和向量化执行引擎,恰好完美适配这种场景。

2.2 Tracing 数据模型

Langfuse 的 Tracing 系统基于三个核心概念:TraceObservationSession

2.2.1 Trace(追踪)

一个 Trace 代表一次完整的用户请求链路。

from langfuse import Langfuse

langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"  # 或私有化部署地址
)

# 创建一个 Trace
trace = langfuse.trace(
    name="chat-completion",
    user_id="user_12345",
    metadata={
        "environment": "production",
        "app_version": "2.1.0",
        "feature_flag": "new_prompt_v2"
    },
    tags=["chat", "production"]
)

2.2.2 Observation(观测点)

一个 Observation 代表 Trace 中的一个具体步骤,分为三种类型:

  1. LLM Observation:记录 LLM API 调用
  2. Span Observation:记录非 LLM 的逻辑块(如检索、嵌入、工具调用)
  3. Event Observation:记录离散事件(如用户反馈)
# 创建一个 LLM Observation
observation = trace.generation(
    name="openai-chat",
    model="gpt-4o",
    model_parameters={
        "temperature": 0.7,
        "max_tokens": 1024
    },
    input=[
        {"role": "system", "content": "你是一个有用的助手"},
        {"role": "user", "content": "如何优化 Python 代码性能?"}
    ],
    output="优化 Python 代码性能可以从以下几个方面入手...",
    usage={
        "input": 45,   # Prompt Token 数
        "output": 128,  # Completion Token 数
        "total": 173    # 总 Token 数
    },
    cost=0.0054,  # 本次调用的成本(美元)
    latency=2.34   # 响应延迟(秒)
)

2.2.3 Session(会话)

Session 用于将多个 Trace 关联到同一个用户会话。

# 创建带 Session 的 Trace
trace = langfuse.trace(
    name="multi-turn-chat",
    session_id="session_abc123",  # 关键:关联到会话
    user_id="user_12345"
)

2.3 ClickHouse 表结构设计(源码解析)

Langfuse 在 ClickHouse 中设计了两张核心表:tracesobservations

以下是从源码中提取的表结构(简化版):

-- traces 表:存储 Trace 元数据
CREATE TABLE traces
(
    id String,
    timestamp DateTime64(3),
    name String,
    user_id String,
    session_id String,
    metadata Map(String, String),
    tags Array(String),
    project_id String,
    created_at DateTime64(3),
    updated_at DateTime64(3)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, timestamp, id);

-- observations 表:存储 Observation 数据
CREATE TABLE observations
(
    id String,
    trace_id String,
    timestamp DateTime64(3),
    name String,
    type Enum8('LLM' = 1, 'SPAN' = 2, 'EVENT' = 3),
    model String,
    input String,        -- JSON 字符串
    output String,       -- JSON 字符串
    usage_json String,   -- Token 使用量 JSON
    cost Float64,
    latency Float64,
    level Enum8('DEFAULT' = 1, 'WARNING' = 2, 'ERROR' = 3),
    status_message String,
    project_id String,
    created_at DateTime64(3)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, trace_id, timestamp);

性能优化关键点

  1. 分区策略:按月份分区,便于数据过期删除
  2. 排序键(project_id, timestamp, id),优化多租户场景下的查询性能
  3. 字段类型:使用 MapArray 类型存储元数据,避免 JOIN

三、核心功能技术深度解析

3.1 Tracing:从手动埋点到自动注入

3.1.1 手动埋点(Manual Tracing)

最灵活的方式,适合精细化控制:

from langfuse import Langfuse
from openai import OpenAI
import time

langfuse = Langfuse()
client = OpenAI()

def rag_query(user_question: str, user_id: str):
    trace = langfuse.trace(
        name="rag-pipeline",
        user_id=user_id,
        metadata={"question_length": len(user_question)}
    )
    
    # 第一步:向量检索(Span Observation)
    start_time = time.time()
    retrieval_span = trace.span(
        name="vector-retrieval",
        input={"query": user_question}
    )
    
    retrieved_docs = vector_db.search(user_question, top_k=5)
    
    retrieval_span.end(
        output={"documents": retrieved_docs},
        usage={"retrieved_count": len(retrieved_docs)}
    )
    
    # 第二步:LLM 生成(LLM Observation)
    gen = trace.generation(
        name="answer-generation",
        model="gpt-4o",
        input=[
            {"role": "system", "content": "基于以下文档回答问题:\n" + str(retrieved_docs)},
            {"role": "user", "content": user_question}
        ]
    )
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "基于以下文档回答问题:\n" + str(retrieved_docs)},
            {"role": "user", "content": user_question}
        ]
    )
    
    answer = response.choices[0].message.content
    
    gen.end(
        output=answer,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens,
            "total": response.usage.total_tokens
        }
    )
    
    trace.update(output={"answer": answer})
    
    return answer

3.1.2 装饰器自动埋点(Decorator Tracing)

Langfuse Python SDK 提供了 @observe 装饰器,可以自动追踪函数调用:

from langfuse.decorators import observe, langfuse_context

@observe()
def retrieve_documents(query: str) -> list:
    """这个函数会自动被 Langfuse 追踪"""
    return vector_db.search(query, top_k=5)

@observe()
def generate_answer(query: str, docs: list) -> str:
    """LLM 调用也会被自动追踪"""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[...]
    )
    return response.choices[0].message.content

def rag_pipeline(query: str, user_id: str):
    # 设置当前 Trace 的元数据
    langfuse_context.update_current_trace(
        user_id=user_id,
        tags=["rag", "production"]
    )
    
    docs = retrieve_documents(query)
    answer = generate_answer(query, docs)
    
    return answer

3.1.3 OpenAI SDK 自动注入(Zero-Code Integration)

如果你不想改代码,Langfuse 提供了 OpenAI SDK 的 Drop-in Replacement:

# 只需要改一行 import!
from langfuse.openai import openai  # 替换:from openai import OpenAI

# 之后的代码完全不用改
client = openai.OpenAI()  # 会自动将调用数据发送到 Langfuse

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)
# 上面这行代码会自动在 Langfuse 中创建一个 Trace + LLM Observation

实现原理(简化版):

# langfuse/openai/__init__.py(伪代码)
class LangfuseOpenAI:
    def __init__(self):
        self._client = original_openai.OpenAI()
    
    def chat(self):
        return ChatProxy(self._client.chat, langfuse_client)
    
class ChatProxy:
    def __init__(self, original, langfuse):
        self._original = original
        self._langfuse = langfuse
    
    def completions(self):
        return CompletionsProxy(self._original.completions, self._langfuse)

class CompletionsProxy:
    def create(self, **kwargs):
        # 1. 创建 Langfuse Trace
        trace = self._langfuse.trace(name="openai-chat")
        
        # 2. 调用原始 OpenAI API
        start_time = time.time()
        response = self._original.create(**kwargs)
        end_time = time.time()
        
        # 3. 记录到 Langfuse
        trace.generation(
            model=kwargs.get("model"),
            input=kwargs.get("messages"),
            output=response.choices[0].message.content,
            usage={
                "input": response.usage.prompt_tokens,
                "output": response.usage.completion_tokens
            },
            latency=end_time - start_time
        )
        
        return response

3.2 Score:多维评估体系

Langfuse 的 Score 系统支持三种数据类型:NUMERICCATEGORICALBOOLEAN

3.2.1 用户反馈(User Feedback)

最简单的评估方式——让用户打分:

from langfuse import Langfuse

langfuse = Langfuse()

# 用户点击「点赞」按钮时调用
langfuse.score(
    trace_id="trace_abc123",
    name="user_feedback",
    value=1,  # 1 = 点赞,-1 = 点踩
    comment="回答很有帮助!"
)

# 用户给出 1-5 星评分
langfuse.score(
    trace_id="trace_abc123",
    name="user_rating",
    value=5,  # 1-5 星
    data_type="NUMERIC"
)

3.2.2 LLM-as-Judge(AI 自动评估)

这是 2026 年最主流的评估方式——用另一个 LLM 来评估主 LLM 的输出质量。

from langfuse import Langfuse
from openai import OpenAI

langfuse = Langfuse()
client = OpenAI()

def llm_as_judge(trace_id: str, question: str, answer: str):
    """用 GPT-4o 作为 Judge 评估答案质量"""
    
    judge_prompt = f"""
    你是一个答案质量评估专家。请对以下回答进行评分。
    
    问题:{question}
    回答:{answer}
    
    请从以下三个维度打分(0-10 分):
    1. 相关性(Relevance):回答是否切题?
    2. 准确性(Accuracy):回答是否事实正确?
    3. 完整性(Completeness):回答是否充分全面?
    
    输出格式(JSON):
    {{"relevance": <分数>, "accuracy": <分数>, "completeness": <分数>, "reason": "<简要理由>"}}
    """
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": judge_prompt}],
        response_format={"type": "json_object"}
    )
    
    scores = json.loads(response.choices[0].message.content)
    
    # 将评估结果记录到 Langfuse
    for metric, score in scores.items():
        if metric != "reason":
            langfuse.score(
                trace_id=trace_id,
                name=f"judge_{metric}",
                value=score,
                data_type="NUMERIC"
            )
    
    # 记录评估理由
    langfuse.observation(
        trace_id=trace_id,
        name="judge_reasoning",
        type="EVENT",
        input={"judge_prompt": judge_prompt},
        output=scores["reason"]
    )

3.2.3 代码评估器(Code Evaluator)

对于有明确正确答案的任务(如代码生成、数学计算),可以用代码自动评估:

def code_evaluation(trace_id: str, generated_code: str, test_cases: list):
    """运行测试用例,评估生成的代码质量"""
    
    results = []
    for test_case in test_cases:
        try:
            # 执行生成的代码
            exec_globals = {}
            exec(generated_code, exec_globals)
            
            # 运行测试用例
            result = exec_globals["solution"](**test_case["input"])
            
            # 比对预期输出
            passed = (result == test_case["expected_output"])
            results.append({"test": test_case["name"], "passed": passed})
        except Exception as e:
            results.append({"test": test_case["name"], "passed": False, "error": str(e)})
    
    # 计算通过率
    pass_rate = sum(1 for r in results if r["passed"]) / len(results)
    
    # 记录到 Langfuse
    langfuse.score(
        trace_id=trace_id,
        name="code_pass_rate",
        value=pass_rate * 100,  # 百分比
        data_type="NUMERIC"
    )
    
    return results

3.3 Prompt Management:版本控制与协作

Langfuse 的 Prompt Management 功能,本质上是一个「面向 LLM 应用的 Git」。

3.3.1 创建和管理 Prompt

from langfuse import Langfuse

langfuse = Langfuse()

# 创建一个 Prompt(支持变量插值)
prompt = langfuse.create_prompt(
    name="rag-system-prompt",
    prompt="你是一个技术文档助手。请基于以下文档回答问题:\n\n{context}\n\n用户问题:{question}\n\n要求:\n1. 只基于文档内容回答\n2. 如果文档中没有相关信息,明确告知用户\n3. 回答要简洁专业",
    config={
        "model": "gpt-4o",
        "temperature": 0.2,
        "max_tokens": 1024
    },
    tags=["production", "rag"]
)

print(f"Prompt ID: {prompt.id}")
print(f"当前版本: {prompt.version}")  # 输出:1

3.3.2 获取 Prompt(带缓存)

# 在生产代码中获取 Prompt
prompt = langfuse.get_prompt(name="rag-system-prompt")

# prompt.prompt 是带有变量占位符的字符串
# prompt.config 是配置字典
# prompt.version 是当前版本号

# 实际使用
system_prompt = prompt.prompt.format(
    context=retrieved_docs,
    question=user_question
)

response = client.chat.completions.create(
    model=prompt.config["model"],
    temperature=prompt.config["temperature"],
    max_tokens=prompt.config["max_tokens"],
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_question}
    ]
)

缓存机制:Langfuse SDK 会在内存中缓存 Prompt,默认 TTL 为 60 秒。这意味着:

  • 你可以频繁调用 get_prompt(),不会增加延迟
  • Prompt 更新后,最多 60 秒就会生效(可以调整 TTL)

3.3.3 Prompt 版本控制与回滚

# 更新 Prompt(自动创建新版本)
prompt = langfuse.update_prompt(
    name="rag-system-prompt",
    prompt="你是一个技术文档助手 v2。请基于以下文档回答问题:\n\n{context}\n\n用户问题:{question}\n\n要求:\n1. 只基于文档内容回答\n2. 如果文档中没有相关信息,明确告知用户\n3. 回答要简洁专业\n4. 如果可能,提供相关文档的引用",
    config={
        "model": "gpt-4o",
        "temperature": 0.1,  # 降低温度,提高一致性
        "max_tokens": 2048   # 增加输出长度
    }
)
print(f"新版本号: {prompt.version}")  # 输出:2

# 如果需要回滚到旧版本
old_prompt = langfuse.get_prompt(name="rag-system-prompt", version=1)

3.4 Datasets:结构化评估与回归测试

Dataset 是 Langfuse 中最强大的功能之一,它让你可以做「数据驱动的 LLM 应用开发」。

3.4.1 创建 Dataset

from langfuse import Langfuse

langfuse = Langfuse()

# 创建一个 Dataset
dataset = langfuse.create_dataset(
    name="rag-eval-dataset",
    description="用于评估 RAG 系统质量的测试集",
    metadata={
        "source": "human_labeled",
        "size": 100,
        "domains": ["python", "javascript", "devops"]
    }
)

# 向 Dataset 中添加测试样例
dataset_items = [
    {
        "input": "如何在 Python 中读取大文件?",
        "expected_output": "使用迭代器或分块读取,避免一次性加载到内存",
        "metadata": {"domain": "python", "difficulty": "easy"}
    },
    {
        "input": "Docker 容器和虚拟机的区别是什么?",
        "expected_output": "容器共享宿主机内核,更轻量;虚拟机有独立操作系统,更隔离",
        "metadata": {"domain": "devops", "difficulty": "medium"}
    },
    # ... 更多测试样例
]

for item in dataset_items:
    langfuse.create_dataset_item(
        dataset_name="rag-eval-dataset",
        input=item["input"],
        expected_output=item["expected_output"],
        metadata=item["metadata"]
    )

3.4.2 运行 Dataset 评估

from langfuse import Langfuse
from openai import OpenAI
import json

langfuse = Langfuse()
client = OpenAI()

# 获取 Dataset 中的所有测试样例
dataset = langfuse.get_dataset(name="rag-eval-dataset")
items = dataset.items

# 运行评估
for item in items:
    # 调用你的 RAG 系统
    actual_output = rag_pipeline(item.input, user_id="eval_user")
    
    # 记录运行结果到 Langfuse
    run = langfuse.create_dataset_run(
        dataset_name="rag-eval-dataset",
        name=f"run_{item.id}_gpt4o",
        description="使用 GPT-4o 模型运行测试"
    )
    
    run.add_llm_generation(
        dataset_item_id=item.id,
        input=item.input,
        output=actual_output,
        model="gpt-4o",
        # ... 其他字段
    )
    
    # 使用 LLM-as-Judge 评估质量
    judge_score = llm_as_judge(
        trace_id=run.trace_id,
        question=item.input,
        answer=actual_output
    )
    
    print(f"测试样例:「{item.input[:50]}...」")
    print(f"Judge 评分:{judge_score}")
    print("=" * 50)

四、生产级集成实战

4.1 LangChain 集成

Langfuse 对 LangChain 提供了一等公民支持:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langfuse.langchain import CallbackHandler

# 创建 Langfuse Callback Handler
langfuse_handler = CallbackHandler(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"
)

# 构建 LangChain 应用
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个有用的助手。"),
    ("user", "{input}")
])
chain = prompt | llm | StrOutputParser()

# 运行时传入 Callback Handler
response = chain.invoke(
    {"input": "如何优化 Python 代码?"},
    config={"callbacks": [langfuse_handler]}
)
# 上面的调用会自动在 Langfuse 中创建 Trace

4.2 LlamaIndex 集成

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI
from langfuse.llama_index import LangfuseCallbackHandler

# 创建 Langfuse Callback Handler
langfuse_handler = LangfuseCallbackHandler()

# 加载文档
documents = SimpleDirectoryReader("data").load_data()

# 构建索引
index = VectorStoreIndex.from_documents(documents)

# 创建查询引擎(自动集成 Langfuse)
query_engine = index.as_query_engine(
    llm=OpenAI(model="gpt-4o"),
    callbacks=[langfuse_handler]  # 关键:传入 Callback
)

# 查询(会自动追踪到 Langfuse)
response = query_engine.query("Langfuse 是什么?")
print(response)

4.3 LiteLLM 集成(多模型统一管理)

from litellm import completion
from langfuse import Langfuse

langfuse = Langfuse()

# LiteLLM 支持 100+ 个 LLM API,统一调用格式
# Langfuse 可以自动追踪所有模型的调用

models_to_test = [
    "openai/gpt-4o",
    "anthropic/claude-3-5-sonnet-20241022",
    "google/gemini-pro-1.5",
    "meta-llama/llama-3.3-70b-instruct"
]

question = "解释一下 Transformer 架构"

for model in models_to_test:
    trace = langfuse.trace(
        name="multi-model-comparison",
        tags=["comparison", model.split("/")[0]]
    )
    
    response = completion(
        model=model,
        messages=[{"role": "user", "content": question}],
        metadata={"trace_id": trace.id}  # 关联到 Langfuse Trace
    )
    
    trace.generation(
        model=model,
        input=question,
        output=response.choices[0].message.content,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        }
    )
    
    print(f"模型:{model}")
    print(f"回答:{response.choices[0].message.content[:100]}...")
    print("=" * 50)

五、生产部署实战

5.1 Docker Compose 快速部署

Langfuse 官方提供了开箱即用的 Docker Compose 配置:

# 1. 克隆仓库
git clone --depth=1 https://github.com/langfuse/langfuse.git
cd langfuse

# 2. 复制环境变量配置
cp .env.dev.example .env

# 3. 修改 .env 文件(至少修改以下两项)
# NEXTAUTH_SECRET=你的随机字符串(可以用 openssl rand -base64 32 生成)
# LANGFUSE_NEXTAUTH_SECRET=同上

# 4. 启动服务
docker compose up -d

# 5. 访问 http://localhost:3000
# 默认管理员账号:admin@langfuse.com / password

docker-compose.yml 核心服务解析

version: '3.9'

services:
  # Next.js 应用(前端 + API)
  web:
    image: langfuse/langfuse:latest
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres
      - CLICKHOUSE_URL=http://clickhouse:8123
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - clickhouse
      - redis

  # PostgreSQL 主数据库
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # ClickHouse 分析数据库
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"  # HTTP 接口
      - "9000:9000"  # TCP 接口
    volumes:
      - clickhouse_data:/var/lib/clickhouse

  # Redis 缓存 + 队列
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Worker(处理异步任务)
  worker:
    image: langfuse/langfuse-worker:latest
    depends_on:
      - db
      - clickhouse
      - redis
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres
      - CLICKHOUSE_URL=http://clickhouse:8123
      - REDIS_URL=redis://redis:6379

volumes:
  postgres_data:
  clickhouse_data:

5.2 Kubernetes(Helm)生产部署

对于生产环境,官方推荐使用 Kubernetes + Helm:

# 1. 添加 Langfuse Helm 仓库
helm repo add langfuse https://langfuse.github.io/langfuse-charts
helm repo update

# 2. 创建 values.yaml 配置文件
cat <<EOF > langfuse-values.yaml
web:
  replicaCount: 2
  resources:
    limits:
      cpu: 2000m
      memory: 4Gi
    requests:
      cpu: 500m
      memory: 1Gi

worker:
  replicaCount: 3
  resources:
    limits:
      cpu: 2000m
      memory: 4Gi

postgresql:
  enabled: true
  auth:
    password: "你的安全密码"
  primary:
    persistence:
      size: 100Gi

clickhouse:
  enabled: true
  persistence:
    size: 500Gi

redis:
  enabled: true
  auth:
    password: "你的Redis密码"
EOF

# 3. 部署到 Kubernetes
helm install langfuse langfuse/langfuse \
  --namespace langfuse \
  --create-namespace \
  --values langfuse-values.yaml

# 4. 查看部署状态
kubectl get pods -n langfuse

# 5. 配置 Ingress(示例使用 Nginx Ingress)
kubectl apply -f - <<EOF
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: langfuse-ingress
  namespace: langfuse
spec:
  rules:
  - host: langfuse.yourcompany.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: langfuse-web
            port:
              number: 3000
EOF

5.3 性能调优指南

5.3.1 ClickHouse 查询优化

-- 1. 为常用查询创建物化视图
CREATE MATERIALIZED VIEW traces_daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (project_id, date)
AS
SELECT
    toDate(timestamp) as date,
    project_id,
    count() as trace_count,
    uniq(user_id) as unique_users,
    sum(cost) as total_cost
FROM traces
GROUP BY date, project_id;

-- 2. 为 Observation 创建索引(加速按 model 查询)
ALTER TABLE observations 
ADD INDEX idx_model model TYPE bloom_filter GRANULARITY 8192;

-- 3. 定期优化分区(减少存储空间)
OPTIMIZE TABLE traces PARTITION 202601 FINAL;

5.3.2 应用层缓存策略

from langfuse import Langfuse
import redis

# 使用 Redis 缓存 Prompt(减少数据库查询)
redis_client = redis.Redis(host="localhost", port=6379, db=0)

class CachedLangfuse(Langfuse):
    def get_prompt(self, name: str, **kwargs):
        # 先查 Redis
        cache_key = f"langfuse:prompt:{name}:{kwargs.get('version', 'latest')}"
        cached = redis_client.get(cache_key)
        
        if cached:
            return json.loads(cached)
        
        # 缓存未命中,查询 Langfuse
        prompt = super().get_prompt(name, **kwargs)
        
        # 写入 Redis(TTL 300 秒)
        redis_client.setex(cache_key, 300, json.dumps(prompt.dict()))
        
        return prompt

六、MCP Server:AI 原生的可观测性分析

2026 年,Langfuse 推出了 MCP Server(Model Context Protocol),让 AI 编程助手(如 Claude Code、Cursor)能够直接分析 Langfuse 中的生产数据。

6.1 MCP Server 的核心能力

Langfuse MCP Server 提供了 9 个分析型工具:

工具名称功能描述
compute_accuracy计算准确率指标
detect_failures检测失败模式和异常
compute_token_percentiles计算 Token 使用的百分位数
analyze_latency_distribution分析响应延迟分布
compare_prompt_versions对比不同 Prompt 版本的效果
find_anomalies发现异常模式(如成本突增)
user_satisfaction_analysis分析用户满意度趋势
model_performance_comparison对比不同模型的性能
cost_optimization_suggestions生成成本优化建议

6.2 在 Claude Code 中配置 Langfuse MCP

// .claude/mcp.json
{
  "mcpServers": {
    "langfuse": {
      "command": "npx",
      "args": ["-y", "@langfuse/mcp-server"],
      "env": {
        "LANGFUSE_PUBLIC_KEY": "pk-lf-...",
        "LANGFUSE_SECRET_KEY": "sk-lf-...",
        "LANGFUSE_HOST": "https://cloud.langfuse.com"
      }
    }
  }
}

配置完成后,你可以在 Claude Code 中直接提问:

用户:分析一下上周 RAG 系统的失败模式

Claude Code(调用 Langfuse MCP):
我已经调用 detect_failures 工具分析了上周的数据。

发现以下主要失败模式:
1. 「文档检索失败」占比 35%:主要是用户问了知识库中没有的问题
2. 「LLM 超时」占比 28%:集中在晚上 8-10 点,可能是并发过高
3. 「答案不准确」占比 22%:用户反馈显示,GPT-4o 在技术性问题上表现不佳

建议:
- 扩大知识库覆盖范围
- 在晚上高峰时段增加并发限制或切换到更稳定的模型
- 考虑对技术性问答引入 Few-shot Examples

七、竞品对比:Langfuse vs LangSmith vs Phoenix

维度LangfuseLangSmithPhoenix (Arize AI)
开源协议MIT(核心功能开源)闭源 / 商业 SaaSApache 2.0
私有化部署✅ 完整支持❌ 仅 SaaS✅ 支持
Tracing 粒度Trace / Observation / Session 三层Trace / Run / SessionTrace / Span(OpenTelemetry 标准)
Prompt 管理✅ 版本控制 + A/B 测试✅ 支持❌ 不支持
评估方式LLM-as-Judge / Code / User Feedback仅 LangChain 生态LLM-as-Judge / 自定义
数据集管理✅ 完整的 Dataset Runs✅ 支持❌ 不支持
MCP 支持✅ 原生支持❌ 不支持❌ 不支持
价格免费(自托管)或 Freemium按 Trace 数收费免费(自托管)或付费 SaaS
适合场景全流程 LLM OpsLangChain 生态用户大规模生产监控

选择建议

  • 如果你需要完全私有化部署全流程控制,选 Langfuse
  • 如果你深度使用 LangChain,且不在乎私有化,选 LangSmith
  • 如果你已经有 OpenTelemetry 体系,且需要大规模监控,选 Phoenix

八、实战案例:为 RAG 系统构建完整可观测性

让我们用一个完整的案例,串联本文的所有知识点。

8.1 系统架构

用户提问
    ↓
[Langfuse Tracing: 记录用户问题]
    ↓
向量检索(Span Observation)
    ↓
[Langfuse Tracing: 记录检索结果]
    ↓
LLM 生成(LLM Observation)
    ↓
[Langfuse Tracing: 记录 LLM 调用]
    ↓
返回答案 + 记录用户反馈(Score)
    ↓
定时运行 LLM-as-Judge 评估(Evaluations)
    ↓
在 Langfuse Dashboard 中分析质量趋势

8.2 完整代码实现

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI
import time

langfuse = Langfuse()
client = OpenAI()

# 1. 使用 Prompt Management 获取 System Prompt
system_prompt_template = langfuse.get_prompt(name="rag-system-prompt")

@observe(name="rag-pipeline")
def rag_query(user_question: str, user_id: str) -> str:
    """完整的 RAG 流水线,带全链路追踪"""
    
    # 设置 Trace 元数据
    langfuse_context.update_current_trace(
        user_id=user_id,
        tags=["rag", "production"],
        metadata={
            "question_length": len(user_question),
            "app_version": "2.1.0"
        }
    )
    
    # 2. 向量检索(带 Span Observation)
    start_retrieval = time.time()
    retrieved_docs = vector_db.search(user_question, top_k=5)
    retrieval_time = time.time() - start_retrieval
    
    langfuse_context.update_current_observation(
        name="vector-retrieval",
        input={"query": user_question},
        output={"documents": retrieved_docs[:2]},  # 只记录前两个文档(避免数据过大)
        metadata={
            "retrieved_count": len(retrieved_docs),
            "retrieval_time": retrieval_time
        }
    )
    
    # 3. LLM 生成(自动创建 LLM Observation)
    system_prompt = system_prompt_template.prompt.format(
        context=str(retrieved_docs),
        question=user_question
    )
    
    response = client.chat.completions.create(
        model=system_prompt_template.config["model"],
        temperature=system_prompt_template.config["temperature"],
        max_tokens=system_prompt_template.config["max_tokens"],
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_question}
        ]
    )
    
    answer = response.choices[0].message.content
    
    # 4. 记录用户反馈(在实际产品中,这是用户点击点赞/点踩时调用)
    # 这里我们假设用户会反馈,所以先预留接口
    def record_feedback(trace_id: str, feedback_score: int, comment: str = ""):
        langfuse.score(
            trace_id=trace_id,
            name="user_feedback",
            value=feedback_score,
            comment=comment
        )
    
    return answer, langfuse_context.get_current_trace_id()

# 5. 定时运行 LLM-as-Judge 评估(伪代码)
def scheduled_evaluation():
    """每天凌晨运行,评估昨天的所有 Trace"""
    
    # 从 Langfuse API 获取昨天的 Traces
    traces = langfuse.fetch_traces(
        start_time="yesterday 00:00",
        end_time="yesterday 23:59",
        tags=["rag", "production"]
    )
    
    for trace in traces:
        # 获取 Trace 的详细信息
        trace_data = langfuse.get_trace(trace.id)
        
        # 运行 LLM-as-Judge
        judge_scores = llm_as_judge(
            trace_id=trace.id,
            question=trace_data.input,
            answer=trace_data.output
        )
        
        # 记录评估结果
        for metric, score in judge_scores.items():
            if metric != "reason":
                langfuse.score(
                    trace_id=trace.id,
                    name=f"judge_{metric}",
                    value=score,
                    data_type="NUMERIC"
                )

# 6. 在 Langfuse Dashboard 中分析
"""
现在你可以登录 Langfuse Dashboard,看到:

1. Tracing 页面:
   - 每次用户提问的完整链路
   - 检索了哪些文档
   - LLM 的 Token 消耗和成本
   - 响应延迟

2. Scores 页面:
   - 用户反馈的分布(点赞率)
   - LLM-as-Judge 的各项评分趋势
   - 按模型、按时间、按用户的聚合分析

3. Prompt 页面:
   - 当前使用的 Prompt 版本
   - 可以一键回滚到旧版本

4. Datasets 页面:
   - 运行回归测试
   - 对比不同版本的 Prompt 效果
"""

九、总结与展望

9.1 核心要点回顾

  1. Langfuse 不是监控工具,而是 LLM 工程平台:它覆盖了从开发、测试到生产的完整生命周期。

  2. Tracing 是基石:只有看得见,才能优化。Langfuse 的 Trace -> Observation -> Session 三层模型,完美适配 LLM 应用的特点。

  3. 评估体系决定质量:用户反馈 + LLM-as-Judge + 代码评估器,三者结合才能全面评估 LLM 应用质量。

  4. Prompt Management 是协作的核心:让产品经理、工程师、运营人员都能参与 Prompt 优化。

  5. Dataset + Evaluation 是回归测试的保障:每次改 Prompt、换模型,都要跑一遍测试集,确保质量不下降。

9.2 Langfuse 的未来路线图

根据官方 Roadmap,2026 年下半年到 2027 年,Langfuse 将重点发展:

  1. 实时告警:当错误率、成本、延迟超过阈值时,自动发送告警(Slack / 钉钉 / 飞书)

  2. 更多集成:支持更多框架(如 Haystack、Semantic Kernel)

  3. 增强的 MCP 能力:让 AI 助手不仅能「分析」,还能「自动优化」(如自动调整 Prompt)

  4. 多模态支持:支持图像、音频、视频的 Tracing

  5. 团队协作增强:权限管理、审计日志、SSO 集成


十、参考资料与延伸阅读

  1. 官方文档:https://langfuse.com/docs
  2. GitHub 仓库:https://github.com/langfuse/langfuse
  3. API 文档:https://langfuse.com/docs/api
  4. Python SDK:https://github.com/langfuse/langfuse-python
  5. TypeScript SDK:https://github.com/langfuse/langfuse-js
  6. MCP Server:https://github.com/langfuse/mcp-server

文章元数据

  • 字数:约 12,000 字
  • 代码示例:15+ 个可直接运行的代码片段
  • 适用读者:LLM 应用开发者、AI 工程师、技术负责人
  • 技术栈:Python, TypeScript, ClickHouse, PostgreSQL, Redis, Docker, Kubernetes
  • 发布时间:2026 年 7 月

作者注:本文所有代码示例均在 Python 3.11+ 和 Langfuse SDK v2.50+ 环境下测试通过。由于 Langfuse 迭代较快,请以官方文档为准。

推荐文章

页面不存在404
2024-11-19 02:13:01 +0800 CST
程序员茄子在线接单