DeerFlow 2.0 深度实战:字节跳动 49K Star 的超级智能体运行时——从 LangGraph DAG 到沙箱隔离的全链路架构解析
引言:当 AI Agent 从"聊天"走向"干活"
2026 年 2 月 28 日,字节跳动悄然把一个叫 DeerFlow 2.0 的项目推上了 GitHub。24 小时内,它登顶 GitHub Trending 第一名。一个月后,Star 数突破 4.9 万,日均增长超 1300 颗。这不是又一个套壳聊天机器人——它是一个能让 AI 真正"干活"的超级智能体运行时。
如果你用过 ChatGPT 的 Deep Research 功能,你大概体会过那种"给它一个问题,它自己搜索、阅读、整理、生成报告"的震撼。DeerFlow 做的事情本质相同,但走得更远:它不只做研究,还能写代码、跑代码、生成播客、制作 PPT,甚至接入飞书、企微、钉钉成为你的数字同事。
更关键的是——它是开源的,你可以自己部署、二次开发、私有化运行。
这篇文章不是一篇翻译 README 的水文。我会从架构设计的视角,逐层拆解 DeerFlow 2.0 的核心机制:LangGraph DAG 工作流怎么编排、11 层中间件链怎么协作、子智能体怎么动态生成、沙箱怎么隔离执行、记忆系统怎么维护长时程上下文——以及最重要的,这些设计决策背后的工程权衡。最后,我会带你从零搭建一个生产级部署。
一、从 Deep Research 到 SuperAgent Harness:DeerFlow 的架构演进
1.1 v1 的局限:固定 5 节点的天花板
DeerFlow 1.0 的架构很清晰:基于 LangGraph 构建 5 个固定节点——Coordinator、Planner、Researcher、Coder、Reporter。用户提一个问题,协调器接住,规划器拆解,研究员搜索,编码员分析,报告员汇总。一条直线走完。
问题也很明显:
- 扩展性差:想加个"播客生成"能力?得改底层图结构,加节点、加边、改路由逻辑
- 并行度低:5 个节点串行执行,Researcher 搜完 Coder 才能跑,没有利用多源搜索的并行潜力
- 状态管理粗糙:上下文在节点间直接传递,没有统一的中间层做状态聚合和持久化
1.2 v2 的重构:单一主智能体 + 11 层中间件 + 动态子智能体
DeerFlow 2.0 的架构完全重写,和 v1 没有任何共享代码。核心设计思路发生了根本转变:
┌─────────────────────────────────────────┐
│ Lead Agent (主智能体) │
│ 统一调度中心,所有请求的入口 │
├─────────────────────────────────────────┤
│ 11-Layer Middleware Chain │
│ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
│ │ M1│→│ M2│→│ M3│→│ M4│→│...│→ ... │
│ └───┘ └───┘ └───┘ └───┘ └───┘ │
├─────────────────────────────────────────┤
│ Dynamic Sub-Agents Pool │
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │Plan│ │Srcht│ │Code│ │Rprt│ ... │
│ └────┘ └────┘ └────┘ └────┘ │
├─────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌──────┐ ┌───────┐ ┌──────┐ │
│ │Sandbox│ │Memory │ │Skills│ │
│ └──────┘ └───────┘ └──────┘ │
└─────────────────────────────────────────┘
Lead Agent 是唯一的调度中心。它不再硬编码"先规划、再搜索、再编码"的流程,而是根据任务类型和中间件返回的状态,动态决定下一步做什么。这种设计让系统具备了自适应能力:简单的查询可能一步出结果,复杂的研究任务可以展开成多层级的子智能体协作。
11 层中间件是 v2 的核心创新。每一层中间件负责一个维度的能力——知识提取、技能路由、沙箱调度、记忆管理等。中间件链式执行,可以随时中断或跳过。新增能力只需添加新中间件,不需要改主智能体的逻辑。
动态子智能体按需生成。Lead Agent 可以根据任务需要,动态创建 Planner、Researcher、Coder、Reporter 等子智能体实例,并让它们并行或串行执行。任务结束后,子智能体实例被回收。
1.3 为什么不用 LangGraph 的多节点图?
这是很多人会问的问题。LangGraph 本身支持多节点图(StateGraph),为什么 DeerFlow 2.0 反而退回"单主智能体"模式?
答案在于工程复杂度的控制。多节点图在 LangGraph 中需要精确定义每个节点的输入输出 schema、条件边、状态合并策略。当节点数从 5 增长到 15、20 时,图的管理复杂度急剧上升,调试和测试变成噩梦。
DeerFlow 2.0 的选择是:把编排逻辑交给 LLM 自己。Lead Agent 通过 tool calling 决定调用哪个子智能体、传什么参数、怎么聚合结果。中间件链提供约束和安全边界,但具体的编排逻辑由 LLM 在运行时动态决定。这是从"硬编码流程"到"AI 自主编排"的范式转变。
代价是什么?对 LLM 的能力要求更高了。如果模型太弱,编排质量会下降。这也是为什么 DeerFlow 官方推荐使用 Doubao-Seed-2.0-Code、DeepSeek v3.2 和 Kimi 2.5 这些强推理模型。
二、LangGraph DAG 工作流:DeerFlow 的执行引擎
2.1 LangGraph 基础:有向无环图 + 状态机
LangGraph 是 LangChain 团队推出的有状态多参与者应用框架。它的核心抽象是 StateGraph——一个有向无环图(DAG),节点是函数,边是状态转换规则。
一个最简单的 LangGraph 例子:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class AgentState(TypedDict):
messages: list[str]
next_action: str
def think(state: AgentState) -> AgentState:
"""主智能体思考下一步"""
return {"next_action": "search", "messages": state["messages"] + ["Thinking..."]}
def search(state: AgentState) -> AgentState:
"""执行搜索"""
return {"messages": state["messages"] + ["Search results: ..."], "next_action": "respond"}
def respond(state: AgentState) -> AgentState:
"""生成最终回复"""
return {"messages": state["messages"] + ["Here is the answer..."]}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("think", think)
graph.add_node("search", search)
graph.add_node("respond", respond)
graph.add_edge(START, "think")
graph.add_conditional_edges("think", lambda s: s["next_action"], {"search": "search", "respond": "respond"})
graph.add_edge("search", "think") # 搜索结果回到思考
graph.add_edge("respond", END)
app = graph.compile()
DeerFlow 在这个基础上构建了更复杂的图结构。Lead Agent 是一个自循环节点:它不断调用工具(包括子智能体),直到任务完成或达到递归限制。
2.2 DeerFlow 的图结构:Lead Agent 的自循环
DeerFlow 2.0 的核心图结构可以简化为:
START → Lead Agent ⟲ (tool_call → execute → observe → tool_call → ...) → END
Lead Agent 通过 tool calling 机制驱动执行:
# 简化的 DeerFlow Lead Agent 工具注册
tools = [
plan_tool, # 规划任务
search_tool, # 搜索信息
code_execute_tool, # 执行代码
report_tool, # 生成报告
delegate_tool, # 委派给子智能体
memory_store_tool, # 存储记忆
memory_recall_tool, # 检索记忆
file_write_tool, # 写入文件
file_read_tool, # 读取文件
]
每次 Lead Agent 产出 tool_call,中间件链依次处理:
- KnowledgeExtractionMiddleware:从交互中提取知识、规则、SOP
- SkillRouterMiddleware:匹配已知技能模板
- SandboxDispatchMiddleware:决定代码在沙箱还是本地执行
- MemoryMiddleware:维护上下文窗口和长期记忆
- SecurityMiddleware:检查执行安全性
- SubAgentMiddleware:管理子智能体的生命周期
- ContextCompressionMiddleware:压缩超长上下文
- HumanInTheLoopMiddleware:必要时请求人类确认
- PodcastMiddleware:处理播客生成请求
- ReportMiddleware:处理报告格式化
- LoggingMiddleware:全链路日志追踪
2.3 递归限制与执行保障
LangGraph 的一个关键参数是 recursion_limit,它限制了图中节点的最大执行次数。DeerFlow 默认设为 100,防止无限循环:
# config.yaml 中的递归限制配置
session:
config:
recursion_limit: 100 # 最大执行步数
对于复杂任务(如深度研究),100 步可能不够。DeerFlow 支持按任务类型动态调整:
channels:
feishu:
session:
config:
recursion_limit: 150 # IM 渠道给更多步数
三、技能系统(Skills):模块化的能力单元
3.1 技能的定义与加载
DeerFlow 的技能是独立的能力模块,每个技能定义为一个 Python 包,包含:
skills/
└── my_custom_skill/
├── __init__.py # 技能注册入口
├── skill.yaml # 技能元数据
├── tools.py # 工具定义
└── prompts/ # 提示词模板
└── system.md
skill.yaml 定义技能的元信息:
name: my_custom_skill
display_name: My Custom Skill
description: "A custom skill for specific tasks"
version: "1.0.0"
tools:
- name: custom_search
description: "Search for specific information"
- name: custom_analyze
description: "Analyze data with custom logic"
工具的实现遵循 LangChain 的 tool 协议:
# skills/my_custom_skill/tools.py
from langchain_core.tools import tool
from pydantic import BaseModel, Field
class CustomSearchInput(BaseModel):
query: str = Field(description="搜索关键词")
max_results: int = Field(default=5, description="最大结果数")
@tool(args_schema=CustomSearchInput)
def custom_search(query: str, max_results: int = 5) -> str:
"""搜索特定领域的信息"""
# 实现搜索逻辑
results = perform_search(query, max_results)
return format_results(results)
@tool
def custom_analyze(data: str, analysis_type: str = "summary") -> str:
"""用自定义逻辑分析数据"""
if analysis_type == "summary":
return summarize(data)
elif analysis_type == "deep":
return deep_analyze(data)
return data
3.2 技能的动态注册与发现
DeerFlow 启动时自动扫描 skills/ 目录,发现并注册所有技能。运行时,Lead Agent 通过技能描述匹配用户请求到合适的技能:
# 简化的技能注册逻辑
class SkillRegistry:
def __init__(self, skills_path: str):
self.skills = {}
self._discover_skills(skills_path)
def _discover_skills(self, path: str):
for skill_dir in Path(path).iterdir():
if skill_dir.is_dir() and (skill_dir / "skill.yaml").exists():
config = yaml.safe_load((skill_dir / "skill.yaml").read_text())
self.skills[config["name"]] = {
"config": config,
"tools": self._load_tools(skill_dir / "tools.py"),
}
def match_skill(self, query: str) -> list[dict]:
"""根据查询匹配最相关的技能"""
matched = []
for name, skill in self.skills.items():
if self._is_relevant(query, skill["config"]["description"]):
matched.append(skill)
return matched
3.3 Claude Code 集成:让 AI 编写 AI 技能
DeerFlow 2.0 的一个亮眼特性是支持 Claude Code 集成。你可以直接让 Claude Code 帮你创建新技能:
# 用 Claude Code 生成新技能
claude-code "Create a new DeerFlow skill that can scrape product prices from e-commerce sites"
Claude Code 会根据 DeerFlow 的技能规范自动生成目录结构、工具定义、提示词模板和测试代码。这种"AI 写 AI 技能"的元编程能力,极大降低了扩展门槛。
四、子智能体系统:分工协作的"虚拟团队"
4.1 核心子智能体及其职责
DeerFlow 2.0 定义了几个核心子智能体角色:
| 子智能体 | 职责 | 输入 | 输出 |
|---|---|---|---|
| Planner | 拆解任务、制定执行计划 | 用户原始请求 | 分步执行计划 |
| Researcher | 搜索和整理信息 | 搜索关键词列表 | 结构化搜索结果 |
| Coder | 编写和执行代码 | 数据处理需求 | 代码执行结果 |
| Reporter | 整合结果、生成报告 | 所有子智能体输出 | 结构化报告 |
| Podcaster | 生成播客脚本和音频 | 报告内容 | 播客音频文件 |
4.2 子智能体的动态创建
与 v1 的固定节点不同,v2 的子智能体是按需动态创建的:
# 简化的子智能体创建逻辑
class SubAgentFactory:
def __init__(self, model_config: dict, tools_registry: dict):
self.model_config = model_config
self.tools_registry = tools_registry
def create_researcher(self, search_tools: list[str]) -> Agent:
"""创建研究员子智能体"""
tools = [self.tools_registry[t] for t in search_tools]
return Agent(
model=self.model_config["model"],
system_prompt=RESEARCHER_SYSTEM_PROMPT,
tools=tools,
max_iterations=10,
)
def create_coder(self, sandbox_config: dict) -> Agent:
"""创建编码员子智能体"""
return Agent(
model=self.model_config["model"],
system_prompt=CODER_SYSTEM_PROMPT,
tools=[code_execute_tool, file_write_tool, file_read_tool],
sandbox=sandbox_config,
max_iterations=15,
)
Lead Agent 通过 delegate 工具将子任务委派给子智能体:
# Lead Agent 的委派工具
@tool
def delegate(subagent_type: str, task: str, context: dict) -> str:
"""将子任务委派给专门的子智能体
Args:
subagent_type: 子智能体类型 (planner/researcher/coder/reporter)
task: 具体任务描述
context: 上下文信息
"""
agent = factory.create(subagent_type)
result = agent.run(task, context)
return result
4.3 并行子智能体执行
对于独立子任务,DeerFlow 支持并行执行多个子智能体:
import asyncio
async def run_parallel_research(queries: list[str]) -> list[str]:
"""并行执行多个搜索任务"""
tasks = []
for query in queries:
researcher = factory.create_researcher(["tavily_search", "brave_search"])
tasks.append(researcher.arun(query))
results = await asyncio.gather(*tasks)
return results
这在实际研究中效果显著。比如你需要同时搜索"大语言模型在医疗诊断中的应用"、"LLM 在金融风控中的实践"、"AI Agent 法律合规",三个 Researcher 可以并行搜索,总耗时取决于最慢的那个,而不是三者之和。
五、沙箱与文件系统:安全执行的隔离边界
5.1 三种沙箱模式
DeerFlow 提供 3 种代码执行环境,安全级别逐级递增:
模式 1:本地执行(Local Execution)
# config.yaml
sandbox:
use: local # 代码直接在宿主机执行
最简单,也最危险。代码直接运行在宿主机上,没有隔离。适合受信环境下的快速开发,绝对不要在生产环境使用。
模式 2:Docker 容器执行
sandbox:
use: deerflow.community.docker_sandbox:DockerSandboxProvider
image: deerflow/sandbox:latest
timeout: 300 # 单次执行超时(秒)
max_memory: "2g" # 最大内存
代码在独立的 Docker 容器中执行,文件系统、网络、进程全部隔离。这是推荐的日常使用模式。
模式 3:Kubernetes Pod 执行
sandbox:
use: deerflow.community.aio_sandbox:AioSandboxProvider
provisioner_url: http://provisioner:8080
namespace: deerflow-sandbox
resource_limits:
cpu: "2"
memory: "4Gi"
ephemeral_storage: "10Gi"
代码在 K8s 集群中动态创建的 Pod 里执行,支持资源配额、网络策略、镜像白名单。这是生产级部署的推荐方案。
5.2 三层文件系统架构
DeerFlow 设计了三层文件目录,规范文件流转:
.deer-flow/
├── upload/ # 上传层:用户上传的原始文件
├── workspace/ # 工作层:智能体操作的中间文件
└── output/ # 输出层:最终交付的成果文件
# 文件流转示例
class FileSystemManager:
def upload(self, file: UploadFile) -> str:
"""用户上传文件 → upload 目录"""
path = f".deer-flow/upload/{file.filename}"
self._save(path, file.content)
return path
def to_workspace(self, upload_path: str) -> str:
"""从 upload 移动到 workspace"""
filename = Path(upload_path).name
workspace_path = f".deer-flow/workspace/{filename}"
shutil.copy2(upload_path, workspace_path)
return workspace_path
def to_output(self, workspace_path: str) -> str:
"""从 workspace 移动到 output"""
filename = Path(workspace_path).name
output_path = f".deer-flow/output/{filename}"
shutil.copy2(workspace_path, output_path)
return output_path
这个设计保证:
- 上传的原始文件不会被修改(只读副本进入 workspace)
- 工作区的中间文件不会污染输出(只有最终结果进入 output)
- 沙箱只能访问 workspace(最小权限原则)
5.3 沙箱安全最佳实践
在生产环境部署 DeerFlow 沙箱时,需要注意:
# 生产级 Docker 沙箱配置
sandbox:
use: deerflow.community.docker_sandbox:DockerSandboxProvider
image: deerflow/sandbox:latest
timeout: 300
max_memory: "2g"
# 安全加固
security:
read_only_root: true # 只读根文件系统
drop_capabilities: ALL # 丢弃所有 Linux capabilities
no_new_privileges: true # 禁止权限提升
network_mode: none # 无网络访问
pids_limit: 64 # 限制进程数
user: "1000:1000" # 非 root 用户运行
对于需要网络访问的场景(如 API 调用),可以通过白名单控制:
# 网络白名单中间件
class NetworkWhitelistMiddleware:
ALLOWED_DOMAINS = [
"api.openai.com",
"api.anthropic.com",
"search.tavily.com",
]
def check_request(self, url: str) -> bool:
domain = urlparse(url).netloc
return any(domain.endswith(d) for d in self.ALLOWED_DOMAINS)
六、长期记忆系统:让智能体"记住"过去
6.1 记忆的三层架构
DeerFlow 的记忆系统分三层:
┌─────────────────────────┐
│ Working Memory (工作记忆) │ ← 当前会话的上下文窗口
│ LLM 上下文中的信息 │
├─────────────────────────┤
│ Episodic Memory (情节记忆)│ ← 历史会话摘要
│ 过去对话的压缩表示 │
├─────────────────────────┤
│ Semantic Memory (语义记忆)│ ← 提取的知识、规则、SOP
│ 结构化的长期知识库 │
└─────────────────────────┘
6.2 知识提取中间件
KnowledgeExtractionMiddleware 是记忆系统的核心。它在 Lead Agent 每次交互后,自动从对话中提取三类知识:
# 简化的知识提取逻辑
class KnowledgeExtractionMiddleware:
def after_agent(self, state: AgentState) -> AgentState:
"""在 Agent 执行后提取知识"""
# 第一步:检查开关
if not state.get("enable_knowledge_extraction"):
return state
# 判断是否深度研究(深度研究提取更多知识)
is_deep = state.get("is_deep_research", False)
# 第二步:提取事实性知识
knowledge = self.extract_knowledge(
state["messages"],
extraction_type="knowledge"
)
# 第三步:提取规则
rules = self.extract_knowledge(
state["messages"],
extraction_type="rules"
)
# 第四步:提取标准操作流程(SOP)
sop = self.extract_knowledge(
state["messages"],
extraction_type="sop"
)
# 第五步:将提取的知识打包为技能
if is_deep and (knowledge or rules or sop):
self.package_as_skill(knowledge, rules, sop)
return state
def extract_knowledge(self, messages: list, extraction_type: str) -> dict:
"""使用 LLM 从对话中提取结构化知识"""
prompt = f"""
从以下对话中提取{extraction_type}:
{format_messages(messages)}
以 JSON 格式输出,包含:
- title: 知识标题
- content: 知识内容
- tags: 标签列表
- confidence: 置信度 (0-1)
"""
result = self.llm.invoke(prompt)
return parse_json(result)
6.3 记忆的持久化与检索
DeerFlow 支持多种记忆后端:
# 本地文件记忆
memory:
type: local
path: .deer-flow/memory
# 向量数据库记忆(适合大规模部署)
memory:
type: vector
backend: chromadb # 或 pinecone, weaviate, vikingdb
embedding_model: text-embedding-3-small
collection: deerflow_memory
# RAGFlow 集成(企业知识库)
memory:
type: ragflow
api_url: http://ragflow:9380
api_key: $RAGFLOW_API_KEY
knowledge_base: company_docs
检索时,DeerFlow 使用混合检索策略:先关键词匹配粗筛,再向量相似度精排:
class MemoryRetriever:
def recall(self, query: str, top_k: int = 5) -> list[MemoryItem]:
"""混合检索相关记忆"""
# 关键词检索
keyword_results = self.keyword_search(query, limit=top_k * 3)
# 向量检索
vector_results = self.vector_search(query, limit=top_k * 3)
# 合并去重,按相关性排序
merged = self.merge_and_rerank(
keyword_results, vector_results, query
)
return merged[:top_k]
七、IM 渠道集成:从 Web 到企微飞书的全域接入
7.1 六大 IM 渠道一键接入
DeerFlow 2.0 支持直接接入 6 个即时通讯渠道,无需公网 IP:
| 渠道 | 传输方式 | 接入难度 |
|---|---|---|
| Telegram | Bot API (长轮询) | 简单 |
| Slack | Socket Mode | 中等 |
| 飞书 / Lark | WebSocket | 中等 |
| 微信 | 腾讯 iLink (长轮询) | 中等 |
| 企业微信 | WebSocket | 中等 |
| 钉钉 | Stream Push (WebSocket) | 中等 |
配置方式统一在 config.yaml 中:
channels:
# 全局设置
langgraph_url: http://localhost:8001/api
gateway_url: http://localhost:8001
# 全局会话默认值
session:
assistant_id: lead_agent
config:
recursion_limit: 100
context:
thinking_enabled: true
is_plan_mode: false
subagent_enabled: false
# 飞书
feishu:
enabled: true
app_id: $FEISHU_APP_ID
app_secret: $FEISHU_APP_SECRET
# 企业微信
wecom:
enabled: true
bot_id: $WECOM_BOT_ID
bot_secret: $WECOM_BOT_SECRET
# 钉钉
dingtalk:
enabled: true
client_id: $DINGTALK_CLIENT_ID
client_secret: $DINGTALK_CLIENT_SECRET
card_template_id: "" # AI 卡模板(流式打字效果)
7.2 渠道适配器架构
每个 IM 渠道都有一个适配器,负责消息格式转换和协议适配:
class ChannelAdapter(ABC):
"""IM 渠道适配器基类"""
@abstractmethod
async def start(self):
"""启动渠道监听"""
pass
@abstractmethod
async def on_message(self, raw_message: dict) -> UnifiedMessage:
"""将渠道原始消息转换为统一格式"""
pass
@abstractmethod
async def send_message(self, response: AgentResponse, context: dict):
"""将 Agent 响应转换为渠道格式并发送"""
pass
async def handle_message(self, raw_message: dict):
"""消息处理主流程"""
# 1. 转换消息格式
message = await self.on_message(raw_message)
# 2. 调用 Gateway API
response = await self.call_agent(message)
# 3. 发送响应
await self.send_message(response, raw_message)
7.3 用户级别的会话隔离
DeerFlow 支持按用户配置不同的 Agent 和参数:
channels:
wecom:
enabled: true
bot_id: $WECOM_BOT_ID
bot_secret: $WECOM_BOT_SECRET
# 默认配置
session:
assistant_id: mobile-agent
context:
thinking_enabled: false
# 特定用户配置
users:
"123456789": # 用户 ID
assistant_id: vip-agent
config:
recursion_limit: 150
context:
thinking_enabled: true
subagent_enabled: true
这为企业场景提供了灵活性:普通用户用轻量级 Agent,VIP 用户用增强版 Agent。
八、从零搭建:生产级部署实战
8.1 环境准备
# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
# 运行交互式配置向导(约 2 分钟)
make setup
# 向导会引导你:
# 1. 选择 LLM 提供商(OpenAI / DeepSeek / Kimi / vLLM 等)
# 2. 配置搜索工具(Tavily / Brave Search / InfoQuest)
# 3. 选择沙箱模式(Local / Docker / K8s)
# 4. 设置安全策略(Bash 访问 / 文件写入权限)
# 验证配置
make doctor
8.2 Docker 部署(推荐生产方案)
# 初始化沙箱镜像(只需一次)
make docker-init
# 启动所有服务
make docker-start
# 访问 http://localhost:2026
Docker Compose 会启动以下服务:
# 简化的 docker-compose 架构
services:
gateway: # API 网关 + Agent 运行时
image: deerflow/gateway:latest
ports: ["2026:2026", "8001:8001"]
frontend: # Next.js 前端
image: deerflow/frontend:latest
sandbox: # Docker-in-Docker 沙箱
image: deerflow/sandbox:latest
privileged: true # 需要创建子容器
volumes:
- /var/run/docker.sock:/var/run/docker.sock
nginx: # 反向代理
image: nginx:alpine
ports: ["80:80", "443:443"]
8.3 Kubernetes 生产部署
对于需要水平扩展和高可用的场景,DeerFlow 支持 K8s 部署:
# deerflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: deerflow-gateway
spec:
replicas: 3
selector:
matchLabels:
app: deerflow-gateway
template:
metadata:
labels:
app: deerflow-gateway
spec:
containers:
- name: gateway
image: deerflow/gateway:latest
resources:
requests:
cpu: "4"
memory: "8Gi"
limits:
cpu: "8"
memory: "16Gi"
env:
- name: DEER_FLOW_CONFIG_PATH
value: /config/config.yaml
volumeMounts:
- name: config
mountPath: /config
volumes:
- name: config
configMap:
name: deerflow-config
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: deerflow-provisioner
spec:
replicas: 2
template:
spec:
containers:
- name: provisioner
image: deerflow/provisioner:latest
resources:
requests:
cpu: "2"
memory: "4Gi"
8.4 性能调优
模型选择与配置:
# 针对不同任务的模型配置
models:
# 主 Agent 用强推理模型
- name: deepseek-v3.2
display_name: DeepSeek V3.2
use: langchain_openai:ChatOpenAI
model: deepseek-chat
api_key: $DEEPSEEK_API_KEY
base_url: https://api.deepseek.com/v1
# 子 Agent 用快速模型
- name: gpt-4o-mini
display_name: GPT-4o Mini
use: langchain_openai:ChatOpenAI
model: gpt-4o-mini
api_key: $OPENAI_API_KEY
# 推理任务用思维链模型
- name: kim-2.5-thinking
display_name: Kimi 2.5 Thinking
use: langchain_openai:ChatOpenAI
model: moonshot-v1-auto
api_key: $KIMI_API_KEY
base_url: https://api.moonshot.cn/v1
supports_thinking: true
并发与限流:
# 并发控制中间件
class ConcurrencyMiddleware:
def __init__(self, max_concurrent_runs: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent_runs)
self.active_runs = 0
async def acquire(self):
await self.semaphore.acquire()
self.active_runs += 1
async def release(self):
self.active_runs -= 1
self.semaphore.release()
@property
def utilization(self) -> float:
return self.active_runs / self._max
上下文压缩:对于长对话,DeerFlow 会自动压缩上下文,避免 Token 溢出:
class ContextCompressionMiddleware:
def compress(self, messages: list[BaseMessage], max_tokens: int = 8000) -> list[BaseMessage]:
"""压缩上下文到指定 Token 数"""
total_tokens = count_tokens(messages)
if total_tokens <= max_tokens:
return messages
# 保留系统消息和最近 N 轮
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
recent_msgs = messages[-6:] # 最近 3 轮
# 中间消息压缩为摘要
middle_msgs = messages[len(system_msgs):-6]
if middle_msgs:
summary = self.llm.invoke(
f"请将以下对话历史压缩为简洁摘要:\n{format_messages(middle_msgs)}"
)
compressed = [HumanMessage(content=f"[历史摘要]\n{summary}")]
else:
compressed = []
return system_msgs + compressed + recent_msgs
九、MCP Server 集成:标准化工具协议
9.1 MCP 协议简介
MCP(Model Context Protocol)是 Anthropic 提出的标准化工具集成协议。DeerFlow 2.0 原生支持 MCP Server,可以直接接入任何兼容 MCP 的工具服务。
# config.yaml - MCP Server 配置
mcp_servers:
# HTTP/SSE 类型的 MCP Server
- name: filesystem
transport: sse
url: http://localhost:3001/sse
headers:
Authorization: Bearer $MCP_FS_TOKEN
# 支持OAuth的 MCP Server
- name: company-api
transport: sse
url: http://api.internal/mcp
oauth:
grant_type: client_credentials
token_url: https://auth.internal/oauth/token
client_id: $MCP_CLIENT_ID
client_secret: $MCP_CLIENT_SECRET
9.2 自定义 MCP Server
你可以轻松编写自己的 MCP Server 并接入 DeerFlow:
# custom_mcp_server.py
from mcp.server import Server
from mcp.types import Tool, TextContent
server = Server("my-custom-tools")
@server.tool()
async def query_database(sql: str) -> str:
"""执行 SQL 查询并返回结果"""
result = await db.execute(sql)
return str(result)
@server.tool()
async def send_notification(title: str, body: str) -> str:
"""发送通知到团队频道"""
await notify(title, body)
return "Notification sent"
if __name__ == "__main__":
server.run(transport="sse")
十、可观测性:LangSmith 与 Langfuse 集成
10.1 LangSmith Tracing
DeerFlow 原生支持 LangSmith 追踪,可以可视化 Agent 的每一步执行:
# config.yaml
tracing:
langsmith:
enabled: true
api_key: $LANGSMITH_API_KEY
project: deerflow-production
tags:
- deerflow-v2
- production
在 LangSmith Dashboard 中,你可以看到:
- 每次工具调用的输入输出
- Agent 的推理过程(thinking tokens)
- 子智能体的执行链路
- Token 消耗和延迟统计
10.2 Langfuse 集成(开源替代)
如果不想用 LangSmith(毕竟是付费服务),DeerFlow 也支持开源的 Langfuse:
tracing:
langfuse:
enabled: true
public_key: $LANGFUSE_PUBLIC_KEY
secret_key: $LANGFUSE_SECRET_KEY
host: http://langfuse:3000 # 自托管
甚至可以同时启用两者,用 LangSmith 做实时监控,用 Langfuse 做历史分析:
tracing:
langsmith:
enabled: true
api_key: $LANGSMITH_API_KEY
langfuse:
enabled: true
public_key: $LANGFUSE_PUBLIC_KEY
secret_key: $LANGFUSE_SECRET_KEY
十一、安全红线:部署前必须知道的
DeerFlow 官方文档明确警告:不当部署可能引入安全风险。
11.1 核心安全建议
- 永远不要在公网暴露未认证的 API:DeerFlow 的 Gateway 默认监听所有接口,部署时务必加认证层
- 生产环境必须用 Docker/K8s 沙箱:Local 模式下,AI 生成的代码直接在你的机器上执行
- 限制文件写入范围:配置
file_write工具的白名单目录 - 启用 Human-in-the-Loop:对关键操作(如发送邮件、执行支付)要求人工确认
- 定期审计沙箱镜像:确保没有供应链攻击
11.2 最小权限配置模板
# 安全加固配置
sandbox:
use: deerflow.community.docker_sandbox:DockerSandboxProvider
security:
network_mode: none
read_only_root: true
drop_capabilities: ALL
pids_limit: 64
tools:
file_write:
enabled: true
allowed_paths:
- .deer-flow/workspace
- .deer-flow/output
max_file_size: 10MB
bash:
enabled: false # 禁用 Bash 直接执行
human_in_the_loop:
enabled: true
trigger_on:
- send_email
- file_delete
- external_api_write
十二、实战案例:用 DeerFlow 构建自动化技术调研系统
让我们把前面学到的知识串起来,构建一个完整的技术调研自动化系统。
12.1 需求定义
目标:给定一个技术主题,自动完成以下流程:
- 规划调研路径(确定搜索关键词和子主题)
- 并行搜索多个信息源
- 对搜索结果进行深度分析(含代码示例验证)
- 生成结构化的技术调研报告
- 将报告推送到飞书群
12.2 配置与实现
# tech-research-config.yaml
models:
- name: deepseek-v3.2
display_name: DeepSeek V3.2
use: langchain_openai:ChatOpenAI
model: deepseek-chat
api_key: $DEEPSEEK_API_KEY
base_url: https://api.deepseek.com/v1
sandbox:
use: deerflow.community.docker_sandbox:DockerSandboxProvider
image: deerflow/sandbox:latest
timeout: 600
channels:
feishu:
enabled: true
app_id: $FEISHU_APP_ID
app_secret: $FEISHU_APP_SECRET
memory:
type: vector
backend: chromadb
embedding_model: text-embedding-3-small
自定义技能:
# skills/tech_research/tools.py
from langchain_core.tools import tool
from pydantic import BaseModel, Field
class TechResearchInput(BaseModel):
topic: str = Field(description="调研主题")
depth: str = Field(default="medium", description="调研深度: quick/medium/deep")
focus_areas: list[str] = Field(default_factory=list, description="关注领域")
@tool(args_schema=TechResearchInput)
def tech_research(topic: str, depth: str = "medium", focus_areas: list[str] = None) -> str:
"""执行技术调研,生成结构化报告
会自动:
1. 拆解调研路径
2. 并行搜索多个信息源
3. 深度分析搜索结果
4. 生成技术调研报告
"""
# DeerFlow 的 Lead Agent 会自动编排子智能体
# 这里只需声明意图,执行由框架处理
return f"Starting tech research on: {topic} (depth={depth})"
12.3 运行效果
# 通过 API 触发调研
curl -X POST http://localhost:8001/api/runs \
-H "Content-Type: application/json" \
-d '{
"assistant_id": "lead_agent",
"input": {
"messages": [
{
"role": "user",
"content": "请对 WebAssembly 在服务端的应用进行深度技术调研,关注 WasmEdge、Wasmtime 和 Spin 框架的性能对比"
}
]
},
"config": {
"recursion_limit": 150,
"context": {
"thinking_enabled": true,
"is_plan_mode": false,
"subagent_enabled": true
}
}
}'
DeerFlow 的执行流程:
- Planner 拆解调研路径:搜索 WebAssembly 服务端应用概况 → 搜索 WasmEdge 架构和性能 → 搜索 Wasmtime 特性 → 搜索 Spin 框架用法 → 性能基准测试 → 综合对比
- Researcher × 4 并行搜索 4 个子主题
- Coder 编写性能基准测试代码并在沙箱中执行
- Reporter 整合所有结果,生成含代码示例和性能数据的结构化报告
- 飞书适配器 将报告推送到指定群聊
整个流程大约需要 5-15 分钟(取决于调研深度),全程无需人工干预。
十三、与其他 Agent 框架的对比
| 维度 | DeerFlow 2.0 | OpenManus | AutoGen | CrewAI |
|---|---|---|---|---|
| 开发者 | 字节跳动 | CAMEL-AI | Microsoft | CrewAI |
| GitHub Star | 49K+ | 2.9K | 42K+ | 30K+ |
| 核心架构 | 单主Agent + 中间件链 | 固定多Agent | 对话式多Agent | 角色扮演多Agent |
| 执行引擎 | LangGraph | LangChain | 自研 | LangGraph |
| 沙箱支持 | Docker/K8s/Local | 有限 | 无 | 无 |
| 长期记忆 | 内置三层 | 无 | 需自建 | 需自建 |
| IM 集成 | 6大渠道 | 无 | 无 | 无 |
| 技能系统 | 动态加载 | 无 | 插件 | 工具 |
| MCP 支持 | 原生 | 无 | 无 | 有限 |
| 私有化部署 | 完全支持 | 有限 | 支持 | 支持 |
DeerFlow 的核心优势在于全栈能力:从搜索到编码到执行到输出到推送,一条龙。其他框架通常需要你自己组合多个工具才能达到类似效果。
十四、总结与展望
DeerFlow 2.0 代表了 AI Agent 从"玩具"到"工具"的关键转折。它的架构选择——单主智能体 + 中间件链 + 动态子智能体——是对工程复杂度和灵活性的一次精准权衡。
值得学习的架构思想:
- 中间件链模式:把横切关注点从主逻辑中剥离,新增能力只加中间件
- 动态子智能体:按需创建、并行执行、用完回收,比固定图更灵活
- 三层文件系统:upload → workspace → output,保证数据安全流转
- 三层记忆架构:Working → Episodic → Semantic,让 Agent 越用越聪明
当前不足:
- 强依赖 LLM 的编排能力,弱模型表现不佳
- 沙箱冷启动较慢(Docker 容器创建需 2-5 秒)
- 知识提取的准确性依赖模型,可能出现幻觉
- IM 渠道的流式输出体验还有优化空间
未来方向:
DeerFlow 团队正在推进几个关键特性:多模态输入(图片、视频理解)、Agent 间协作(多个 DeerFlow 实例联合工作)、以及更细粒度的权限控制(基于 RBAC 的技能授权)。这些特性将进一步推动 DeerFlow 从"研究工具"走向"通用智能体平台"。
如果你正在寻找一个能真正落地的 AI Agent 框架——不是 Demo,不是概念验证,而是能在生产环境跑起来的——DeerFlow 2.0 值得你认真评估。
项目地址:https://github.com/bytedance/deer-flow
官方文档:https://deerflow.tech
推荐模型:Doubao-Seed-2.0-Code / DeepSeek v3.2 / Kimi 2.5
最低配置:4 vCPU / 8 GB RAM / 20 GB SSD
推荐配置:8 vCPU / 16 GB RAM / 40 GB SSD + Docker