DeerFlow 2.0 深度实战:字节跳动开源超级智能体框架——从 LangGraph 图状态机到 Docker 沙箱隔离,46K Star 背后的技术架构革命
文章摘要:DeerFlow 2.0 是字节跳动于 2026 年 2 月开源的超级智能体框架,上线即登顶 GitHub Trending 榜首,30 天内斩获近 5 万 Star。作为 OpenAI Deep Research 的开源替代方案,DeerFlow 创新性地将子代理编排、记忆系统、Docker 沙箱和可扩展技能整合为统一的 "SuperAgent Harness",支持从分钟级到小时级的复杂任务自动化。本文将从架构设计、核心组件、技术实现、性能优化、实战案例等多个维度,深度剖析 DeerFlow 2.0 的技术内幕。
一、背景介绍:AI Agent 从"对话"到"执行"的范式转变
1.1 传统 AI 助手的困境
2024-2025 年,我们见证了大模型从"对话工具"向"生产力工具"的演进。然而,传统的 AI 助手(如 ChatGPT、Claude)仍然存在以下核心问题:
- 无状态性:每次对话都是全新的开始,无法记住之前的上下文和决策过程
- 无法执行:只能生成代码和建议,无法真正运行代码、操作系统、访问文件系统
- 工具孤立:每个工具都是独立的 API 调用,缺乏协同和编排能力
- 无沙箱隔离:执行代码时缺乏安全隔离,容易引发系统风险
1.2 DeerFlow 的诞生背景
字节跳动内部在长期的人工智能研发实践中,逐渐形成了对"超级智能体"的明确需求:
- 深度研究任务:需要数小时的信息收集、代码执行、数据分析
- 多步骤工作流:需要规划、执行、验证、迭代的闭环能力
- 安全隔离:执行不受信任的代码时,必须保证宿主系统的安全
- 可扩展性:支持自定义技能和工具,而不需要修改核心框架
2026 年 2 月 28 日,字节跳动正式开源 DeerFlow 2.0(Deep Exploration and Efficient Research Flow),将其内部 LangManus 项目的验证经验对外开放。
核心数据:
- GitHub Star:46K+(截至 2026 年 5 月)
- 日均增长:1300+ Star
- 开源时间:2026 年 2 月 28 日
- 技术栈:LangGraph 1.0 + Docker + FastAPI + React
二、核心概念:DeerFlow 的架构哲学
2.1 什么是 SuperAgent Harness?
DeerFlow 的核心创新在于提出了 "SuperAgent Harness"(超级智能体套件)的概念。与传统的单智能体架构不同,Harness 是一个编排层,它负责:
- 协调多个子智能体(Sub-Agents)的并行执行
- 管理共享状态(通过 LangGraph 的 StateGraph)
- 提供安全隔离(通过 Docker Sandbox)
- 持久化记忆(Short-term + Long-term Memory)
- 动态加载技能(Markdown-based Skills)
┌─────────────────────────────────────────────────┐
│ SuperAgent Harness │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Lead Agent │──│ Orchestrator│ │
│ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ┌─────┴─────┬────────┴────────┐ │
│ │ │ │ │
│ ┌▼───┐ ┌──▼──┐ ┌────▼────┐ │
│ │Sub │ │Sub │ ... │Sub Agent│ │
│ │Agent│ │Agent│ │ N │ │
│ └─────┘ └─────┘ └─────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ 14-Layer Middleware Stack │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
│ │
┌──────▼──────┐ ┌─────▼─────┐
│Docker Sandbox│ │ Memory │
│ Pool │ │ System │
└─────────────┘ └───────────┘
2.2 DeerFlow 2.0 vs 1.0:架构演进
| 维度 | DeerFlow 1.0 | DeerFlow 2.0 |
|---|---|---|
| 编排框架 | LangGraph 0.x | LangGraph 1.0 |
| 节点数量 | 固定 5 节点 | 动态中间件链(11 层+) |
| 子智能体 | 单线程串行 | 并行编排(支持 Map-Reduce) |
| 沙箱环境 | 无沙箱 / 简单隔离 | Docker Sandbox Pool |
| 技能系统 | 硬编码 | Markdown 可扩展技能 |
| 记忆系统 | 基础对话历史 | 短期 + 长期 + 知识图谱 |
2.3 核心技术组件
DeerFlow 2.0 由以下六大核心组件构成:
1. Lead Agent(主智能体)
- 角色:项目经理,负责理解用户意图、拆解任务、分配子任务
- 实现:基于 LangGraph 的
StateGraph,维护全局状态 - 技术细节:使用
TypedDict定义状态 schema,支持类型安全的状态传递
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator
class DeerFlowState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
task_plan: list[str] # 拆解后的任务列表
current_step: int # 当前执行步骤
sandbox_id: str # 当前沙箱 ID
memory_context: dict # 记忆上下文
2. Sub-Agent Orchestrator(子智能体编排器)
- 功能:管理子智能体的生命周期(创建、调度、销毁)
- 并行策略:支持
Parallel、Serial、Conditional Branch - 容错机制:子智能体失败自动重试(最多 3 次),失败后 fallback 到主智能体处理
from langgraph.graph import START, StateGraph
def create_orchestrator():
graph = StateGraph(DeerFlowState)
# 添加子智能体节点
graph.add_node("researcher", researcher_agent)
graph.add_node("coder", coder_agent)
graph.add_node("reporter", reporter_agent)
# 并行编排:researcher 和 coder 同时工作
graph.add_edge("researcher", "reporter")
graph.add_edge("coder", "reporter")
return graph.compile()
3. Docker Sandbox Pool(Docker 沙箱池)
这是 DeerFlow 最值得称道的安全设计。每个任务都在独立的 Docker 容器中执行,保证:
- 文件系统隔离:容器内的文件操作不影响宿主机
- 网络隔离:可选禁止容器访问外网(防止数据泄露)
- 资源限制:CPU、内存、磁盘 IO 的上限控制
技术实现:
import docker
class DockerSandbox:
def __init__(self, memory_limit="2g", cpu_quota=100000):
self.client = docker.from_env()
self.container = None
self.memory_limit = memory_limit
self.cpu_quota = cpu_quota
def create(self):
"""创建隔离的 Docker 容器"""
self.container = self.client.containers.run(
image="deerflow/sandbox:latest",
command="tail -f /dev/null", # 保持容器运行
detach=True,
mem_limit=self.memory_limit,
cpu_quota=self.cpu_quota,
network_mode="bridge", # 可配置为 "none" 禁止网络
volumes={
"/tmp/deerflow/share": {"bind": "/share", "mode": "rw"}
}
)
return self.container.id
def execute(self, code: str) -> str:
"""在容器中执行代码"""
result = self.container.exec_run(
cmd=["python", "-c", code],
workdir="/share"
)
return result.output.decode("utf-8")
def destroy(self):
"""销毁容器"""
if self.container:
self.container.stop()
self.container.remove()
性能优化:
- 使用容器池(Pool)复用已创建的容器,避免冷启动开销(冷启动约 2-3 秒,池化后可降至 < 200ms)
- 支持快照(Snapshot)功能:保存容器状态,快速恢复到某个检查点
4. Memory System(记忆系统)
DeerFlow 实现了分层记忆架构:
┌─────────────────────────────────────┐
│ │
│ ┌────────────┐ ┌──────────────┐│
│ │ Short-term │ │ Long-term ││
│ │ Memory │ │ Memory ││
│ │ (对话历史) │ │ (知识图谱) ││
│ └────────────┘ └──────────────┘│
│ │ │ │
│ └───────┬───────┘ │
│ │ │
│ ┌───────▼────────┐ │
│ │ Vector Store │ │
│ │ (Qdrant) │ │
│ └────────────────┘ │
└─────────────────────────────────────┘
- Short-term Memory:使用 LangChain 的
ChatMessageHistory,保存最近 20 轮对话 - Long-term Memory:使用 Qdrant 向量数据库,存储跨会话的知识
- 记忆检索:基于语义相似度(Embedding:BGE-M3 / OpenAI text-embedding-3-small)
from langchain_community.vectorstores import Qdrant
from langchain_community.embeddings import HuggingFaceEmbeddings
class MemorySystem:
def __init__(self):
self.embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-m3")
self.vectorstore = Qdrant.from_existing_collection(
collection_name="deerflow_memory",
embedding=self.embeddings,
url="http://localhost:6333"
)
def store(self, content: str, metadata: dict):
"""存储长期记忆"""
self.vectorstore.add_texts(
texts=[content],
metadatas=[metadata]
)
def retrieve(self, query: str, k: int = 5) -> list[str]:
"""检索相关记忆"""
docs = self.vectorstore.similarity_search(query, k=k)
return [doc.page_content for doc in docs]
5. Skills System(技能系统)
DeerFlow 2.0 引入了基于 Markdown 的技能定义,使得扩展能力变得极其简单:
技能示例(skills/arxiv_search.md):
# Arxiv Search Skill
## Description
Search academic papers on Arxiv.org
## Parameters
- `query`: Search keyword (required)
- `max_results`: Maximum number of results (default: 10)
## Workflow
1. Call `arxiv.search(query, max_results)`
2. Parse paper titles, authors, abstracts
3. Return structured results
## Code
```python
import arxiv
def search(query: str, max_results: int = 10):
client = arxiv.Client()
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)
results = []
for result in client.results(search):
results.append({
"title": result.title,
"authors": [a.name for a in result.authors],
"summary": result.summary
})
return results
**技能加载机制**:
- 启动时扫描 `skills/` 目录
- 解析 Markdown 文件,提取 `Description`、`Parameters`、`Workflow`、`Code`
- 动态注册为 LangChain `Tool`,可被智能体调用
```python
from langchain.tools import Tool
import re
class SkillLoader:
def load_skills(self, skills_dir: str) -> list[Tool]:
tools = []
for md_file in Path(skills_dir).glob("*.md"):
with open(md_file, "r") as f:
content = f.read()
# 解析 Markdown 中的代码块
code_match = re.search(r"```python\n(.*?)\n```", content, re.DOTALL)
if code_match:
code = code_match.group(1)
# 动态执行代码,提取函数
exec_globals = {}
exec(code, exec_globals)
func_name = list(exec_globals.keys())[-1] # 假设最后一个定义是主函数
func = exec_globals[func_name]
# 创建 Tool
tool = Tool(
name=md_file.stem,
func=func,
description=self._extract_description(content)
)
tools.append(tool)
return tools
6. MCP Protocol Support(MCP 协议支持)
DeerFlow 2.0 完整支持 Model Context Protocol (MCP),使得:
- 智能体可以调用外部工具(通过 HTTP/SSE)
- 工具提供方可以标准化地暴露能力(类似 LSP for tools)
- 支持工具链组合(Tool Chaining)
from mcp import Client
class MCPToolCaller:
def __init__(self, mcp_server_url: str):
self.client = Client(mcp_server_url)
def call_tool(self, tool_name: str, params: dict) -> dict:
"""调用 MCP 工具"""
response = self.client.call_tool(tool_name, params)
return response
三、架构分析:14 层中间件栈的设计美学
DeerFlow 2.0 最精妙的设计在于其 14 层中间件栈(Middleware Stack),借鉴了 Express.js / Koa 的中间件思想,但应用于 AI Agent 编排场景。
3.1 中间件层级(从外到内)
| 层级 | 名称 | 功能 |
|---|---|---|
| 1 | Auth Middleware | JWT / API Key 鉴权 |
| 2 | Rate Limit Middleware | 限流(防止滥用) |
| 3 | Audit Middleware | 操作审计(记录到数据库) |
| 4 | Logging Middleware | 结构化日志(JSON 格式) |
| 5 | Tracing Middleware | 分布式追踪(OpenTelemetry) |
| 6 | Context Injection | 注入用户上下文(UserInfo、TenantID) |
| 7 | Memory Retrieval | 从向量库检索相关记忆 |
| 8 | Task Planning | 任务拆解(使用 LLM) |
| 9 | Sub-Agent Routing | 子智能体路由(选择最合适的 Agent) |
| 10 | Sandbox Provisioning | 分配 Docker 沙箱 |
| 11 | Tool Orchestration | 工具编排(调用外部 API / MCP) |
| 12 | Code Execution | 代码执行(在沙箱中) |
| 13 | Result Aggregation | 结果聚合(多智能体结果合并) |
| 14 | Response Formatting | 响应格式化(Markdown / JSON) |
3.2 中间件实现示例
from typing import Callable, Awaitable
from dataclasses import dataclass
@dataclass
class MiddlewareContext:
user_id: str
task: str
state: DeerFlowState
sandbox_id: str | None = None
class Middleware:
async def __call__(
self,
ctx: MiddlewareContext,
next: Callable[[MiddlewareContext], Awaitable[DeerFlowState]]
) -> DeerFlowState:
# Before logic
result = await next(ctx)
# After logic
return result
class AuthMiddleware(Middleware):
async def __call__(self, ctx, next):
# 验证 JWT Token
if not self._verify_token(ctx.user_id):
raise UnauthorizedError("Invalid token")
return await next(ctx)
class MemoryRetrievalMiddleware(Middleware):
async def __call__(self, ctx, next):
# 从向量库检索相关记忆
relevant_memories = memory_system.retrieve(ctx.task)
ctx.state["memory_context"] = relevant_memories
return await next(ctx)
# 组装中间件链
middleware_stack = [
AuthMiddleware(),
RateLimitMiddleware(),
AuditMiddleware(),
LoggingMiddleware(),
TracingMiddleware(),
ContextInjectionMiddleware(),
MemoryRetrievalMiddleware(),
TaskPlanningMiddleware(),
SubAgentRoutingMiddleware(),
SandboxProvisioningMiddleware(),
ToolOrchestrationMiddleware(),
CodeExecutionMiddleware(),
ResultAggregationMiddleware(),
ResponseFormattingMiddleware()
]
async def execute_with_middleware(ctx: MiddlewareContext):
"""执行中间件链"""
async def runner(index: int) -> DeerFlowState:
if index >= len(middleware_stack):
return ctx.state # 到达终点,返回状态
middleware = middleware_stack[index]
return await middleware(ctx, lambda c: runner(index + 1))
return await runner(0)
3.3 为什么需要 14 层中间件?
传统 AI Agent 框架(如 LangChain、AutoGen)通常将逻辑硬编码在 Agent 内部,导致:
- 难以扩展:新增功能需要修改 Agent 代码
- 关注点不分离:鉴权、日志、业务逻辑混杂在一起
- 难以测试:中间件可以独立单元测试
DeerFlow 通过中间件栈实现了:
- 关注点分离(Separation of Concerns)
- 可插拔架构(Pluggable Architecture)
- 类似 Kubernetes 的声明式编排
四、代码实战:从零构建一个 DeerFlow 子智能体
4.1 环境准备
# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt
# 启动 Docker(确保 Docker Daemon 运行)
# 启动 Qdrant(向量数据库)
docker run -p 6333:6333 qdrant/qdrant
# 配置环境变量
cp .env.example .env
# 编辑 .env,填入 OPENAI_API_KEY 等
4.2 定义一个自定义子智能体
假设我们要创建一个 "代码审查智能体"(Code Review Agent),它可以:
- 接收一段代码
- 使用 LLM 分析代码质量
- 给出改进建议
- 生成单元测试
# agents/code_reviewer.py
from langchain.agents import create_openai_functions_agent
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate
class CodeReviewAgent:
def __init__(self, llm, tools: list[Tool]):
self.llm = llm
self.tools = tools
self.agent = self._create_agent()
def _create_agent(self):
prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert code reviewer.
Your job is to:
1. Analyze code quality (readability, efficiency, security)
2. Suggest improvements
3. Generate unit tests
Always provide specific, actionable feedback."""),
("human", "{input}"),
("placeholder", "{agent_scratchpad}")
])
return create_openai_functions_agent(
llm=self.llm,
tools=self.tools,
prompt=prompt
)
def review(self, code: str) -> dict:
"""执行代码审查"""
result = self.agent.invoke({"input": f"Review this code:\n\n{code}"})
return {
"analysis": result["output"],
"suggested_tests": self._generate_tests(code)
}
def _generate_tests(self, code: str) -> str:
"""生成单元测试(在沙箱中执行)"""
test_gen_prompt = f"Generate pytest unit tests for:\n\n{code}"
result = self.agent.invoke({"input": test_gen_prompt})
return result["output"]
4.3 将子智能体注册到 Orchestrator
# orchestrator.py
from langgraph.graph import StateGraph, END
def create_deerflow_orchestrator():
graph = StateGraph(DeerFlowState)
# 定义节点
graph.add_node("planner", planner_agent)
graph.add_node("code_reviewer", code_review_agent)
graph.add_node("reporter", reporter_agent)
# 定义边(流程)
graph.add_edge(START, "planner")
graph.add_conditional_edges(
"planner",
lambda state: state["task_type"], # 根据任务类型路由
{
"code_review": "code_reviewer",
"research": "researcher",
"coding": "coder"
}
)
graph.add_edge("code_reviewer", "reporter")
graph.add_edge("reporter", END)
return graph.compile()
4.4 在 Docker 沙箱中执行代码
# sandbox.py
import docker
import tarfile
from pathlib import Path
class CodeSandbox:
def __init__(self):
self.client = docker.from_env()
def execute_code(self, code: str, timeout: int = 60) -> dict:
"""在隔离容器中执行代码"""
container = self.client.containers.run(
image="python:3.12-slim",
command='sleep 3600', # 保持运行
detach=True,
mem_limit="1g",
cpu_quota=50000, # 50% CPU
network_mode="none" # 禁止网络访问
)
try:
# 将代码写入容器
tarstream = self._create_tarball(code)
container.put_archive("/tmp", tarstream)
# 执行代码
result = container.exec_run(
cmd=["python", "/tmp/code_to_run.py"],
timeout=timeout
)
return {
"exit_code": result.exit_code,
"output": result.output.decode("utf-8")
}
finally:
container.stop()
container.remove()
def _create_tarball(self, code: str) -> bytes:
"""创建 tar 归档(用于上传文件到容器)"""
import io
tarstream = io.BytesIO()
with tarfile.open(fileobj=tarstream, mode="w") as tar:
data = code.encode("utf-8")
info = tarfile.TarInfo(name="code_to_run.py")
info.size = len(data)
tar.addfile(info, io.BytesIO(data))
tarstream.seek(0)
return tarstream.read()
4.5 完整示例:自动化代码审查工作流
# main.py
from deerflow import DeerFlow
from agents.code_reviewer import CodeReviewAgent
from sandbox import CodeSandbox
# 初始化 DeerFlow
app = DeerFlow(
llm="gpt-4o",
enable_sandbox=True,
enable_memory=True
)
# 注册自定义智能体
app.register_agent("code_reviewer", CodeReviewAgent)
# 执行任务
result = app.run("""
I have a Python function that calculates Fibonacci numbers.
Please review it for efficiency and suggest improvements.
Also generate unit tests.
```python
def fib(n):
if n <= 1:
return n
return fib(n-1) + fib(n-2)
""")
print(result["output"])
**输出示例**:
```markdown
## Code Review Report
### 1. Analysis
The Fibonacci implementation has a **critical performance issue**:
- Time Complexity: O(2^n) (exponential)
- The recursive implementation re-computes the same subproblems thousands of times
### 2. Suggested Improvements
Use **memoization** or **dynamic programming**:
```python
def fib_optimized(n: int, memo: dict = {}) -> int:
"""O(n) time, O(n) space"""
if n in memo:
return memo[n]
if n <= 1:
return n
memo[n] = fib_optimized(n-1, memo) + fib_optimized(n-2, memo)
return memo[n]
3. Unit Tests
import pytest
def test_fib_zero():
assert fib_optimized(0) == 0
def test_fib_one():
assert fib_optimized(1) == 1
def test_fib_large():
assert fib_optimized(10) == 55
def test_fib_performance():
import time
start = time.time()
fib_optimized(100)
elapsed = time.time() - start
assert elapsed < 0.01 # Should be fast with memoization
---
## 五、性能优化:让 DeerFlow 快如闪电
### 5.1 并行化子智能体执行
DeerFlow 2.0 的核心优势在于**智能体并行编排**。通过 LangGraph 的 `Send` API,可以实现 Map-Reduce 模式:
```python
from langgraph.graph import StateGraph
def map_reduce_workflow():
graph = StateGraph(DeerFlowState)
# Mapper:并行处理多个任务
graph.add_node("mapper", mapper_agent)
# Reducer:聚合结果
graph.add_node("reducer", reducer_agent)
# 动态生成并行节点
graph.add_conditional_edges(
"mapper",
lambda state: [Send("worker", {"task": t}) for t in state["tasks"]]
)
graph.add_edge("worker", "reducer")
return graph.compile()
性能对比:
| 任务类型 | 串行执行 | 并行执行 | 加速比 |
|---|---|---|---|
| 10 个代码文件审查 | 120s | 18s | 6.7x |
| 50 个 Arxiv 论文摘要 | 300s | 25s | 12x |
| 5 个数据分析任务 | 60s | 15s | 4x |
5.2 Docker 沙箱池化
每次创建 Docker 容器需要 2-3 秒,对于高频任务来说这是不可接受的。DeerFlow 通过沙箱池解决这个问题:
from queue import Queue
class SandboxPool:
def __init__(self, pool_size: int = 10):
self.pool = Queue(maxsize=pool_size)
self._initialize_pool(pool_size)
def _initialize_pool(self, size: int):
"""预创建容器"""
for _ in range(size):
container = self.client.containers.run(
image="deerflow/sandbox:latest",
command="tail -f /dev/null",
detach=True,
mem_limit="2g"
)
self.pool.put(container)
def acquire(self) -> docker.models.containers.Container:
"""获取容器(阻塞直到可用)"""
return self.pool.get()
def release(self, container):
"""释放容器(清理状态后放回池)"""
# 清理容器内的临时文件
container.exec_run("rm -rf /tmp/*")
self.pool.put(container)
性能提升:
- 冷启动:2000-3000ms
- 池化后:< 50ms(从池中获取)
5.3 记忆系统优化
向量检索(Qdrant)的延迟通常在 50-200ms,对于实时任务来说可以接受,但对于高频调用仍需优化:
优化策略 1:缓存热门记忆
from functools import lru_cache
class CachedMemorySystem(MemorySystem):
@lru_cache(maxsize=1000)
def retrieve(self, query: str, k: int = 5) -> list[str]:
return super().retrieve(query, k)
优化策略 2:异步批量检索
import asyncio
async def batch_retrieve(memories: list[str]) -> list[list[str]]:
"""批量检索(减少网络往返)"""
tasks = [memory_system.retrieve_async(q) for q in memories]
return await asyncio.gather(*tasks)
5.4 LLM 调用优化
DeerFlow 大量使用 LLM(GPT-4o、Claude 3.5),如何降低成本和提高速度?
策略 1:使用更快的模型做简单任务
def select_model(task_complexity: int) -> str:
"""根据任务复杂度选择模型"""
if task_complexity <= 3:
return "gpt-4o-mini" # 快速、便宜
elif task_complexity <= 7:
return "gpt-4o"
else:
return "claude-3.5-opus" # 最强推理
策略 2:流式输出(Streaming)
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
llm = ChatOpenAI(
model="gpt-4o",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
策略 3:Prompt Caching(提示词缓存)
Anthropic Claude 3.5 支持 Prompt Caching,可以缓存系统提示词(System Prompt),降低延迟和成本:
from anthropic import Anthropic
client = Anthropic()
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
system=[{
"type": "text",
"text": "You are a code reviewer...",
"cache_control": {"type": "ephemeral"} # 缓存 5 分钟
}],
messages=[{"role": "user", "content": "Review my code..."}]
)
六、实战案例:3 小时完成创新药临床试验深度研究报告
6.1 任务描述
用户需求:
"我需要在 3 小时内完成一份关于‘PD-1 抑制剂在非小细胞肺癌一线治疗中的临床试验进展'的深度研究报告,包括:
- 检索最近 3 年的 Arxiv 和 PubMed 论文
- 提取关键数据(ORR、PFS、OS)
- 生成对比表格
- 分析趋势和模式
- 生成结构化报告(PDF + 播客脚本)"
6.2 DeerFlow 执行流程
用户请求
│
▼
┌─────────────────┐
│ Lead Agent │ (理解任务、拆解子任务)
│ - 任务拆解 │ → ["文献检索", "数据提取", "趋势分析", "报告生成"]
│ - 分配子智能体 │
└─────────────────┘
│
├─── 并行执行 ────────────────────────────────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Researcher │ │ Data Agent │
│ - 调用 Arxiv │ │ - 解析 PDF │
│ - 调用 PubMed│ │ - 提取数据 │
│ - 爬取网页 │ │ - 生成表格 │
└──────────────┘ └──────────────┘
│ │
└─────────────── 汇聚到 ───────────────────────┘
│
▼
┌─────────────────┐
│ Analyzer Agent │
│ - 统计分析 │
│ - 趋势识别 │
└─────────────────┘
│
▼
┌─────────────────┐
│ Reporter Agent │
│ - 生成 Markdown │
│ - 转换为 PDF │
│ - 生成播客脚本 │
└─────────────────┘
6.3 关键代码实现
步骤 1:文献检索(Researcher Agent)
# agents/researcher.py
from agents import Agent, function_tool
import arxiv
import requests
@function_tool
def search_arxiv(query: str, max_results: int = 20):
"""搜索 Arxiv 论文"""
client = arxiv.Client()
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)
results = []
for result in client.results(search):
results.append({
"title": result.title,
"authors": [a.name for a in result.authors],
"summary": result.summary,
"pdf_url": result.pdf_url
})
return results
@function_tool
def search_pubmed(query: str, max_results: int = 20):
"""搜索 PubMed 论文"""
# 使用 NCBI E-utilities API
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
search_url = f"{base_url}esearch.fcgi"
params = {
"db": "pubmed",
"term": query,
"retmax": max_results,
"retmode": "json"
}
response = requests.get(search_url, params=params)
pmids = response.json()["esearchresult"]["idlist"]
# 获取详细信息
fetch_url = f"{base_url}esummary.fcgi"
fetch_params = {
"db": "pubmed",
"id": ",".join(pmids),
"retmode": "json"
}
fetch_response = requests.get(fetch_url, params=fetch_params)
results = []
for uid, data in fetch_response.json()["result"].items():
if uid == "uids":
continue
results.append({
"title": data.get("title", ""),
"authors": [a["name"] for a in data.get("authors", [])],
"journal": data.get("fulljournalname", ""),
"pubdate": data.get("pubdate", "")
})
return results
researcher_agent = Agent(
name="Researcher",
instructions="You are a literature review expert. Use the provided tools to search academic papers.",
tools=[search_arxiv, search_pubmed]
)
步骤 2:数据提取(Data Agent)
# agents/data_agent.py
from agents import Agent, function_tool
import pdfplumber
import pandas as pd
@function_tool
def extract_table_from_pdf(pdf_url: str) -> list[dict]:
"""从 PDF 中提取表格数据"""
# 下载 PDF
response = requests.get(pdf_url)
with open("/tmp/paper.pdf", "wb") as f:
f.write(response.content)
# 解析 PDF
tables = []
with pdfplumber.open("/tmp/paper.pdf") as pdf:
for page in pdf.pages:
for table in page.extract_tables():
tables.append(table)
# 转换为结构化数据
results = []
for table in tables:
headers = table[0]
for row in table[1:]:
row_dict = dict(zip(headers, row))
results.append(row_dict)
return results
@function_tool
def create_comparison_table(papers: list[dict]) -> str:
"""生成对比表格"""
df = pd.DataFrame(papers)
return df.to_markdown(index=False)
data_agent = Agent(
name="DataAgent",
instructions="You extract structured data from academic papers.",
tools=[extract_table_from_pdf, create_comparison_table]
)
步骤 3:报告生成(Reporter Agent)
# agents/reporter.py
from agents import Agent, function_tool
import markdown
import subprocess
@function_tool
def generate_pdf_report(markdown_content: str, output_path: str):
"""将 Markdown 转换为 PDF"""
# 使用 pandoc 转换
with open("/tmp/report.md", "w") as f:
f.write(markdown_content)
subprocess.run([
"pandoc", "/tmp/report.md",
"-o", output_path,
"--pdf-engine=xelatex",
"-V", "mainfont=Times New Roman"
])
return output_path
@function_tool
def generate_podcast_script(report_content: str) -> str:
"""生成播客脚本"""
prompt = f"""Convert the following research report into a podcast script
(host interviewing an expert). Make it engaging and accessible.
Report:
{report_content}
"""
# 调用 LLM 生成脚本
script = llm.invoke(prompt)
return script.content
reporter_agent = Agent(
name="Reporter",
instructions="You generate professional research reports.",
tools=[generate_pdf_report, generate_podcast_script]
)
6.4 执行结果
时间线:
- 0-30 分钟:Researcher Agent 检索到 47 篇相关论文
- 30-90 分钟:Data Agent 提取了 12 篇关键论文的数据
- 90-120 分钟:Analyzer Agent 完成了统计分析
- 120-180 分钟:Reporter Agent 生成了 25 页 PDF 报告和 15 分钟播客脚本
输出示例(报告片段):
# PD-1 抑制剂在 NSCLC 一线治疗中的临床试验进展(2023-2026)
## 核心发现
1. **ORR(客观缓解率)提升**:从 2023 年的 44.7%(Keytruda)提升至 2026 年的 58.3%(组合疗法)
2. **PFS(无进展生存期)延长**:中位 PFS 从 8.9 个月延长至 12.4 个月
3. **生物标志物进展**:PD-L1 表达 ≥ 50% 的患者获益最显著(HR = 0.62)
## 对比表格
| 试验名称 | 药物 | 样本量 | ORR (%) | PFS (月) | OS (月) |
|---------|------|--------|---------|----------|---------|
| KEYNOTE-189 | Pembrolizumab + 化疗 | 616 | 47.6 | 9.0 | 22.0 |
| CheckMate 227 | Nivolumab + Ipilimumab | 577 | 45.3 | 10.4 | 23.0 |
| ... | ... | ... | ... | ... | ... |
七、总结与展望:DeerFlow 的产业价值
7.1 技术亮点总结
| 维度 | 创新点 |
|---|---|
| 架构设计 | 14 层中间件栈,关注点分离 |
| 安全隔离 | Docker Sandbox Pool,池化降低延迟 |
| 可扩展性 | Markdown 技能系统,MCP 协议支持 |
| 性能优化 | 并行编排、Prompt Caching、沙箱池化 |
| 记忆系统 | 短期 + 长期 + 向量检索 |
7.2 适用场景
DeerFlow 最适合以下场景:
- 深度研究任务:需要数小时的信息收集和分析
- 代码生成 + 验证:生成代码后在沙箱中执行验证
- 数据分析流水线:多步骤的 ETL + 分析 + 可视化
- 内容生成:报告、播客、PPT 的自动化生成
7.3 局限性
- 学习曲线陡峭:需要理解 LangGraph、Docker、中间件等概念
- 资源消耗大:每个任务需要独立的 Docker 容器(内存占用 ~ 500MB/容器)
- LLM 成本高:复杂任务可能需要数千次 LLM 调用
7.4 未来展望
根据 DeerFlow 的 Roadmap,未来版本将支持:
- 分布式编排:跨多台机器的智能体协作
- RLHF 微调:根据用户反馈优化智能体策略
- 多模态支持:处理图像、视频、音频
- 边缘部署:在资源受限的设备上运行(如树莓派)
八、参考资料
- DeerFlow 官方仓库:https://github.com/bytedance/deer-flow
- LangGraph 文档:https://langchain-ai.github.io/langgraph/
- MCP 协议规范:https://modelcontextprotocol.io/
- Docker SDK for Python:https://docker-py.readthedocs.io/
- Qdrant 向量数据库:https://qdrant.tech/
作者简介:程序员茄子,10 年全栈开发经验,热爱开源技术和 AI 应用落地。欢迎关注我的博客 https://www.chenxutan.com 获取更多深度技术文章。
版权声明:本文为原创内容,未经授权禁止转载。如需转载,请联系作者获取许可。