LangGraph 深度实战:从状态图到生产级 Agent 系统——用图结构重新定义 AI 工程边界
2026 年,AI Agent 已经从实验室走进了每个工程团队。LangChain 的"链式调用"时代正在谢幕,而 LangGraph 用"图结构"接管了复杂 Agent 编排的话语权。本文是一篇 5000+ 字的深度实战指南,带你从原理到生产落地,彻底搞懂 LangGraph。
一、背景:为什么链式调用不够用了?
2022 年,LangChain 横空出世,用 Chain(链)的思路串联 LLM 调用。那时候大多数应用都是:输入 → Prompt → LLM → 输出,线性的,可预期的。
但 2025 年之后,业务需求变了:
- 用户要求 Agent 能"回头重来":如果搜索结果不好,重新搜索
- 要求多个 Agent 并发协作,互相通信
- 要求在关键决策点"暂停等人审批"(Human-in-the-loop)
- 要求 Agent 的执行过程可回溯、可 replay
线性 Chain 处理这些需求非常痛苦——你得手动管理状态、手写循环、自己实现分支逻辑。代码很快变成意大利面。
LangGraph 的核心洞察:AI Agent 的执行流程本质上是一个有向图。节点是执行单元,边是流转逻辑,State 是全局共享的上下文。把业务流程画成图,比手写 if-else 直观 10 倍,维护 10 倍容易。
二、核心概念:三要素一图
LangGraph 的核心就三个概念:
2.1 State(状态)
State 是贯穿整个图的"公共黑板",所有节点都可以读写它。
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, add_messages
class AgentState(TypedDict):
# messages 字段使用 add_messages reducer,自动追加而非覆盖
messages: Annotated[list, add_messages]
# 自定义业务字段
search_results: list[str]
retry_count: int
final_answer: str
这里有个关键点:Reducer。默认情况下,节点返回的值会"覆盖"State 对应字段。但对于 messages(消息历史),我们想要追加而不是覆盖,所以用 add_messages 这个内置 Reducer。
你也可以自定义 Reducer:
from typing import Annotated
import operator
def merge_results(left: list, right: list) -> list:
"""去重合并搜索结果"""
return list(set(left + right))
class ResearchState(TypedDict):
# 自定义 reducer,搜索结果去重合并
search_results: Annotated[list, merge_results]
# 普通字段,直接覆盖
current_query: str
is_done: bool
2.2 Nodes(节点)
节点是执行单元,本质是一个函数:接收 State,返回更新后的 State(部分更新即可)。
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def research_planner(state: AgentState) -> dict:
"""规划节点:分析用户问题,制定研究计划"""
system_prompt = """你是一个研究规划专家。
分析用户的研究问题,制定一个清晰的调研计划。
返回 3-5 个具体的搜索查询词,用于后续信息收集。"""
response = llm.invoke([
SystemMessage(content=system_prompt),
*state["messages"]
])
return {"messages": [response]}
def web_searcher(state: AgentState) -> dict:
"""搜索节点:执行网络搜索"""
# 从最新消息中提取搜索词
last_message = state["messages"][-1].content
# 模拟搜索(实际替换为 SerpAPI、Tavily 等)
results = mock_search(last_message)
return {
"search_results": results,
"retry_count": state.get("retry_count", 0)
}
def quality_checker(state: AgentState) -> dict:
"""质量检查节点:判断搜索结果是否足够"""
results = state.get("search_results", [])
retry_count = state.get("retry_count", 0)
if len(results) < 3 and retry_count < 2:
# 结果不够,需要重新搜索
return {"retry_count": retry_count + 1}
else:
return {"is_done": True}
def report_writer(state: AgentState) -> dict:
"""报告生成节点:基于搜索结果撰写报告"""
system_prompt = """基于提供的搜索结果,撰写一份详细的研究报告。
要求:逻辑清晰,数据准确,有独特见解。"""
context = "\n".join(state.get("search_results", []))
response = llm.invoke([
SystemMessage(content=f"{system_prompt}\n\n参考资料:\n{context}"),
*state["messages"]
])
return {
"messages": [response],
"final_answer": response.content
}
2.3 Edges(边)
边决定执行流转逻辑。LangGraph 支持三种边:
普通边:无条件跳转
graph.add_edge("planner", "searcher")
条件边:根据 State 动态决定下一个节点
def route_after_check(state: AgentState) -> str:
"""根据质量检查结果决定流转方向"""
if state.get("is_done"):
return "writer" # 结果够了,写报告
elif state.get("retry_count", 0) < 2:
return "searcher" # 重试搜索
else:
return "writer" # 超过最大重试次数,强制写报告
graph.add_conditional_edges(
"quality_check",
route_after_check,
{
"writer": "writer",
"searcher": "searcher"
}
)
入口和出口:
from langgraph.graph import START, END
graph.add_edge(START, "planner") # 图的入口
graph.add_edge("writer", END) # 图的出口
三、架构设计:搭建完整的研究 Agent
把上面三个概念组合起来,搭建一个完整的"超级研究员" Agent:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def build_research_agent():
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("planner", research_planner)
workflow.add_node("searcher", web_searcher)
workflow.add_node("quality_check", quality_checker)
workflow.add_node("writer", report_writer)
# 3. 定义边(流转逻辑)
workflow.add_edge(START, "planner") # 入口 → 规划
workflow.add_edge("planner", "searcher") # 规划 → 搜索
workflow.add_edge("searcher", "quality_check") # 搜索 → 质量检查
# 条件边:根据质量检查结果决定
workflow.add_conditional_edges(
"quality_check",
route_after_check,
{"writer": "writer", "searcher": "searcher"}
)
workflow.add_edge("writer", END) # 写报告 → 结束
# 4. 添加 checkpoint(持久化 State,支持 Human-in-the-loop)
memory = MemorySaver()
# 5. 编译图
agent = workflow.compile(checkpointer=memory)
return agent
# 运行 Agent
agent = build_research_agent()
# 配置线程 ID(每个对话独立的上下文)
config = {"configurable": {"thread_id": "research-001"}}
result = agent.invoke(
{"messages": [HumanMessage(content="分析 2026 年 AI Agent 技术的发展趋势")]},
config=config
)
print(result["final_answer"])
3.1 可视化执行图
LangGraph 支持一行代码生成图的可视化:
# 生成 Mermaid 图表
from IPython.display import display, Image
display(Image(agent.get_graph().draw_mermaid_png()))
输出类似:
START → planner → searcher → quality_check
↑ ↓
└──────────────┘ (retry)
↓
writer → END
四、进阶能力:Human-in-the-Loop
这是 LangGraph 最强大的功能之一——在执行到关键节点时,暂停等待人类输入,然后继续执行。
4.1 设置中断点
# 编译时指定需要中断的节点
agent = workflow.compile(
checkpointer=memory,
interrupt_before=["writer"] # 在写报告前中断,让人类审核搜索结果
)
4.2 执行到中断点
config = {"configurable": {"thread_id": "research-002"}}
# 第一次运行:执行到 writer 节点前暂停
result = agent.invoke(
{"messages": [HumanMessage(content="分析量子计算的商业化进展")]},
config=config
)
# 此时 result 包含已完成的搜索结果
print("搜索结果:")
for r in result["search_results"]:
print(f" - {r}")
# 可以检查当前状态
current_state = agent.get_state(config)
print(f"下一步:{current_state.next}") # ('writer',)
4.3 人类审核后继续执行
# 人类可以修改状态(例如补充额外信息)
agent.update_state(
config,
{"search_results": result["search_results"] + ["额外补充:量子纠错突破最新进展..."]}
)
# 继续执行(从中断点恢复)
final_result = agent.invoke(None, config=config)
print(final_result["final_answer"])
4.4 审批工作流实战
一个更实际的例子——邮件发送审批工作流:
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END, add_messages
from langchain_core.messages import HumanMessage, AIMessage
class EmailWorkflowState(TypedDict):
messages: Annotated[list, add_messages]
draft_email: str
approved: bool
recipient: str
def draft_email_node(state: EmailWorkflowState) -> dict:
"""起草邮件"""
prompt = f"为 {state['recipient']} 起草一封专业的商务邮件,内容:{state['messages'][-1].content}"
response = llm.invoke([HumanMessage(content=prompt)])
return {
"messages": [response],
"draft_email": response.content
}
def send_email_node(state: EmailWorkflowState) -> dict:
"""发送邮件(只有 approved=True 才会到这里)"""
# 实际发送逻辑
print(f"✅ 邮件已发送给 {state['recipient']}")
return {"messages": [AIMessage(content="邮件发送成功!")]}
def reject_email_node(state: EmailWorkflowState) -> dict:
"""拒绝发送"""
return {"messages": [AIMessage(content="邮件发送已取消。")]}
def route_after_approval(state: EmailWorkflowState) -> str:
if state.get("approved"):
return "send"
return "reject"
# 构建工作流
workflow = StateGraph(EmailWorkflowState)
workflow.add_node("draft", draft_email_node)
workflow.add_node("send", send_email_node)
workflow.add_node("reject", reject_email_node)
workflow.add_edge(START, "draft")
workflow.add_conditional_edges(
"draft",
route_after_approval,
{"send": "send", "reject": "reject"}
)
workflow.add_edge("send", END)
workflow.add_edge("reject", END)
memory = MemorySaver()
email_agent = workflow.compile(
checkpointer=memory,
interrupt_after=["draft"] # 起草后中断等待审批
)
config = {"configurable": {"thread_id": "email-001"}}
# 触发工作流
email_agent.invoke(
{
"messages": [HumanMessage(content="通知项目团队明天下午3点开会")],
"recipient": "team@company.com",
"approved": False
},
config=config
)
# 查看草稿
state = email_agent.get_state(config)
print("📧 邮件草稿:")
print(state.values["draft_email"])
# 人类审批(批准发送)
email_agent.update_state(config, {"approved": True})
# 继续执行
email_agent.invoke(None, config=config)
五、多 Agent 协作:Supervisor 架构
LangGraph 支持构建多 Agent 系统,最常见的是 Supervisor 架构:一个 Supervisor Agent 负责分配任务,多个 Worker Agent 负责执行。
from langgraph.graph import StateGraph, START, END
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
# 创建专门的 Worker Agent
research_agent = create_react_agent(
llm,
tools=[web_search_tool, arxiv_search_tool],
name="researcher",
prompt="你是研究专家,负责搜集和分析信息。"
)
code_agent = create_react_agent(
llm,
tools=[python_repl_tool, code_sandbox_tool],
name="coder",
prompt="你是代码专家,负责编写和调试代码。"
)
writing_agent = create_react_agent(
llm,
tools=[],
name="writer",
prompt="你是写作专家,负责将信息整理成清晰的文章。"
)
# 创建 Supervisor
supervisor = create_supervisor(
agents=[research_agent, code_agent, writing_agent],
model=llm,
prompt="""你是一个项目经理。根据任务需求,决定让哪个专家来处理:
- 需要搜索信息 → researcher
- 需要写代码 → coder
- 需要写文档 → writer
当任务完成时,汇总结果并结束。"""
)
multi_agent = supervisor.compile(checkpointer=MemorySaver())
5.1 Swarm 架构(Agent 自主切换)
另一种模式是 Swarm(蜂群):Agent 之间可以互相"移交"控制权,不需要中央 Supervisor。
from langgraph_swarm import create_swarm, create_handoff_tool
# 定义移交工具
transfer_to_coder = create_handoff_tool(
agent_name="coder",
description="当需要写代码或调试程序时,移交给代码专家"
)
transfer_to_writer = create_handoff_tool(
agent_name="writer",
description="当需要写文档或报告时,移交给写作专家"
)
# 研究 Agent(可以移交给 coder 或 writer)
research_agent = create_react_agent(
llm,
tools=[web_search_tool, transfer_to_coder, transfer_to_writer],
name="researcher"
)
# 代码 Agent(可以移交回 researcher 或给 writer)
transfer_to_researcher = create_handoff_tool(agent_name="researcher")
code_agent = create_react_agent(
llm,
tools=[python_repl_tool, transfer_to_researcher, transfer_to_writer],
name="coder"
)
# 组装 Swarm
swarm = create_swarm(
agents=[research_agent, code_agent, writing_agent],
default_active_agent="researcher" # 从 researcher 开始
)
六、持久化与生产级部署
6.1 生产级 Checkpoint
内存版 MemorySaver 不适合生产,进程重启就丢失了。生产环境需要持久化:
# PostgreSQL 持久化(推荐生产使用)
from langgraph.checkpoint.postgres import PostgresSaver
connection_string = "postgresql://user:password@localhost:5432/langgraph_db"
checkpointer = PostgresSaver.from_conn_string(connection_string)
# 初始化数据库表(只需执行一次)
checkpointer.setup()
agent = workflow.compile(checkpointer=checkpointer)
# Redis 持久化(适合高并发场景)
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
agent = workflow.compile(checkpointer=checkpointer)
6.2 流式输出
LangGraph 原生支持流式输出,适合 Web 应用实时展示进度:
import asyncio
async def stream_research(query: str):
config = {"configurable": {"thread_id": f"stream-{query[:10]}"}}
async for chunk in agent.astream(
{"messages": [HumanMessage(content=query)]},
config=config,
stream_mode="updates" # 每次节点更新都推送
):
node_name = list(chunk.keys())[0]
node_output = chunk[node_name]
# 实时打印节点执行情况
print(f"\n[{node_name}] 执行完成")
if "messages" in node_output:
last_msg = node_output["messages"][-1]
if hasattr(last_msg, "content"):
print(f"输出:{last_msg.content[:200]}...")
# 运行
asyncio.run(stream_research("分析 Rust 在系统编程中的最新进展"))
也可以流式获取 Token 级别的输出:
async for chunk in agent.astream(
{"messages": [HumanMessage(content=query)]},
config=config,
stream_mode="messages" # Token 级流式
):
if isinstance(chunk, tuple):
message, metadata = chunk
if hasattr(message, "content") and message.content:
print(message.content, end="", flush=True)
6.3 LangGraph Cloud / LangGraph Platform
LangChain 官方提供了 LangGraph Platform,一个专门用于部署 LangGraph 应用的托管服务:
# langgraph.json 配置文件
{
"dependencies": ["."],
"graphs": {
"research_agent": "./src/agent.py:graph",
"email_workflow": "./src/email.py:workflow"
},
"env": ".env"
}
部署后会自动提供:
- REST API(
POST /runs,GET /runs/{run_id}/stream) - WebSocket 支持
- 水平扩展
- 内置监控和可观测性
七、性能优化:让 Agent 跑得更快
7.1 节点并发执行
LangGraph 支持节点并发——没有依赖关系的节点可以并行执行:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ParallelState(TypedDict):
query: str
# 三个并发搜索结果,用 operator.add 合并
web_results: Annotated[list, operator.add]
arxiv_results: Annotated[list, operator.add]
news_results: Annotated[list, operator.add]
final_report: str
def web_search(state: ParallelState) -> dict:
"""并发节点1:网页搜索"""
results = search_web(state["query"])
return {"web_results": results}
def arxiv_search(state: ParallelState) -> dict:
"""并发节点2:学术搜索"""
results = search_arxiv(state["query"])
return {"arxiv_results": results}
def news_search(state: ParallelState) -> dict:
"""并发节点3:新闻搜索"""
results = search_news(state["query"])
return {"news_results": results}
def synthesize(state: ParallelState) -> dict:
"""汇总节点:在所有并发节点完成后执行"""
all_results = state["web_results"] + state["arxiv_results"] + state["news_results"]
report = generate_report(state["query"], all_results)
return {"final_report": report}
# 构建并发图
workflow = StateGraph(ParallelState)
workflow.add_node("web_search", web_search)
workflow.add_node("arxiv_search", arxiv_search)
workflow.add_node("news_search", news_search)
workflow.add_node("synthesize", synthesize)
# 从 START 同时触发三个搜索节点(并发)
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "arxiv_search")
workflow.add_edge(START, "news_search")
# 三个节点都完成后执行汇总
workflow.add_edge("web_search", "synthesize")
workflow.add_edge("arxiv_search", "synthesize")
workflow.add_edge("news_search", "synthesize")
workflow.add_edge("synthesize", END)
parallel_agent = workflow.compile()
这样三个搜索任务并发执行,总耗时从串行的 3 × T 降低到约 1 × T。
7.2 缓存节点结果
from functools import lru_cache
import hashlib
def cached_search(state: AgentState) -> dict:
"""带缓存的搜索节点"""
query = state["messages"][-1].content
cache_key = hashlib.md5(query.encode()).hexdigest()
# 检查缓存
cached = redis_client.get(f"search:{cache_key}")
if cached:
return {"search_results": json.loads(cached)}
# 执行搜索
results = actual_search(query)
# 写入缓存(1小时过期)
redis_client.setex(f"search:{cache_key}", 3600, json.dumps(results))
return {"search_results": results}
7.3 动态限流防止 API 超额
import asyncio
from collections import deque
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
self.calls = deque()
async def acquire(self):
now = datetime.now()
# 清理过期的调用记录
while self.calls and self.calls[0] < now - timedelta(seconds=self.period):
self.calls.popleft()
if len(self.calls) >= self.max_calls:
# 等待直到有空位
wait_time = (self.calls[0] + timedelta(seconds=self.period) - now).total_seconds()
await asyncio.sleep(wait_time)
self.calls.append(now)
# 每分钟最多 60 次 LLM 调用
llm_limiter = RateLimiter(max_calls=60, period=60)
async def rate_limited_llm_node(state: AgentState) -> dict:
await llm_limiter.acquire()
response = await llm.ainvoke(state["messages"])
return {"messages": [response]}
八、可观测性:生产监控必备
8.1 集成 LangSmith
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-prod"
# 之后所有的 LangGraph 执行都会自动上报到 LangSmith
# 可以看到每个节点的输入输出、耗时、Token 消耗等
8.2 自定义回调
from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
class AgentMonitorCallback(BaseCallbackHandler):
def __init__(self, metrics_client):
self.metrics = metrics_client
def on_chain_start(self, serialized, inputs, **kwargs):
chain_name = serialized.get("name", "unknown")
self.metrics.increment(f"agent.node.start.{chain_name}")
self.start_time = datetime.now()
def on_chain_end(self, outputs, **kwargs):
duration = (datetime.now() - self.start_time).total_seconds()
self.metrics.histogram("agent.node.duration", duration)
def on_chain_error(self, error, **kwargs):
self.metrics.increment("agent.node.error")
logger.error(f"节点执行错误: {error}")
# 使用回调
monitor = AgentMonitorCallback(prometheus_client)
result = agent.invoke(
{"messages": [HumanMessage(content=query)]},
config={
"configurable": {"thread_id": "prod-001"},
"callbacks": [monitor]
}
)
九、真实案例:代码审查 Agent
综合以上技术,实现一个完整的代码审查 Agent:
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
class CodeReviewState(TypedDict):
messages: Annotated[list, add_messages]
code_snippet: str
language: str
security_issues: list[str]
performance_issues: list[str]
style_issues: list[str]
overall_score: int # 0-100
review_approved: bool
revision_count: int
# ===== 节点实现 =====
def detect_language(state: CodeReviewState) -> dict:
"""检测编程语言"""
response = llm.invoke([
SystemMessage(content="判断代码使用的编程语言,只返回语言名称,如 Python、JavaScript、Go 等"),
HumanMessage(content=state["code_snippet"])
])
return {"language": response.content.strip()}
def security_audit(state: CodeReviewState) -> dict:
"""安全审查节点"""
response = llm.invoke([
SystemMessage(content=f"""你是 {state['language']} 安全专家。
审查代码中的安全问题,包括:SQL注入、XSS、命令注入、敏感信息泄露、权限漏洞等。
以 JSON 列表形式返回问题,如:["存在 SQL 注入风险:第3行", "硬编码密码:第7行"]
如果没有安全问题,返回 []"""),
HumanMessage(content=state["code_snippet"])
])
try:
import json
issues = json.loads(response.content)
except:
issues = [response.content] if response.content != "[]" else []
return {"security_issues": issues}
def performance_audit(state: CodeReviewState) -> dict:
"""性能审查节点(与安全审查并发执行)"""
response = llm.invoke([
SystemMessage(content=f"""你是 {state['language']} 性能优化专家。
审查代码中的性能问题,包括:N+1查询、不必要的循环、内存泄漏、阻塞I/O等。
以 JSON 列表形式返回问题。"""),
HumanMessage(content=state["code_snippet"])
])
try:
import json
issues = json.loads(response.content)
except:
issues = [response.content] if response.content.strip() else []
return {"performance_issues": issues}
def style_audit(state: CodeReviewState) -> dict:
"""代码风格审查节点(与前两个并发执行)"""
response = llm.invoke([
SystemMessage(content=f"""你是 {state['language']} 代码风格专家。
审查代码风格问题:命名规范、注释质量、函数长度、复杂度等。
以 JSON 列表形式返回问题。"""),
HumanMessage(content=state["code_snippet"])
])
try:
import json
issues = json.loads(response.content)
except:
issues = []
return {"style_issues": issues}
def score_and_summarize(state: CodeReviewState) -> dict:
"""汇总评分节点"""
security = state.get("security_issues", [])
performance = state.get("performance_issues", [])
style = state.get("style_issues", [])
# 计算分数(安全问题扣分最多)
base_score = 100
base_score -= len(security) * 15 # 每个安全问题扣15分
base_score -= len(performance) * 8 # 每个性能问题扣8分
base_score -= len(style) * 3 # 每个风格问题扣3分
score = max(0, min(100, base_score))
# 生成总结报告
summary_prompt = f"""基于以下审查结果,生成一份简洁的代码审查报告:
安全问题({len(security)}个):{security}
性能问题({len(performance)}个):{performance}
风格问题({len(style)}个):{style}
综合评分:{score}/100
要求:
1. 指出最关键的问题(按优先级)
2. 给出具体的修改建议
3. 语气专业但友好"""
response = llm.invoke([HumanMessage(content=summary_prompt)])
return {
"messages": [AIMessage(content=response.content)],
"overall_score": score
}
def check_approval(state: CodeReviewState) -> Literal["approve", "request_revision"]:
"""决策节点:是否通过审查"""
score = state.get("overall_score", 0)
security_issues = state.get("security_issues", [])
# 安全问题零容忍,或分数低于 60 都需要修改
if security_issues or score < 60:
return "request_revision"
return "approve"
def request_revision(state: CodeReviewState) -> dict:
"""请求修改节点"""
revision_count = state.get("revision_count", 0) + 1
msg = f"代码审查未通过(第{revision_count}次)。评分:{state['overall_score']}/100。请根据上述问题修改后重新提交。"
return {
"messages": [AIMessage(content=msg)],
"review_approved": False,
"revision_count": revision_count
}
def approve_code(state: CodeReviewState) -> dict:
"""批准节点"""
msg = f"✅ 代码审查通过!评分:{state['overall_score']}/100。代码质量良好,可以合并。"
return {
"messages": [AIMessage(content=msg)],
"review_approved": True
}
# ===== 构建图 =====
def build_code_review_agent():
workflow = StateGraph(CodeReviewState)
# 添加节点
workflow.add_node("detect_language", detect_language)
workflow.add_node("security_audit", security_audit)
workflow.add_node("performance_audit", performance_audit)
workflow.add_node("style_audit", style_audit)
workflow.add_node("score_and_summarize", score_and_summarize)
workflow.add_node("request_revision", request_revision)
workflow.add_node("approve_code", approve_code)
# 定义边
workflow.add_edge(START, "detect_language")
# 语言检测后,并发执行三个审查
workflow.add_edge("detect_language", "security_audit")
workflow.add_edge("detect_language", "performance_audit")
workflow.add_edge("detect_language", "style_audit")
# 三个审查完成后汇总
workflow.add_edge("security_audit", "score_and_summarize")
workflow.add_edge("performance_audit", "score_and_summarize")
workflow.add_edge("style_audit", "score_and_summarize")
# 条件判断
workflow.add_conditional_edges(
"score_and_summarize",
check_approval,
{
"approve": "approve_code",
"request_revision": "request_revision"
}
)
workflow.add_edge("approve_code", END)
workflow.add_edge("request_revision", END)
# 生产级 PostgreSQL checkpoint
checkpointer = PostgresSaver.from_conn_string(
"postgresql://localhost/code_review_db"
)
checkpointer.setup()
return workflow.compile(checkpointer=checkpointer)
# ===== 使用示例 =====
review_agent = build_code_review_agent()
test_code = """
import sqlite3
def get_user(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
# 危险:直接拼接SQL,存在注入风险
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
user = cursor.fetchone()
conn.close()
return user
def process_users():
users = get_all_users()
result = []
for user in users:
# N+1 问题:每个用户单独查询订单
orders = get_user_orders(user['id'])
result.append({**user, 'orders': orders})
return result
"""
config = {"configurable": {"thread_id": "review-001"}}
result = review_agent.invoke(
{
"code_snippet": test_code,
"messages": [HumanMessage(content="请审查这段代码")],
"security_issues": [],
"performance_issues": [],
"style_issues": [],
"revision_count": 0
},
config=config
)
print(result["messages"][-1].content)
print(f"\n综合评分:{result['overall_score']}/100")
print(f"审查结果:{'通过' if result['review_approved'] else '需要修改'}")
十、LangGraph vs 竞品对比
| 维度 | LangGraph | CrewAI | AutoGen | 直接手写 |
|---|---|---|---|---|
| 学习曲线 | 中等 | 低 | 中等 | 低(但维护成本高) |
| 灵活性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Human-in-the-loop | 原生支持 | 有限支持 | 支持 | 需手写 |
| 持久化 | 多种后端 | 有限 | 有限 | 需手写 |
| 可视化 | 内置 | 无 | 无 | 无 |
| 生产成熟度 | 高 | 中 | 中 | 取决于实现 |
| 并发支持 | 原生 | 无 | 有限 | 需手写 |
| 社区活跃 | 非常活跃 | 活跃 | 活跃 | N/A |
结论:如果你要在生产环境部署复杂的 Agent 系统,LangGraph 是目前最成熟的选择。如果只是快速原型,CrewAI 更简单。AutoGen 适合需要 Agent 间"自由对话"的场景。
十一、常见坑和最佳实践
坑 1:State 字段覆盖问题
# ❌ 错误:每次更新都会覆盖 messages,只保留最新一条
class BadState(TypedDict):
messages: list # 没有 reducer
# ✅ 正确:使用 add_messages reducer,自动追加
class GoodState(TypedDict):
messages: Annotated[list, add_messages]
坑 2:无限循环
# ❌ 危险:没有终止条件的条件边可能死循环
def route(state):
if not state["is_done"]:
return "retry_node" # 如果 retry_node 不更新 is_done,永远循环!
# ✅ 安全:添加最大重试次数限制
def route(state):
if not state["is_done"] and state["retry_count"] < 3:
return "retry_node"
return "end_node"
坑 3:checkpoint 泄漏
# ❌ 同一个 thread_id 重复使用会叠加历史状态
config = {"configurable": {"thread_id": "fixed-id"}} # 每次都用同一个
# ✅ 每个独立任务用唯一的 thread_id
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
坑 4:节点函数不是纯函数
# ❌ 全局状态导致并发问题
counter = 0
def bad_node(state):
global counter
counter += 1 # 并发时会竞争
return {"count": counter}
# ✅ 状态都放 State 里,节点是纯函数
def good_node(state):
return {"count": state.get("count", 0) + 1}
最佳实践清单
- State 设计优先:在写任何节点之前,先设计好 State 结构
- 节点保持纯函数:不要在节点里维护全局状态
- 条件边要有兜底:所有条件分支都要有默认情况
- 生产用 PostgreSQL checkpoint:内存 Saver 不适合生产
- 流式输出改善用户体验:长任务一定要用
astream - 为重要节点加日志:方便排查 Agent 行为
十二、总结与展望
LangGraph 的出现代表了 AI Agent 工程化的一次重要进步:
它解决的核心问题:
- 复杂 Agent 流程的可维护性(图 vs 意大利面代码)
- Human-in-the-Loop 的生产级支持
- 多 Agent 协作的标准化接口
- 状态持久化和可恢复性
它的局限:
- 对于简单的单轮问答,它是过度设计
- 学习曲线比 CrewAI 更陡
- 图结构对动态变化的流程支持有限(节点和边是静态的)
2026 年的展望:随着 AI Agent 从实验走向大规模生产,LangGraph 这类"Agent 操作系统"会变得越来越重要。下一步值得关注的方向:
- LangGraph + 向量数据库:长期记忆的标准解决方案
- 跨语言 Agent 互操作:不同框架的 Agent 能互相调用
- Agent 评估框架:自动化测试 Agent 行为的基准体系
工具只是手段,搞清楚你的业务流程,才是用好 LangGraph 的关键。
本文代码示例基于 LangGraph 0.2.x,依赖:pip install langgraph langchain langchain-openai