DeerFlow 2.0 源码级深度解析:字节跳动如何用 Lead Agent + Middleware Chain 重新定义 AI Agent 运行时
引言:从聊天机器人到执行系统的关键一跃
2026年2月28日,字节跳动在GitHub上开源了DeerFlow 2.0,30天内斩获近4.9万Star,登顶GitHub Trending榜首。但这不是又一个"AI聊天框架"——DeerFlow 2.0的核心命题是:让AI从"能说"进化到"能做"。
在DeerFlow之前,市面上的Agent框架基本都卡在同一个瓶颈上:单轮对话式交互。你问一句,它答一句,遇到需要多步骤、长时间执行的任务就歇菜。DeerFlow 2.0彻底推翻了1.0的架构,从"深度研究框架"重构为"Super Agent Harness"——一个能接收任务、拆解任务、调度子智能体、沙盒执行代码、持久化记忆、甚至自动提取知识打包成Skill的完整运行时。
这篇文章不是产品介绍,也不是部署教程。我们要做的是拆开DeerFlow 2.0的源码,从架构设计到中间件链,从沙盒隔离到知识提取,逐层分析字节跳动团队在工程层面的技术决策,帮你理解:一个生产级Agent运行时到底应该怎么设计。
一、架构全景:Lead Agent + Middleware Chain + Dynamic Sub-agents
DeerFlow 2.0的架构可以用一句话概括:一个主智能体(Lead Agent)驱动中间件链(Middleware Chain),按需动态派生子智能体(Sub-agents)。
┌─────────────────────────────────────────────────┐
│ Gateway API │
│ (LangGraph Server) │
└──────────────────────┬──────────────────────────┘
│
┌────────▼────────┐
│ Lead Agent │
│ (主调度智能体) │
└────────┬────────┘
│
┌─────────────▼──────────────┐
│ Middleware Chain │
│ ┌─────────────────────┐ │
│ │ ThreadDataMiddleware │ │
│ ├─────────────────────┤ │
│ │ SandboxMiddleware │ │
│ ├─────────────────────┤ │
│ │ TitleMiddleware │ │
│ ├─────────────────────┤ │
│ │ UploadsMiddleware │ │
│ ├─────────────────────┤ │
│ │ KnowledgeExtract │ │
│ │ Middleware │ │
│ └─────────────────────┘ │
└─────────────┬──────────────┘
│
┌──────────────▼──────────────┐
│ Dynamic Sub-agents │
│ ┌───────┐ ┌──────┐ ┌────┐ │
│ │ Coder │ │Research│ │Art │ │
│ └───────┘ └──────┘ └────┘ │
└────────────────────────────┘
1.1 Lead Agent:不是简单的任务分发器
Lead Agent是整个系统的入口和调度核心。它不是传统意义上的"路由器"——简单地根据意图把请求转发给不同的子智能体。它更像是一个项目经理:
- 任务理解与拆解:接收用户的自然语言任务,分析需要哪些步骤、哪些子智能体参与
- 执行计划生成:生成有依赖关系的执行DAG(有向无环图),而非简单的线性步骤列表
- 上下文管理:维护整个任务的生命周期上下文,包括中间结果、失败重试策略、子智能体间的数据传递
- 反思与修正:在子智能体返回结果后,评估质量,决定是否需要重试或调整策略
# backend/src/agents/lead_agent/agent.py 核心逻辑简化
class LeadAgent:
def __init__(self, config):
self.model = config.model
self.middlewares = self._init_middlewares(config)
self.sub_agent_factory = SubAgentFactory(config)
self.memory = MemoryManager(config)
async def run(self, task: str, context: AgentContext):
# 1. 中间件前置处理
enriched_context = await self.middlewares.before_agent(context)
# 2. 任务规划:生成执行DAG
plan = await self._plan(task, enriched_context)
# 3. 按依赖关系调度子智能体
results = {}
for stage in plan.topological_order():
stage_results = await self._execute_stage(
stage,
context=enriched_context,
prior_results=results
)
results.update(stage_results)
# 4. 中间件后置处理(知识提取等)
final_context = await self.middlewares.after_agent(
enriched_context, results
)
return final_context
关键设计决策:Lead Agent本身不执行任何具体任务。它只做"思考"和"调度"。这种分离确保了:
- Lead Agent的上下文不会被大量执行细节污染
- 子智能体可以独立扩展和替换
- 失败隔离:某个子智能体崩溃不会影响整体调度
1.2 中间件链:DeerFlow的真正灵魂
如果说Lead Agent是大脑,那Middleware Chain就是神经系统的骨干。DeerFlow 2.0的所有横切关注点——沙盒隔离、知识提取、标题生成、文件上传——全部通过中间件实现。
这种设计让核心Agent逻辑保持纯粹,而功能扩展只需要添加新的中间件。
# 中间件基类
class AgentMiddleware(ABC):
@abstractmethod
async def before_agent(self, context: AgentContext) -> AgentContext:
"""Agent执行前的预处理"""
pass
@abstractmethod
async def after_agent(self, context: AgentContext, result: AgentResult) -> AgentContext:
"""Agent执行后的后处理"""
pass
# 中间件链示例
class MiddlewareChain:
def __init__(self, middlewares: list[AgentMiddleware]):
self.middlewares = middlewares
async def before_agent(self, context: AgentContext) -> AgentContext:
for mw in self.middlewares:
context = await mw.before_agent(context)
return context
async def after_agent(self, context: AgentContext, result: AgentResult) -> AgentContext:
# 逆序执行后置处理
for mw in reversed(self.middlewares):
context = await mw.after_agent(context, result)
return context
当前版本内置的核心中间件:
| 中间件 | 职责 | 执行时机 |
|---|---|---|
ThreadDataMiddleware | 线程级数据注入与上下文传递 | before_agent |
SandboxMiddleware | 沙盒环境初始化与资源管理 | before_agent + after_agent |
TitleMiddleware | 自动生成对话标题 | after_agent |
UploadsMiddleware | 文件上传处理与资源清理 | after_agent |
KnowledgeExtractionMiddleware | 知识提取、规则提取、SOP提取、Skill打包 | after_agent |
其中最值得关注的是SandboxMiddleware和KnowledgeExtractionMiddleware,我们后面单独拆解。
二、Sub-agents 系统:从"单打独斗"到"团队协作"
DeerFlow 2.0内置了多种子智能体,每种专注于特定领域:
/mnt/skills/
├── public/ # 公共技能
│ └── SKILL.md
├── research/ # 深度研究智能体
│ └── SKILL.md
├── report-generation/ # 报告生成智能体
│ └── SKILL.md
├── slide-creation/ # PPT生成智能体
│ └── SKILL.md
├── web-page/ # 网页创建智能体
│ └── SKILL.md
└── image-generation/ # 图像生成智能体
└── SKILL.md
2.1 子智能体的动态创建与销毁
不同于传统框架在启动时就初始化所有Agent,DeerFlow采用按需创建策略:
# backend/src/agents/agent.py - Agent工厂
class AgentFactory:
def __init__(self, config):
self.skill_registry = SkillRegistry(config.skills_path)
self._agent_pool = {} # 复用池
async def create_agent(self, agent_type: str, context: AgentContext):
# 检查复用池
if agent_type in self._agent_pool:
return self._agent_pool[agent_type]
# 按需创建
skill = self.skill_registry.get_skill(agent_type)
agent = SubAgent(
skill=skill,
sandbox=await self._create_sandbox(),
memory=context.memory
)
# 注册到池中(长任务场景可复用)
self._agent_pool[agent_type] = agent
return agent
async def cleanup(self):
"""任务结束后清理所有子智能体和沙盒"""
for agent in self._agent_pool.values():
await agent.sandbox.destroy()
self._agent_pool.clear()
这种设计有几个工程上的好处:
- 资源效率:不预分配不用的Agent,节省GPU/CPU/内存
- 灵活扩展:新增Skill只需在skills目录下添加SKILL.md,无需修改核心代码
- 生命周期管理:通过cleanup统一释放资源,避免沙盒泄漏
2.2 子智能体间的数据传递
多智能体协作最棘手的问题之一是数据如何在智能体之间流转。DeerFlow采用共享上下文+消息总线的混合模式:
class SharedContext:
"""子智能体间的共享状态"""
def __init__(self):
self._artifacts = {} # 中间产物(代码、文档、数据)
self._messages = [] # 消息日志
self._lock = asyncio.Lock()
async def put_artifact(self, key: str, value: Any, producer: str):
async with self._lock:
self._artifacts[key] = {
"value": value,
"producer": producer,
"timestamp": time.time()
}
async def get_artifact(self, key: str) -> Optional[Any]:
async with self._lock:
return self._artifacts.get(key, {}).get("value")
举个例子:当Researcher子智能体完成了数据收集,把结果放入共享上下文;Report Generation子智能体从共享上下文中读取这些数据,生成结构化报告。
三、沙盒系统:让AI真正"动手"的安全基础设施
这是DeerFlow 2.0最核心的工程突破之一。传统Agent框架最大的短板是"只能说不能做"——DeerFlow通过沙盒系统解决了这个问题。
3.1 沙盒架构设计
┌──────────────────────────────────┐
│ SandboxMiddleware │
│ │
│ ┌────────────────────────────┐ │
│ │ Sandbox Manager │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ Container Pool │ │ │
│ │ │ ┌────┐ ┌────┐ │ │ │
│ │ │ │ C1 │ │ C2 │ ... │ │ │
│ │ │ └────┘ └────┘ │ │ │
│ │ └──────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ FileSystem Layer │ │ │
│ │ │ /workspace/ │ │ │
│ │ │ /artifacts/ │ │ │
│ │ │ /temp/ │ │ │
│ │ └──────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ Resource Limits │ │ │
│ │ │ CPU / Memory / Time │ │ │
│ │ └──────────────────────┘ │ │
│ └────────────────────────────┘ │
└──────────────────────────────────┘
沙盒的核心能力:
- 代码执行:支持Python和Shell脚本的直接运行
- 文件系统访问:隔离的文件系统,子智能体可以读写文件
- 网络控制:可配置的网络访问策略(出站白名单、DNS限制)
- 资源限制:CPU时间、内存占用、磁盘空间的硬限制
- 持久化:任务中间结果可以持久化到宿主机
3.2 沙盒安全模型
class SandboxMiddleware(AgentMiddleware):
async def before_agent(self, context: AgentContext) -> AgentContext:
# 为当前任务创建隔离沙盒
sandbox = await self.sandbox_manager.create_sandbox(
image="deerflow/runtime:latest",
resource_limits={
"cpu": 2, # 2核
"memory": "4g", # 4GB内存
"disk": "10g", # 10GB磁盘
"timeout": 3600, # 1小时超时
},
network_policy={
"allow_outbound": True,
"dns_allowlist": ["*.wikipedia.org", "api.openai.com"],
}
)
context.sandbox = sandbox
return context
async def after_agent(self, context: AgentContext, result: AgentResult) -> AgentContext:
# 提取沙盒中的产物
artifacts = await context.sandbox.extract_artifacts()
result.artifacts = artifacts
# 清理沙盒
await context.sandbox.destroy()
return context
关键安全决策:
- 每次任务一个沙盒:任务间完全隔离,避免状态泄漏
- 最小权限原则:沙盒只挂载必要的目录,网络访问严格白名单
- 超时硬杀:不管任务执行到什么状态,超时直接销毁容器
- 产物提取后即焚:沙盒销毁前提取产物,之后立刻删除容器
3.3 代码执行的完整链路
当用户要求DeerFlow"写一个数据分析脚本并运行"时,执行链路是这样的:
用户请求 → Lead Agent 理解任务
→ Coder子智能体生成Python代码
→ SandboxMiddleware创建沙盒环境
→ 代码写入沙盒文件系统
→ 在沙盒中执行Python
→ 捕获stdout/stderr
→ 如果出错,Coder自动修复并重试
→ 成功后提取结果
→ 沙盒销毁
# 代码执行核心逻辑
class CodeExecutor:
async def execute(self, code: str, sandbox: Sandbox) -> ExecutionResult:
# 1. 写入代码文件
code_path = "/workspace/main.py"
await sandbox.write_file(code_path, code)
# 2. 安装依赖(如果有)
if await self._has_requirements(sandbox):
await sandbox.run_command("pip install -r requirements.txt")
# 3. 执行代码
result = await sandbox.run_command(
f"python {code_path}",
timeout=300, # 5分钟执行超时
capture_output=True
)
# 4. 处理结果
if result.exit_code != 0:
return ExecutionResult(
success=False,
error=result.stderr,
stdout=result.stdout
)
# 5. 收集产出文件
artifacts = await sandbox.list_files("/workspace/output/")
return ExecutionResult(
success=True,
stdout=result.stdout,
artifacts=artifacts
)
四、记忆系统:从"金鱼记忆"到"长期经验"
传统Agent每次对话都从零开始,这是工程上最大的浪费。DeerFlow 2.0实现了三层记忆架构:
4.1 三层记忆模型
┌─────────────────────────────────────┐
│ 长期记忆 (Long-term) │
│ 历史经验、提取的知识、SOP规则 │
│ 存储:向量数据库 + 关系数据库 │
│ 生命周期:跨会话持久化 │
├─────────────────────────────────────┤
│ 短期记忆 (Short-term) │
│ 当前任务的上下文、中间结果 │
│ 存储:内存 + Redis │
│ 生命周期:单次任务 │
├─────────────────────────────────────┤
│ 工作记忆 (Working) │
│ 当前步骤的输入输出、工具调用结果 │
│ 存储:LLM上下文窗口 │
│ 生命周期:单步操作 │
└─────────────────────────────────────┘
4.2 记忆的读写机制
class MemoryManager:
def __init__(self, config):
self.vector_store = VectorStore(config.vector_db)
self.redis = Redis(config.redis_url)
self.knowledge_db = KnowledgeDB(config.db_url)
async def store_experience(self, task: str, result: AgentResult):
"""存储任务执行经验"""
# 提取关键信息
experience = {
"task_type": task.classification,
"approach": result.strategy_used,
"outcome": result.outcome_summary,
"lessons": result.lessons_learned,
"timestamp": time.time()
}
# 存入向量数据库(语义检索用)
await self.vector_store.upsert(
id=result.task_id,
text=experience["approach"],
metadata=experience
)
# 存入关系数据库(精确查询用)
await self.knowledge_db.insert_experience(experience)
async def recall_relevant(self, task: str, top_k: int = 5):
"""回忆与当前任务相关的历史经验"""
# 向量相似度检索
similar = await self.vector_store.search(
query=task,
top_k=top_k
)
# 组装为上下文注入Prompt
memories = []
for exp in similar:
memories.append(
f"历史经验[{exp.metadata['task_type']}]: "
f"方法={exp.metadata['approach']}, "
f"结果={exp.metadata['outcome']}"
)
return "\n".join(memories)
4.3 记忆压缩策略
LLM的上下文窗口是有限的,不能把所有记忆都塞进去。DeerFlow采用了渐进式摘要策略:
class MemoryCompressor:
async def compress(self, memories: list[Memory]) -> str:
# 1. 按时间窗口分组
recent = [m for m in memories if m.age < timedelta(hours=1)]
medium = [m for m in memories if timedelta(hours=1) <= m.age < timedelta(days=1)]
old = [m for m in memories if m.age >= timedelta(days=1)]
# 2. 不同粒度的压缩
summary_parts = []
# 最近1小时:完整保留
for m in recent:
summary_parts.append(m.full_text())
# 1小时-1天:压缩为摘要
if medium:
medium_summary = await self._llm_summarize(
"\n".join(m.full_text() for m in medium),
max_tokens=500
)
summary_parts.append(f"[近24小时摘要] {medium_summary}")
# 1天以上:只保留关键结论
if old:
old_summary = await self._llm_summarize(
"\n".join(m.full_text() for m in old),
max_tokens=200
)
summary_parts.append(f"[历史经验摘要] {old_summary}")
return "\n\n".join(summary_parts)
五、KnowledgeExtractionMiddleware:Agent的自主学习引擎
这是DeerFlow 2.0最令人兴奋的设计——Agent能从任务执行过程中自动提取知识,打包成可复用的Skill。
5.1 五步知识提取流水线
根据源码分析,KnowledgeExtractionMiddleware.after_agent()执行以下五步:
| 步骤 | 内容 | 说明 |
|---|---|---|
| ① | 检查开关 + 判断是否深度研究 | 不是所有任务都需要知识提取 |
| ② | extract_knowledge(..., "knowledge") | 提取事实性知识 |
| ③ | extract_knowledge(..., "rules") | 提取规则性知识 |
| ④ | extract_knowledge(..., "sop") | 提取标准操作流程 |
| ⑤ | package_as_skill(...) | 打包为可复用Skill |
class KnowledgeExtractionMiddleware(AgentMiddleware):
async def after_agent(self, context: AgentContext, result: AgentResult):
# 步骤①:检查是否需要知识提取
if not self._should_extract(context, result):
return context
if not context.is_deep_research:
return context
# 步骤②③④:并行/串行提取三类知识
knowledge = await self._extract_knowledge(context, "knowledge")
rules = await self._extract_knowledge(context, "rules")
sop = await self._extract_knowledge(context, "sop")
# 步骤⑤:打包为Skill
if knowledge or rules or sop:
skill = await self._package_as_skill(
name=context.task_summary,
knowledge=knowledge,
rules=rules,
sop=sop
)
context.extracted_skill = skill
return context
async def _extract_knowledge(self, context: AgentContext, ktype: str):
"""使用LLM从对话历史中提取特定类型的知识"""
prompt = self._build_extraction_prompt(context, ktype)
result = await self.llm.generate(prompt)
return self._parse_extraction_result(result, ktype)
async def _package_as_skill(self, name, knowledge, rules, sop):
"""将提取的知识打包为标准Skill格式"""
skill_md = f"""# {name}
## Knowledge
{knowledge}
## Rules
{rules}
## SOP
{sop}
"""
skill_path = self.skills_dir / f"{self._slugify(name)}" / "SKILL.md"
skill_path.parent.mkdir(parents=True, exist_ok=True)
skill_path.write_text(skill_md)
# 注册到Skill Registry
self.skill_registry.register(skill_path)
return skill_path
5.2 知识提取的实际效果
假设你让DeerFlow执行了一个"分析某上市公司的财务数据并生成投资分析报告"的任务。完成后,KnowledgeExtractionMiddleware会自动:
- Knowledge提取:从对话中提取关于财务分析方法的事实性知识(如"P/E比率在15-25之间属于合理估值区间")
- Rules提取:提取规则性知识(如"当营收增长率连续3季度下滑时,需要额外检查现金流")
- SOP提取:提取操作流程(如"第一步获取财报数据→第二步计算关键指标→第三步横向对比→第四步生成报告")
- Skill打包:将以上内容打包成
financial-analysis/SKILL.md
下次你让DeerFlow做类似任务时,Lead Agent会自动发现这个Skill,直接复用,不再从零开始。
这就是DeerFlow的自我进化机制:用得越多,积累的Skill越丰富,执行效率越高。
六、技能系统(Skills):渐进式加载与MCP集成
6.1 Skill的标准结构
每个Skill都是一个目录,核心是SKILL.md:
skills/
├── research/
│ ├── SKILL.md # 技能描述与指令
│ ├── config.yaml # 技能配置
│ └── templates/ # Prompt模板
├── report-generation/
│ ├── SKILL.md
│ └── ...
└── custom-skill/
├── SKILL.md
└── ...
6.2 渐进式技能加载
DeerFlow不会在启动时加载所有Skill,而是按需发现和加载:
class SkillRegistry:
def __init__(self, skills_path: Path):
self.skills_path = skills_path
self._cache = {} # 已加载的Skill缓存
self._index = None # Skill索引
async def discover_skills(self, task: str) -> list[Skill]:
"""根据任务描述发现相关Skill"""
if self._index is None:
self._index = await self._build_index()
# 语义匹配
relevant = await self._semantic_match(task, self._index)
# 懒加载
skills = []
for match in relevant:
if match.name not in self._cache:
skill = await self._load_skill(match.path)
self._cache[match.name] = skill
skills.append(self._cache[match.name])
return skills
async def _build_index(self):
"""扫描skills目录,构建轻量级索引"""
index = []
for skill_dir in self.skills_path.iterdir():
skill_md = skill_dir / "SKILL.md"
if skill_md.exists():
# 只读取元数据,不加载完整内容
meta = self._parse_frontmatter(skill_md)
index.append(SkillIndex(
name=meta.get("name", skill_dir.name),
description=meta.get("description", ""),
path=skill_dir
))
return index
6.3 MCP协议集成
DeerFlow 2.0支持MCP(Model Context Protocol),这意味着它可以与外部工具服务对接:
# MCP集成配置示例 (config.yaml)
mcp_servers:
- name: "filesystem"
transport: "stdio"
command: "npx"
args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
- name: "github"
transport: "stdio"
command: "npx"
args: ["-y", "@modelcontextprotocol/server-github"]
env:
GITHUB_TOKEN: "${GITHUB_TOKEN}"
- name: "database"
transport: "sse"
url: "http://localhost:3001/sse"
通过MCP,DeerFlow可以:
- 读写本地文件系统
- 操作GitHub仓库(创建PR、Review代码)
- 查询数据库
- 调用任意外部API
七、实战:用DeerFlow构建一个自动化代码审查系统
理论讲够了,来点实际的。我们用DeerFlow的架构思想,构建一个自动代码审查系统。
7.1 系统设计
GitHub Webhook → Lead Agent
├── Researcher: 分析PR变更的上下文(关联Issue、历史提交)
├── Coder: 检查代码质量(静态分析、安全扫描)
├── Reviewer: 评估架构合理性和可维护性
└── Reporter: 汇总审查结果,生成Review Comment
7.2 Skill定义
# code-review/SKILL.md
## Description
Automated code review skill for GitHub Pull Requests.
## Capabilities
- Static analysis (lint, type check, complexity)
- Security vulnerability scanning
- Architecture review
- Test coverage analysis
## SOP
1. Fetch PR diff from GitHub
2. Analyze changed files by language
3. Run static analysis tools in sandbox
4. Evaluate architectural impact
5. Check test coverage delta
6. Generate structured review comments
7. Post review to GitHub PR
## Rules
- Never auto-approve PRs with security vulnerabilities
- Flag any function with cyclomatic complexity > 15
- Require test coverage for new public APIs
- Check for hardcoded secrets/credentials
7.3 核心实现
import asyncio
from deerflow import LeadAgent, SkillRegistry, SandboxMiddleware
async def code_review_pipeline(pr_url: str):
# 1. 初始化Lead Agent
registry = SkillRegistry(Path("./skills"))
sandbox_mw = SandboxMiddleware(config)
agent = LeadAgent(
model="doubao-seed",
skill_registry=registry,
middlewares=[sandbox_mw]
)
# 2. 构建任务上下文
context = AgentContext(
task=f"Review the pull request: {pr_url}",
skills=["code-review"],
sandbox_enabled=True,
deep_research=True # 启用知识提取
)
# 3. 执行审查
result = await agent.run(context)
# 4. 输出审查结果
if result.success:
print(f"Review completed: {result.artifacts['review_summary']}")
print(f"Knowledge extracted: {result.extracted_skill}")
else:
print(f"Review failed: {result.error}")
# 运行
asyncio.run(code_review_pipeline(
"https://github.com/org/repo/pull/42"
))
7.4 沙盒中的静态分析
# 在沙盒中执行的代码
SANDBOX_SCRIPT = """
#!/bin/bash
set -e
# 安装分析工具
pip install ruff mypy bandit pytest-cov
# 克隆PR分支
git clone --depth=1 --branch=${PR_BRANCH} ${REPO_URL} /workspace/repo
cd /workspace/repo
# 1. Lint检查
echo "=== Lint Results ==="
ruff check . --output-format=json > /workspace/output/lint.json 2>&1 || true
# 2. 类型检查
echo "=== Type Check Results ==="
mypy . --json-report /workspace/output/types.json 2>&1 || true
# 3. 安全扫描
echo "=== Security Scan Results ==="
bandit -r . -f json -o /workspace/output/security.json 2>&1 || true
# 4. 测试覆盖率
echo "=== Test Coverage ==="
pytest --cov=. --cov-report=json:/workspace/output/coverage.json 2>&1 || true
echo "Analysis complete."
"""
八、性能优化:让Agent跑得更快
Agent系统的性能瓶颈通常不在LLM推理本身,而在调度开销和上下文管理。DeerFlow 2.0的几个关键优化策略:
8.1 并行子智能体调度
class ParallelScheduler:
async def execute_dag(self, plan: ExecutionPlan):
"""按DAG依赖关系并行执行"""
completed = set()
results = {}
while not plan.is_complete():
# 找到所有前置依赖已完成的阶段
ready = plan.get_ready_stages(completed)
# 并行执行所有就绪阶段
tasks = [
self._execute_stage(stage, results)
for stage in ready
]
stage_results = await asyncio.gather(*tasks, return_exceptions=True)
# 收集结果
for stage, result in zip(ready, stage_results):
if isinstance(result, Exception):
await self._handle_failure(stage, result)
else:
results[stage.id] = result
completed.add(stage.id)
8.2 上下文窗口优化
class ContextWindowManager:
def __init__(self, max_tokens: int = 128000):
self.max_tokens = max_tokens
self.token_counter = TiktokenCounter()
async def optimize_context(self, context: AgentContext) -> str:
"""在有限的上下文窗口内最大化有效信息密度"""
components = {
"system_prompt": context.system_prompt, # 不可压缩
"task_description": context.task, # 不可压缩
"relevant_memories": await self._recall(context), # 可压缩
"tool_results": context.tool_results, # 可压缩
"conversation_history": context.history, # 可压缩
}
# 按优先级分配token预算
budget = self._allocate_budget(components)
# 压缩低优先级组件
for key in ["conversation_history", "tool_results", "relevant_memories"]:
if self.token_counter.count(components[key]) > budget[key]:
components[key] = await self._compress(
components[key],
target_tokens=budget[key]
)
return self._assemble(components)
8.3 沙盒预热池
class SandboxPool:
"""预创建沙盒容器,避免冷启动延迟"""
def __init__(self, pool_size: int = 3):
self.pool_size = pool_size
self._available = asyncio.Queue(maxsize=pool_size)
self._ warming_task = None
async def warm(self):
"""预热:提前创建容器"""
for _ in range(self.pool_size):
sandbox = await self._create_sandbox()
await self._available.put(sandbox)
# 后台持续补充
self._warming_task = asyncio.create_task(self._refill_loop())
async def acquire(self) -> Sandbox:
"""获取一个预热好的沙盒"""
sandbox = await self._available.get()
return sandbox
async def release(self, sandbox: Sandbox):
"""归还沙盒(重置后复用)"""
await sandbox.reset() # 清理文件系统,但保留容器
await self._available.put(sandbox)
九、与主流Agent框架的对比分析
| 维度 | DeerFlow 2.0 | LangGraph | AutoGen | CrewAI |
|---|---|---|---|---|
| 架构模式 | Lead Agent + Middleware | State Graph | Multi-Agent Conversation | Role-Based Crew |
| 代码执行 | 内置沙盒 | 需外部工具 | 需外部工具 | 需外部工具 |
| 记忆系统 | 三层+自动压缩 | 手动实现 | 无内置 | 短期记忆 |
| 知识提取 | 自动提取+Skill打包 | 无 | 无 | 无 |
| MCP支持 | 原生集成 | 社区插件 | 无 | 无 |
| 技能扩展 | SKILL.md热加载 | 代码级扩展 | 代码级扩展 | 代码级扩展 |
| 沙盒安全 | 容器级隔离 | 无 | 无 | 无 |
| 长时间任务 | 原生支持(小时级) | 需配置Checkpointer | 不支持 | 不支持 |
| 自我进化 | 知识→Skill自动闭环 | 无 | 无 | 无 |
DeerFlow的核心差异化:
- 沙盒是内置的,不是可选插件——这决定了它"能做事"而不是"能说话"
- 知识提取是自动的——Agent用得越多越聪明,这是其他框架没有的
- Middleware Chain让架构具有高度的横向扩展性,添加新功能不需要改核心逻辑
十、部署实战:从零搭建DeerFlow开发环境
10.1 环境准备
# 系统要求
# - Node.js 22+
# - Python 3.12+
# - Docker (沙盒运行时)
# 安装依赖
npm install -g pnpm
pip install uv
# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
# 生成配置文件
python scripts/configure.py
# 这会生成 config.yaml 和 .env
10.2 配置模型
# config.yaml
models:
default: "doubao-seed"
available:
- name: "doubao-seed"
provider: "volcengine"
api_key_env: "DOUBAO_API_KEY"
- name: "gpt-4o"
provider: "openai"
api_key_env: "OPENAI_API_KEY"
- name: "claude-sonnet-4-20250514"
provider: "anthropic"
api_key_env: "ANTHROPIC_API_KEY"
sandbox:
enabled: true
image: "deerflow/runtime:latest"
max_containers: 5
default_timeout: 3600
memory:
vector_db: "chromadb"
persist_path: "./data/memory"
knowledge_extraction:
enabled: true
auto_skill: true
skills_output: "./skills/custom"
10.3 启动服务
# 启动LangGraph Server(Gateway API + Agent Runtime)
cd backend
$env:PYTHONPATH="."
uv run langgraph dev --no-browser
# 启动前端(可选)
cd frontend
pnpm install
pnpm dev
10.4 验证部署
# 发送测试任务
curl -X POST http://localhost:8000/api/run \
-H "Content-Type: application/json" \
-d '{
"task": "分析2025年全球主要编程语言的流行度趋势,生成结构化报告",
"deep_research": true,
"sandbox_enabled": true
}'
十一、总结与展望
DeerFlow 2.0代表的是Agent框架从"玩具"到"工具"的关键转变。它的几个核心设计决策值得所有做Agent系统的开发者学习:
Lead Agent + Middleware Chain:关注点分离的教科书级实践。调度逻辑和横切关注点完全解耦,新增功能只需添加中间件。
内置沙盒:这不是一个"nice-to-have"的功能,而是Agent从"能说"到"能做"的基础设施。没有沙盒的Agent框架,本质上还是聊天机器人。
自动知识提取:这是DeerFlow最前瞻的设计。Agent不仅能执行任务,还能从执行过程中学习,把经验沉淀为可复用的Skill。这创造了飞轮效应——用得越多,能力越强。
渐进式技能加载:避免了"启动时加载一切"的资源浪费,同时保持了发现和复用的灵活性。
当然,DeerFlow 2.0也有它的局限:
- 沙盒冷启动开销:容器创建需要数秒,对短任务来说延迟偏高
- 知识提取质量:依赖LLM的提取质量,对于专业性极强的领域可能不够准确
- 多租户隔离:当前设计更偏向单用户场景,多租户场景需要额外的隔离层
但瑕不掩瑜。DeerFlow 2.0的架构设计,为"生产级Agent运行时"给出了一个极具参考价值的答案。如果你正在构建自己的Agent系统,至少应该认真研究它的中间件链和沙盒设计——这两个决策的影响远超代码层面。
DeerFlow不是终点,但它是正确的方向。
项目地址:https://github.com/bytedance/deer-flow
Star数:49K+(截至2026年5月)
许可证:Apache 2.0