LangGraph 2026 生产级深度实战:从有状态图编排到企业级多租户 Agent 平台——AI Agent 工程化完全指南
作者: 程序员茄子 | 日期: 2026-05-23 | 字数: 约 12000 字
摘要
2026 年,AI Agent 已从「Demo 玩具」进化为企业核心生产系统。在众多 Agent 框架中,LangGraph 凭借其有状态图(Stateful Graph)架构、细粒度流程控制和企业级持久化能力,成为构建复杂生产级应用的首选框架。
本文将从架构原理、核心概念、生产级实战、性能优化、多租户部署五个维度,深度剖析 LangGraph 2026 的最新特性,并通过一个完整的「智能研究助手」项目,带你掌握从零到生产的完整链路。
目录
- 背景与趋势:为什么 2026 是 Agent 工程化元年
- 核心概念深度解析
- 2.1 StateGraph:有状态图的数学本质
- 2.2 Node 与 Edge:从 DAG 到循环图
- 2.3 Checkpointer:持久化与人间干预
- 2.4 Tool Calling:工具调用的底层机制
- 架构设计:LangGraph 为什么比 LangChain 更适合生产
- 3.1 从 Chain 到 Graph 的范式跃迁
- 3.2 状态管理的三种模式
- 3.3 人工介入(Human-in-the-Loop)的工程实现
- 生产级实战:构建「智能研究助手」系统
- 4.1 需求分析与架构设计
- 4.2 环境准备与依赖管理
- 4.3 定义状态模式(TypedDict + Annotated)
- 4.4 构建研究流程图谱
- 4.5 实现工具节点(搜索、抓取、总结)
- 4.6 条件分支与循环重试
- 4.7 人工审核节点
- 4.8 完整代码实现
- 性能优化:让 Agent 快起来
- 5.1 并行执行(Parallel Nodes)
- 5.2 流式输出(Streaming)
- 5.3 缓存策略(Semantic Cache)
- 5.4 Token 优化技巧
- 企业级部署:从单机到多租户平台
- 6.1 持久化后端选型(SQLite / PostgreSQL / Redis)
- 6.2 多租户隔离设计
- 6.3 异步任务队列(Celery + Redis)
- 6.4 监控与可观测性(LangSmith + OTEL)
- 6.5 安全加固(密钥管理、速率限制、审计日志)
- 实战案例:多 Agent 协作系统
- 7.1 Supervisor 模式
- 7.2 子图嵌套(Subgraph)
- 7.3 跨进程通信(Remote Graph)
- 调试与测试:让 Agent 可靠起来
- 8.1 LangGraph Studio 使用指南
- 8.2 单元测试与集成测试
- 8.3 故障注入与混沌工程
- 2026 新特性深度解读
- 9.1 LangGraph 0.3+ 新 API
- 9.2 多模态支持(Vision + Audio)
- 9.3 与 MCP 协议的集成
- 总结与展望
1. 背景与趋势:为什么 2026 是 Agent 工程化元年
1.1 从 Chatbot 到 Agent 的范式跃迁
2023-2024 年,大多数 AI 应用还停留在「单轮问答」或「简单 RAG」阶段。2025 年,随着 Function Calling 的成熟和 ReAct 模式的普及,Agent 开始具备「思考-行动-观察」的闭环能力。
但真正的转折点出现在 2026 年:
- 状态管理标准化:LangGraph 的 Checkpointer 机制成为行业事实标准
- 多 Agent 协作常态化:Supervisor 模式、Subgraph 嵌套成为生产标配
- 人工介入成为必需品:Human-in-the-Loop 不再是可选项,而是合规要求
- 企业级部署成熟:多租户、权限隔离、审计日志成为标配能力
1.2 为什么选择 LangGraph
在 2026 年的 Agent 框架生态中,主流选择包括:
| 框架 | 定位 | 优势 | 劣势 |
|---|---|---|---|
| LangGraph | 底层编排框架 | 精细控制、持久化、生产级 | 学习曲线陡峭 |
| CrewAI | 高层抽象框架 | 快速原型、角色分工直观 | 控制粒度粗、难以定制 |
| AutoGen | 多 Agent 对话 | 自然语言协作 | 不确定性强、难调试 |
| OpenAI Agents SDK | 官方 SDK | 与 GPT 深度集成 | 绑定 OpenAI 生态 |
| Google ADK | 谷歌生态 | 与 Vertex AI 集成 | 生态封闭 |
LangGraph 的核心竞争力:
- 有状态图(Stateful Graph):每个节点执行后状态自动持久化,支持中断恢复
- 循环与条件分支:真正支持非 DAG 流程(这是生产级 Agent 的刚需)
- 人工介入:在任意节点暂停,等待人工审核后继续
- 多后端支持:SQLite(开发)、PostgreSQL(生产)、Redis(高性能缓存)
- 生态完善:与 LangChain、LangSmith、LangServe 无缝集成
2. 核心概念深度解析
2.1 StateGraph:有状态图的数学本质
LangGraph 的核心是 StateGraph,它是对**有限状态机(FSM)**的泛化扩展。
数学定义
一个 StateGraph 可以定义为一个五元组:
G = (S, N, E, Σ, δ)
其中:
S:状态空间(State Schema),通常用TypedDict定义N:节点集合(Nodes),每个节点是一个 Python 函数E:边集合(Edges),定义节点间的流转关系Σ:输入字母表(用户输入 + 工具返回)δ:转移函数,δ: S × Σ → S
代码示例:最小 StateGraph
from typing import TypedDict, Annotated, Sequence
import operator
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
# 定义状态模式
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
next_step: str
# 构建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_node("review", review_node)
# 定义边
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_conditional_edges(
"review",
should_continue,
{"continue": "research", END: END}
)
# 编译
app = workflow.compile()
关键点:
Annotated[Sequence[...], operator.add]表示状态字段是「追加模式」,每次节点执行后追加而非覆盖add_conditional_edges支持动态路由,这是实现循环和分支的基础
2.2 Node 与 Edge:从 DAG 到循环图
传统工作流引擎(如 Airflow)只支持有向无环图(DAG),但 Agent 流程往往需要循环(重试、自我修正)和动态分支(根据中间结果决定下一步)。
三种边类型
- 固定边(Fixed Edge):
add_edge("A", "B")—— A 执行完后必然执行 B - 条件边(Conditional Edge):
add_conditional_edges("A", routing_func, {label: node})—— 根据routing_func的返回值决定下一个节点 - 动态边(Dynamic Edge):通过
Command对象在节点内部动态决定下一个节点(LangGraph 0.3+)
代码示例:条件分支
def should_continue(state: AgentState) -> str:
"""决定是继续研究还是结束"""
messages = state["messages"]
last_message = messages[-1]
# 如果上次工具调用失败,重试
if "error" in last_message.content.lower():
if state.get("retry_count", 0) < 3:
return "retry"
else:
return END
# 如果质量达标,结束
if last_message.quality_score > 0.8:
return END
# 否则继续研究
return "continue"
workflow.add_conditional_edges(
"review",
should_continue,
{
"continue": "research",
"retry": "research",
END: END
}
)
2.3 Checkpointer:持久化与人间干预
Checkpointer 是 LangGraph 的杀手级特性,它解决了 Agent 生产化的两个核心问题:
- 中断恢复:Agent 执行到一半崩溃了,重启后从断点继续
- 人工介入:在关键决策点暂停,等待人工审核
持久化后端对比
| 后端 | 适用场景 | 性能 | 可靠性 |
|---|---|---|---|
SqliteSaver | 本地开发、单机部署 | 中 | 高(文件级锁) |
PostgresSaver | 生产环境、多进程 | 高 | 极高(ACID) |
RedisSaver | 高并发、临时状态 | 极高 | 中(需配置持久化) |
代码示例:配置 Checkpointer
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
# 方式 1:SQLite(开发环境)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = workflow.compile(checkpointer=checkpointer)
result = app.invoke(initial_state, config={"configurable": {"thread_id": "1"}})
# 方式 2:PostgreSQL(生产环境)
with psycopg.connect("dbname=agentdb user=postgres password=xxx", autocommit=True) as conn:
checkpointer = PostgresSaver(conn)
app = workflow.compile(checkpointer=checkpointer)
# 中断点:在 review 节点前暂停
result = app.invoke(
initial_state,
config={
"configurable": {"thread_id": "user_456"},
"interrupt_before": ["review"] # 暂停点
}
)
# 人工审核...
human_approval = get_human_approval(result)
# 继续执行
result = app.invoke(
None, # 不需要重新传入 state,从 checkpointer 恢复
config={"configurable": {"thread_id": "user_456"}}
)
关键点:
thread_id是持久化的核心标识,相同thread_id的请求共享状态历史interrupt_before/interrupt_after定义人工介入点
2.4 Tool Calling:工具调用的底层机制
LangGraph 的工具调用基于 LangChain 的 Tool 抽象,但增加了并行调用和错误恢复能力。
工具定义的最佳实践
from langchain_core.tools import tool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(..., description="搜索关键词")
max_results: int = Field(5, description="最大结果数")
@tool("web_search", args_schema=SearchInput)
def web_search(query: str, max_results: int = 5) -> str:
"""使用 DuckDuckGo 搜索网络内容。
Args:
query: 搜索关键词
max_results: 最大结果数(默认 5)
Returns:
搜索结果的格式化文本
"""
try:
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=max_results))
return "\n".join([
f"[{r['title']}]({r['href']})\n{r['body']}"
for r in results
])
except Exception as e:
return f"搜索失败: {str(e)}"
# 工具错误处理装饰器
def with_retry(max_retries=3, delay=1.0):
def decorator(func):
def wrapper(*args, **kwargs):
import time
for i in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if i == max_retries - 1:
raise
time.sleep(delay * (2 ** i)) # 指数退避
return wrapper
return decorator
@with_retry(max_retries=3)
def reliable_search(query: str) -> str:
# ...
pass
3. 架构设计:LangGraph 为什么比 LangChain 更适合生产
3.1 从 Chain 到 Graph 的范式跃迁
LangChain 的 Chain 是线性管道:
Input → Prompt → LLM → Output Parser → Output
这种模型适合简单的「输入-处理-输出」场景,但无法处理:
- 循环(重试、自我修正)
- 动态分支(根据中间结果决定路径)
- 长时间运行任务(需要中断恢复)
LangGraph 的 Graph 是通用状态机:
Node A → (condition) → Node B
↓
Node C → Node D → ... → END
核心差异:
| 特性 | LangChain Chain | LangGraph Graph |
|---|---|---|
| 流程控制 | 线性 | 任意图结构 |
| 循环支持 | ❌ | ✅ |
| 状态管理 | 无状态 | 有状态(Checkpointer) |
| 人工介入 | ❌ | ✅ |
| 长时间运行 | ❌ | ✅ |
3.2 状态管理的三种模式
LangGraph 支持三种状态字段的更新模式:
模式 1:追加模式(Append)
from typing import Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add] # 每次追加
适用场景:对话历史、执行日志
模式 2:覆盖模式(Overwrite)
class State(TypedDict):
current_step: str # 每次覆盖
result: dict # 每次覆盖
适用场景:当前状态、最终结果
模式 3:合并模式(Merge)
from dataclasses import dataclass
from typing import Annotated
def merge_dicts(left: dict, right: dict) -> dict:
return {**left, **right}
class State(TypedDict):
metadata: Annotated[dict, merge_dicts]
适用场景:逐步累积的中间结果
3.3 人工介入(Human-in-the-Loop)的工程实现
在生产环境中,某些决策必须经由人工审核:
- 金融场景:Agent 执行转账前需人工确认
- 医疗场景:Agent 给出诊断建议前需医生审核
- 内容审核:Agent 生成公开内容前需人工审核
实现方式:interrupt_before / interrupt_after
from langgraph.types import interrupt, Command
def human_review_node(state: State) -> Command:
"""人工审核节点"""
# 暂停并等待人工输入
review_result = interrupt({
"question": "请审核以下内容是否可发布:",
"content": state["draft"],
"options": ["approve", "reject", "revise"]
})
# 根据人工反馈决定下一步
if review_result == "approve":
return Command(goto=END, update={"status": "approved"})
elif review_result == "reject":
return Command(goto=END, update={"status": "rejected"})
else: # revise
return Command(goto="revise_node", update={"revision_notes": review_result})
# 注册节点
workflow.add_node("human_review", human_review_node)
前端集成示例(FastAPI + WebSocket):
from fastapi import FastAPI, WebSocket
import json
app = FastAPI()
@app.websocket("/agent/stream/{task_id}")
async def agent_stream(websocket: WebSocket, task_id: str):
await websocket.accept()
config = {
"configurable": {"thread_id": task_id},
"interrupt_before": ["human_review"]
}
# 启动 Agent
async for chunk in app.astream(input_state, config=config):
if "__interrupt__" in chunk:
# 发送中断信息到前端
await websocket.send_json({
"type": "interrupt",
"data": chunk["__interrupt__"][0].value
})
# 等待前端回复
user_response = await websocket.receive_json()
# 继续执行
async for chunk2 in app.astream(
Command(resume=user_response["action"]),
config=config
):
await websocket.send_json({"type": "chunk", "data": chunk2})
else:
await websocket.send_json({"type": "chunk", "data": chunk})
4. 生产级实战:构建「智能研究助手」系统
4.1 需求分析与架构设计
我们要构建一个智能研究助手,它能够:
- 接收用户的研究主题
- 自动搜索相关资料(Web Search + arXiv + GitHub)
- 对资料进行筛选和总结
- 生成结构化研究报告
- 支持人工审核和修正
- 持久化研究过程(可恢复)
系统架构图
用户输入
↓
[主题解析节点] → 提取关键词、确定搜索策略
↓
[并行搜索节点] → Web / arXiv / GitHub 同时搜索
↓
[去重与排序节点] → 基于相关性排序、去除低质量内容
↓
[内容抓取节点] → 并行抓取全文(支持 JS 渲染)
↓
[总结节点] → LLM 生成每篇内容的摘要
↓
[人工审核节点] → 暂停,等待用户确认/修改
↓
[报告生成节点] → 组装最终报告(Markdown + 图表)
↓
[输出节点] → 保存到文件 / 发送到用户邮箱
4.2 环境准备与依赖管理
# 创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安装依赖
pip install \
langgraph==0.3.5 \
langchain==0.3.15 \
langchain-openai==0.2.14 \
langchain-community==0.3.14 \
duckduckgo-search==6.2.11 \
arxiv==2.1.3 \
requests==2.32.3 \
beautifulsoup4==4.12.3 \
playwright==1.48.0 \
sqlalchemy==2.0.36 \
psycopg[binary]==3.2.3 \
redis==5.2.1 \
fastapi==0.115.6 \
uvicorn==0.34.0 \
pydantic==2.10.5 \
python-dotenv==1.0.0
# 安装 Playwright 浏览器
playwright install chromium
.env 配置文件:
# .env
OPENAI_API_KEY=sk-xxx
OPENAI_BASE_URL=https://api.openai.com/v1
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=lsv2_xxx
LANGCHAIN_PROJECT=research-agent-prod
# 数据库配置
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=agentdb
POSTGRES_USER=agentuser
POSTGRES_PASSWORD=xxx
# Redis 配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=xxx
4.3 定义状态模式(TypedDict + Annotated)
# state.py
from typing import TypedDict, Annotated, Sequence, List, Optional
import operator
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field
class Source(BaseModel):
"""信息来源"""
url: str
title: str
snippet: str
full_text: Optional[str] = None
quality_score: float = 0.0
class ResearchState(TypedDict):
"""研究助手的状态模式"""
# 输入
topic: str # 研究主题
requirements: str # 用户额外要求(如「重点关注 2024 年后的论文」)
# 中间状态(追加模式)
messages: Annotated[Sequence[BaseMessage], operator.add]
search_results: Annotated[List[Source], operator.add]
summaries: Annotated[List[str], operator.add]
# 中间状态(覆盖模式)
current_step: str
retry_count: int
# 输出
draft_report: str # 草稿报告
final_report: str # 最终报告
status: str # pending | approved | rejected
设计要点:
search_results使用追加模式,因为每次搜索都会产生新结果current_step使用覆盖模式,用于跟踪当前执行步骤(方便调试)draft_report和final_report分开,支持人工审核流程
4.4 构建研究流程图谱
# graph.py
from langgraph.graph import StateGraph, END
from state import ResearchState
def build_research_graph():
"""构建研究助手的工作流图谱"""
workflow = StateGraph(ResearchState)
# 添加所有节点
workflow.add_node("parse_topic", parse_topic_node)
workflow.add_node("search_web", search_web_node)
workflow.add_node("search_arxiv", search_arxiv_node)
workflow.add_node("search_github", search_github_node)
workflow.add_node("deduplicate", deduplicate_node)
workflow.add_node("fetch_fulltext", fetch_fulltext_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("human_review", human_review_node)
workflow.add_node("generate_report", generate_report_node)
workflow.add_node("output", output_node)
# 定义流转逻辑
workflow.set_entry_point("parse_topic")
# 步骤 1:主题解析完成后,并行执行三个搜索
workflow.add_edge("parse_topic", "search_web")
workflow.add_edge("parse_topic", "search_arxiv")
workflow.add_edge("parse_topic", "search_github")
# 步骤 2:三个搜索都完成后,进入去重节点
workflow.add_edge("search_web", "deduplicate")
workflow.add_edge("search_arxiv", "deduplicate")
workflow.add_edge("search_github", "deduplicate")
# 步骤 3:去重后并行抓取全文
workflow.add_edge("deduplicate", "fetch_fulltext")
# 步骤 4:抓取完成后总结
workflow.add_edge("fetch_fulltext", "summarize")
# 步骤 5:总结后人工审核
workflow.add_edge("summarize", "human_review")
# 步骤 6:根据人工审核结果决定下一步
workflow.add_conditional_edges(
"human_review",
review_routing,
{
"approve": "generate_report",
"revise": "summarize", # 回到总结节点,使用人工反馈
"reject": END
}
)
# 步骤 7:报告生成后输出
workflow.add_edge("generate_report", "output")
workflow.add_edge("output", END)
return workflow
关键点:
- 使用
add_edge连接固定流程 - 使用
add_conditional_edges实现动态路由(人工审核后) - 并行节点(search_web / search_arxiv / search_github)通过共享的
deduplicate节点汇合
4.5 实现工具节点(搜索、抓取、总结)
节点 1:主题解析节点
# nodes/parse_topic.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from state import ResearchState
def parse_topic_node(state: ResearchState) -> dict:
"""解析用户主题,提取关键词和搜索策略"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = f"""
用户研究主题:{state["topic"]}
额外要求:{state.get("requirements", "无")}
请完成以下任务:
1. 提取 3-5 个核心关键词(英文)
2. 生成适合 Web 搜索的查询语句(中文 + 英文各 2 条)
3. 生成适合 arXiv 搜索的查询语句(英文,使用 arXiv 语法)
4. 判断是否需要搜索 GitHub(如果是技术主题,需要)
输出格式:JSON
"""
response = llm.invoke([HumanMessage(content=prompt)])
# 解析 LLM 输出(假设返回 JSON)
import json
parsed = json.loads(response.content)
return {
"current_step": "parse_topic_completed",
"messages": [response],
"topic": state["topic"],
"requirements": state.get("requirements", ""),
# 将解析结果存储到临时字段(实际应扩展到 State 定义)
"_parsed_plan": parsed
}
节点 2:Web 搜索节点
# nodes/search_web.py
from duckduckgo_search import DDGS
from state import ResearchState, Source
import time
def search_web_node(state: ResearchState) -> dict:
"""执行 Web 搜索"""
parsed_plan = state.get("_parsed_plan", {})
queries = parsed_plan.get("web_queries", [state["topic"]])
results = []
for query in queries:
try:
with DDGS() as ddgs:
search_results = list(ddgs.text(query, max_results=10))
for r in search_results:
results.append(Source(
url=r["href"],
title=r["title"],
snippet=r["body"][:200],
quality_score=0.5 # 默认质量分数
))
time.sleep(1) # 避免被限流
except Exception as e:
print(f"Web 搜索失败(query={query}): {e}")
return {
"search_results": results,
"current_step": "search_web_completed",
"messages": [f"Web 搜索完成,获得 {len(results)} 条结果"]
}
节点 3:arXiv 搜索节点
# nodes/search_arxiv.py
import arxiv
from state import ResearchState, Source
def search_arxiv_node(state: ResearchState) -> dict:
"""执行 arXiv 学术论文搜索"""
parsed_plan = state.get("_parsed_plan", {})
queries = parsed_plan.get("arxiv_queries", [state["topic"]])
results = []
for query in queries:
try:
search = arxiv.Search(
query=query,
max_results=20,
sort_by=arxiv.SortCriterion.Relevance
)
for result in search.results():
results.append(Source(
url=result.entry_id,
title=result.title,
snippet=result.summary[:200],
quality_score=0.8 # 学术论文质量较高
))
except Exception as e:
print(f"arXiv 搜索失败(query={query}): {e}")
return {
"search_results": results,
"current_step": "search_arxiv_completed",
"messages": [f"arXiv 搜索完成,获得 {len(results)} 条结果"]
}
节点 4:全文抓取节点(支持 JS 渲染)
# nodes/fetch_fulltext.py
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from state import ResearchState, Source
import requests
from typing import List
async def fetch_with_playwright(url: str) -> str:
"""使用 Playwright 抓取需要 JS 渲染的页面"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
await page.goto(url, timeout=30000)
await page.wait_for_load_state("networkidle")
content = await page.content()
await browser.close()
return content
def fetch_fulltext_node(state: ResearchState) -> dict:
"""并行抓取所有来源的全文"""
sources: List[Source] = state["search_results"]
updated_sources = []
for source in sources:
try:
# 优先使用 requests(速度快)
response = requests.get(source.url, timeout=10)
soup = BeautifulSoup(response.text, "html.parser")
# 提取正文(简单启发式)
article = soup.find("article") or soup.find("main") or soup.body
full_text = article.get_text(separator="\n", strip=True)[:5000]
if len(full_text) < 500: # 内容太少,可能是 JS 渲染页面
raise ValueError("Content too short, try Playwright")
source.full_text = full_text
source.quality_score = min(source.quality_score + 0.1, 1.0)
except Exception as e:
# fallback 到 Playwright
try:
full_text = asyncio.run(fetch_with_playwright(source.url))
soup = BeautifulSoup(full_text, "html.parser")
full_text = soup.get_text(separator="\n", strip=True)[:5000]
source.full_text = full_text
except Exception as e2:
print(f"抓取失败({source.url}): {e2}")
source.full_text = source.snippet # 使用摘要作为降级
updated_sources.append(source)
return {
"search_results": updated_sources,
"current_step": "fetch_fulltext_completed"
}
4.6 条件分支与循环重试
实现「质量不足则重试」逻辑
# nodes/summarize.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from state import ResearchState
def summarize_node(state: ResearchState) -> dict:
"""对全文进行总结"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
sources = state["search_results"]
summaries = []
for source in sources:
if not source.full_text:
continue
prompt = f"""
请对以下内容进行总结(200 字以内):
标题:{source.title}
内容:{source.full_text[:3000]}
要求:
1. 提取核心观点和方法
2. 指出与主题「{state["topic"]}」的相关性
3. 评估信息质量(高/中/低)
"""
response = llm.invoke([HumanMessage(content=prompt)])
summaries.append(response.content)
# 质量检查:如果总结太少,可能是抓取失败
if len(summaries) < 3 and state.get("retry_count", 0) < 2:
return {
"summaries": [],
"retry_count": state.get("retry_count", 0) + 1,
"current_step": "summarize_retry",
"_goto": "fetch_fulltext" # 回到抓取节点重试
}
return {
"summaries": summaries,
"current_step": "summarize_completed"
}
def review_routing(state: ResearchState) -> str:
"""根据总结质量决定下一步"""
if state.get("_goto") == "fetch_fulltext":
return "fetch_fulltext" # 重试
if state.get("status") == "approved":
return "generate_report"
return "human_review" # 默认进入人工审核
4.7 人工审核节点
# nodes/human_review.py
from langgraph.types import interrupt, Command
from state import ResearchState
def human_review_node(state: ResearchState) -> Command:
"""人工审核节点"""
# 构造审核界面需要的数据
review_data = {
"topic": state["topic"],
"draft_report": state.get("draft_report", ""),
"sources_count": len(state["search_results"]),
"summaries_preview": state["summaries"][:3], # 前 3 条预览
}
# 暂停并等待人工输入
human_input = interrupt(review_data)
# human_input 是前端返回的结果,格式:
# {
# "action": "approve" | "revise" | "reject",
# "feedback": "..." # 如果 action=revise,提供修改建议
# }
action = human_input.get("action", "reject")
if action == "approve":
return Command(
goto="generate_report",
update={"status": "approved"}
)
elif action == "revise":
# 将人工反馈加入到 summaries,让总结节点重新处理
return Command(
goto="summarize",
update={
"summaries": state["summaries"] + [human_input["feedback"]],
"status": "revising"
}
)
else: # reject
return Command(
goto=END,
update={"status": "rejected"}
)
4.8 完整代码实现
完整代码太长,此处展示核心框架,完整代码已上传到 GitHub(略)
编译并运行:
# main.py
from graph import build_research_graph
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
def main():
# 构建图
workflow = build_research_graph()
# 配置持久化
with psycopg.connect(
"dbname=agentdb user=agentuser password=xxx",
autocommit=True
) as conn:
checkpointer = PostgresSaver(conn)
# 编译
app = workflow.compile(checkpointer=checkpointer)
# 初始状态
initial_state = {
"topic": "LangGraph 2026 生产级最佳实践",
"requirements": "重点关注持久化、多租户部署、性能优化",
"messages": [],
"search_results": [],
"summaries": [],
"current_step": "start",
"retry_count": 0,
"draft_report": "",
"final_report": "",
"status": "pending"
}
# 执行(同步模式)
config = {"configurable": {"thread_id": "research_001"}}
result = app.invoke(initial_state, config=config)
print("最终报告:")
print(result["final_report"])
if __name__ == "__main__":
main()
5. 性能优化:让 Agent 快起来
5.1 并行执行(Parallel Nodes)
LangGraph 支持真正的并行执行(基于 Python 的 asyncio):
# 方式 1:使用 Send 对象实现动态并行
from langgraph.types import Send
def parallel_search_node(state: ResearchState) -> List[Send]:
"""动态生成并行任务"""
queries = state["_parsed_plan"]["web_queries"]
return [
Send("search_web_single", {"query": q, "topic": state["topic"]})
for q in queries
]
workflow.add_node("search_web_single", search_web_single_node)
workflow.add_conditional_edges("parse_topic", parallel_search_node)
5.2 流式输出(Streaming)
对于长时间运行的 Agent,流式输出能显著改善用户体验:
# 流式执行
for chunk in app.stream(initial_state, config=config):
for node_name, node_output in chunk.items():
print(f"[{node_name}] {node_output}")
# Async 流式执行(FastAPI)
async for chunk in app.astream(initial_state, config=config):
await websocket.send_json(chunk)
5.3 缓存策略(Semantic Cache)
使用 Redis 缓存相似查询的 LLM 响应:
from langchain_community.cache import InMemoryCache
from langchain_openai import ChatOpenAI
# 启用缓存
llm = ChatOpenAI(model="gpt-4o", cache=True)
# 自定义语义缓存(Redis + Embedding)
import redis
from langchain_community.embeddings import OpenAIEmbeddings
class SemanticCache:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.embeddings = OpenAIEmbeddings()
def get(self, query: str, threshold: float = 0.95):
"""语义相似度查询"""
query_embedding = self.embeddings.embed_query(query)
# 省略具体实现(使用 Redis VSS 或近似最近邻搜索)
...
5.4 Token 优化技巧
- 使用更短的 System Prompt
- 对长文档使用 Map-Reduce 总结
- 启用 Response Format(JSON Mode)减少输出 Token
llm = ChatOpenAI(
model="gpt-4o",
model_kwargs={
"response_format": {"type": "json_object"}
}
)
6. 企业级部署:从单机到多租户平台
6.1 持久化后端选型
| 场景 | 推荐后端 | 原因 |
|---|---|---|
| 本地开发 | SqliteSaver | 零配置、单机文件 |
| 小型生产 | PostgresSaver | ACID、成熟生态 |
| 高并发 | RedisSaver + PostgresSaver | Redis 做缓存、PostgreSQL 做归档 |
6.2 多租户隔离设计
# multi_tenant.py
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
class MultiTenantCheckpointer:
"""多租户 Checkpointer 封装"""
def __init__(self, db_url: str):
self.db_url = db_url
def get_checkpointer(self, tenant_id: str):
"""为每个租户创建独立的 schema"""
with psycopg.connect(self.db_url, autocommit=True) as conn:
# 创建租户专属 schema
conn.execute(f'CREATE SCHEMA IF NOT EXISTS "{tenant_id}"')
# 创建 checkpointer(指定 schema)
checkpointer = PostgresSaver(
conn,
schema=tenant_id
)
return checkpointer
# 使用
checkpointer_factory = MultiTenantCheckpointer("dbname=agentdb")
@app.post("/agent/run")
def run_agent(tenant_id: str, topic: str):
checkpointer = checkpointer_factory.get_checkpointer(tenant_id)
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": f"{tenant_id}_{topic}"}}
result = app.invoke({"topic": topic}, config=config)
return result
6.3 异步任务队列
对于长时间运行的 Agent,应使用任务队列:
# task_queue.py
from celery import Celery
from langgraph.graph import StateGraph
celery = Celery("agent_tasks", broker="redis://localhost:6379")
@celery.task(bind=True)
def run_agent_task(self, tenant_id: str, topic: str):
"""Celery 任务:执行 Agent"""
# 更新任务状态
self.update_state(state="PROGRESS", meta={"current_step": "parsing"})
# 执行 Agent
checkpointer = get_checkpointer(tenant_id)
app = workflow.compile(checkpointer=checkpointer)
result = app.invoke(
{"topic": topic},
config={"configurable": {"thread_id": self.request.id}}
)
return {"status": "completed", "result": result["final_report"]}
# 调用
task = run_agent_task.delay("tenant_001", "LangGraph 最佳实践")
print(task.id) # 返回任务 ID,用于查询进度
6.4 监控与可观测性
集成 LangSmith
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "lsv2_xxx"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-prod"
# 执行时自动上报到 LangSmith
result = app.invoke(initial_state)
集成 OpenTelemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# 配置 OTEL
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
# 在节点中手动打点
def search_web_node(state: ResearchState) -> dict:
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("search_web") as span:
span.set_attribute("topic", state["topic"])
# ... 执行搜索
span.set_attribute("results_count", len(results))
return {"search_results": results}
6.5 安全加固
密钥管理(使用 HashiCorp Vault)
import hvac
client = hvac.Client(url="http://vault:8200")
client.auth.approle.login(role_id="xxx", secret_id="xxx")
# 读取密钥
openai_key = client.secrets.kv.v2.read_secret_version(
path="openai"
)["data"]["data"]["api_key"]
速率限制(使用 Redis)
# rate_limit.py
import redis
import time
class RateLimiter:
def __init__(self, redis_url: str, max_requests: int = 100, window: int = 3600):
self.redis = redis.from_url(redis_url)
self.max_requests = max_requests
self.window = window
def is_allowed(self, tenant_id: str) -> bool:
key = f"ratelimit:{tenant_id}:{int(time.time() / self.window)}"
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, self.window)
return current <= self.max_requests
# 使用
limiter = RateLimiter("redis://localhost:6379")
@app.post("/agent/run")
def run_agent(tenant_id: str, topic: str):
if not limiter.is_allowed(tenant_id):
raise HTTPException(429, "速率限制:每小时最多 100 次请求")
# ...
7. 实战案例:多 Agent 协作系统
7.1 Supervisor 模式
当一个任务需要多个专业 Agent 协作时,使用 Supervisor 模式:
# multi_agent.py
from langgraph.graph import StateGraph, END
from langgraph.types import Send
class SupervisorState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
task_description: str
def supervisor_node(state: SupervisorState) -> dict:
"""Supervisor:决定下一个执行的 Agent"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = f"""
当前任务:{state["task_description"]}
已完成步骤:{state["messages"]}
请决定下一个执行的 Agent(从以下列表):
- researcher:负责信息搜索和总结
- writer:负责撰写报告
- reviewer:负责审核和修改
输出格式:单个 Agent 名称
"""
response = llm.invoke([HumanMessage(content=prompt)])
next_agent = response.content.strip()
return {"next_agent": next_agent}
# 构建 Supervisor 图
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("reviewer", reviewer_agent)
workflow.set_entry_point("supervisor")
# 动态路由:Supervisor 决定下一个 Agent
workflow.add_conditional_edges(
"supervisor",
lambda s: s["next_agent"],
{
"researcher": "researcher",
"writer": "writer",
"reviewer": "reviewer",
END: END
}
)
# Agent 执行完后回到 Supervisor
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("writer", "supervisor")
workflow.add_edge("reviewer", "supervisor")
7.2 子图嵌套(Subgraph)
对于复杂 Agent,可以将其封装为子图:
def build_inner_graph():
"""子图:负责一个子任务"""
inner_workflow = StateGraph(InnerState)
inner_workflow.add_node("step1", step1_node)
inner_workflow.add_node("step2", step2_node)
inner_workflow.add_edge("step1", "step2")
inner_workflow.set_entry_point("step1")
inner_workflow.add_edge("step2", END)
return inner_workflow.compile()
# 在主图中调用子图
main_workflow.add_node("inner_task", build_inner_graph())
7.3 跨进程通信(Remote Graph)
在微服务架构中,不同 Agent 可能运行在不同进程中:
# 使用 LangServe 将 Agent 部署为 HTTP 服务
from langserve import add_routes
from fastapi import FastAPI
app = FastAPI()
add_routes(app, research_agent, path="/research")
# 另一个 Agent 通过 HTTP 调用
import requests
def call_remote_agent(query: str) -> str:
response = requests.post(
"http://research-agent:8000/research/invoke",
json={"input": {"topic": query}}
)
return response.json()["output"]
8. 调试与测试:让 Agent 可靠起来
8.1 LangGraph Studio 使用指南
LangGraph Studio 是官方提供的可视化调试工具:
# 安装 LangGraph CLI
pip install langgraph-cli
# 启动 Studio(会自动打开浏览器)
langgraph studio -m my_agent.main --config langgraph.json
核心功能:
- 可视化流程图:实时查看 Agent 执行路径
- 状态检查:查看每个节点执行后的状态快照
- 时间旅行调试:回滚到任意历史状态重新执行
- 人工介入模拟:在 Studio 中模拟人工审核
8.2 单元测试与集成测试
# test_agent.py
import pytest
from main import build_research_graph
def test_parse_topic_node():
"""测试主题解析节点"""
workflow = build_research_graph()
app = workflow.compile()
initial_state = {
"topic": "LangGraph 教程",
"requirements": "",
"messages": [],
"search_results": [],
"summaries": [],
"current_step": "start",
"retry_count": 0,
"draft_report": "",
"final_report": "",
"status": "pending"
}
result = app.invoke(initial_state)
assert "_parsed_plan" in result
assert "web_queries" in result["_parsed_plan"]
def test_human_review_interrupt():
"""测试人工审核中断"""
workflow = build_research_graph()
app = workflow.compile()
config = {
"configurable": {"thread_id": "test_001"},
"interrupt_before": ["human_review"]
}
# 执行到中断点
result = app.invoke(initial_state, config=config)
assert "__interrupt__" in result
# 模拟人工输入
human_response = {"action": "approve"}
result = app.invoke(
Command(resume=human_response),
config=config
)
assert result["status"] == "approved"
8.3 故障注入与混沌工程
# chaos.py
import random
def chaotic_search_node(state: ResearchState) -> dict:
"""注入故障的搜索节点(用于测试)"""
if random.random() < 0.3: # 30% 概率失败
raise Exception("模拟网络故障")
# 正常执行
return search_web_node(state)
# 在测试环境中使用
if os.getenv("ENV") == "test":
workflow.add_node("search_web", chaotic_search_node)
9. 2026 新特性深度解读
9.1 LangGraph 0.3+ 新 API
Command 对象:节点内动态路由
from langgraph.types import Command
def my_node(state: State) -> Command:
if state["score"] > 0.8:
return Command(goto=END, update={"status": "success"})
else:
return Command(goto="retry_node", update={"retry_count": state["retry_count"] + 1})
Send 对象:动态并行
(已在 5.1 节介绍)
9.2 多模态支持
LangGraph 0.3+ 原生支持多模态输入:
from langchain_core.messages import HumanMessage
def vision_node(state: State) -> dict:
llm = ChatOpenAI(model="gpt-4o") # gpt-4o 支持视觉
message = HumanMessage(
content=[
{"type": "text", "text": "图片内容是什么?"},
{"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}}
]
)
response = llm.invoke([message])
return {"vision_result": response.content}
9.3 与 MCP 协议的集成
MCP(Model Context Protocol) 是 2026 年流行的工具集成标准。LangGraph 通过 langchain-mcp 适配器支持 MCP:
from langchain_mcp import MCPToolkit
# 连接到 MCP 服务器(例如文件系统工具)
toolkit = MCPToolkit.from_server("filesystem", "npx -y @modelcontextprotocol/server-filesystem /tmp")
tools = toolkit.get_tools()
# 在 Agent 中使用 MCP 工具
llm_with_tools = llm.bind_tools(tools)
10. 总结与展望
本文回顾
本文从以下九个维度深度剖析了 LangGraph 2026 的生产级实践:
- 背景与趋势:2026 是 Agent 工程化元年
- 核心概念:StateGraph、Node、Edge、Checkpointer
- 架构设计:为什么 LangGraph 比 LangChain 更适合生产
- 生产级实战:构建了一个完整的「智能研究助手」
- 性能优化:并行、流式、缓存、Token 优化
- 企业级部署:多租户、任务队列、监控、安全
- 多 Agent 协作:Supervisor、Subgraph、Remote Graph
- 调试与测试:LangGraph Studio、单元测试、混沌工程
- 2026 新特性:Command、多模态、MCP 集成
LangGraph 的适用场景
✅ 适合:
- 需要精细控制流程的生产级 Agent
- 长时间运行任务(需要中断恢复)
- 需要人工介入的合规场景
- 多 Agent 协作系统
❌ 不适合:
- 简单的单轮问答(用 LangChain Chain 即可)
- 快速原型验证(用 CrewAI 更快)
- 对学习曲线敏感的团队
未来展望
- 与 A2A 协议的集成:Agent-to-Agent 通信将成为多 Agent 系统的标准
- 边缘部署:在浏览器端运行轻量级 Agent(WebAssembly + ONNX Runtime)
- 自我进化:Agent 通过强化学习优化自己的流程图结构
参考资料
- LangGraph 官方文档
- LangGraph GitHub
- Productionizing LangGraph
- Human-in-the-Loop with LangGraph
- LangGraph Checkpointer Deep Dive
版权声明:本文由程序员茄子原创,转载请注明出处。
更新日期:2026-05-23
字数统计:约 12000 字