编程 DeerFlow 2.0 深度解析:字节跳动开源的超级AI智能体框架——从LangGraph架构到生产级Multi-Agent系统的完整技术内幕

2026-05-18 22:20:46 +0800 CST views 8

DeerFlow 2.0 深度解析:字节跳动开源的超级AI智能体框架——从LangGraph架构到生产级Multi-Agent系统的完整技术内幕

2026年3月,字节跳动开源的DeerFlow 2.0在GitHub上狂揽46,000+ Star,登顶Trending榜首。它不是又一个聊天机器人,而是一个能真正"动手做事"的超级智能体框架。本文将深入拆解DeerFlow 2.0的架构设计、核心技术栈、Multi-Agent协作机制,并通过完整代码示例带你从零构建一个生产级AI自动化工作流。

目录

  1. 项目背景与核心价值
  2. 架构设计深度剖析
  3. 核心技术栈详解
  4. Multi-Agent协作机制
  5. Docker沙箱与安全执行环境
  6. 上下文工程与记忆系统
  7. 代码实战:构建第一个DeerFlow应用
  8. 高级特性与性能优化
  9. 生产环境部署指南
  10. 竞品对比与技术选型
  11. 未来展望与社区生态
  12. 总结

1. 项目背景与核心价值

1.1 AI Agent的进化困境

2026年的AI Agent领域正面临一个尴尬的现实:模型能力指数级提升,但真正能"干活"的Agent框架却寥寥无几。

大部分AI Agent产品停留在"聊天机器人+"的阶段:

  • 能回答问题,但不能执行复杂工作流
  • 能生成代码,但不能自主运行和调试
  • 能处理单轮任务,但无法完成跨小时级别的长流程任务
  • 能访问工具,但缺乏安全隔离的执行环境

DeerFlow 2.0的出现,标志着AI Agent从"玩具"到"生产力工具"的根本性转变。

1.2 DeerFlow 2.0的核心定位

DeerFlow(Deer + Workflow)是字节跳动开源的超级智能体(Super Agent)框架,其设计哲学可以概括为:

传统Agent:用户输入 → 模型推理 → 文本输出
DeerFlow: 用户输入 → 任务规划 → 代码生成 → 沙箱执行 → 结果验证 → 多轮迭代 → 成品交付

核心能力矩阵:

能力维度传统Chatbot普通Agent框架DeerFlow 2.0
任务规划✅ (基于LangGraph)
代码执行部分支持✅ (Docker沙箱)
长流程任务有限支持✅ (上下文压缩)
多Agent协作部分支持✅ (Sub-Agent架构)
自我纠错有限支持✅ (自动调试循环)
成品交付文本-only文本+简单文件✅ (PPT/报告/视频)

1.3 为什么是字节跳动?

字节跳动内部对AI Agent的需求极其旺盛:

  • 抖音内容分析:需要自动化处理数百万视频的元数据、评论情感分析
  • 广告投放优化:需要AI自动分析竞品、生成投放策略
  • 内部研发效率:需要AI自动生成测试用例、代码Review、文档生成

DeerFlow 2.0正是在这些内部场景打磨后开源的,其生产级稳定性远超学术界的类似项目。


2. 架构设计深度剖析

2.1 整体架构概览

DeerFlow 2.0采用分层架构,从下到上分为5层:

┌─────────────────────────────────────────────┐
│           用户交互层 (UI/API)               │
├─────────────────────────────────────────────┤
│         Orchestration Layer (LangGraph)     │
├──────────────────────┬──────────────────────┤
│  Main Agent (规划器) │  Sub-Agents (执行器) │
├──────────────────────┴──────────────────────┤
│      Context Engine (上下文管理)             │
├─────────────────────────────────────────────┤
│    Docker Sandbox (安全执行环境)             │
├─────────────────────────────────────────────┤
│      Tools & MCP Servers (工具生态)         │
└─────────────────────────────────────────────┘

2.2 LangGraph 1.0:状态机的艺术

DeerFlow 2.0基于LangGraph 1.0构建,这是其架构的核心支柱。

2.2.1 为什么选择LangGraph?

传统的Agent框架(如LangChain的AgentExecutor)采用线性执行模型

# 传统模型:线性执行
def run_agent(input):
    thought = model.generate(input)      # 思考
    action = parse_action(thought)        # 解析动作
    result = execute_action(action)       # 执行
    return result                         # 返回(结束)

这种模型的致命缺陷:无法处理循环依赖和复杂分支

LangGraph引入了有向状态图(Directed State Graph)

from langgraph.graph import StateGraph, END

# 定义状态
class AgentState(TypedDict):
    messages: list
    current_step: str
    execution_result: dict
    error_count: int

# 构建工作流
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("debugger", debugger_node)
workflow.add_node("validator", validator_node)

# 添加边(支持复杂分支)
workflow.add_edge("planner", "executor")
workflow.add_conditional_edge(
    "executor",
    should_retry,
    {"retry": "debugger", "success": "validator", "max_retries": END}
)
workflow.add_edge("debugger", "executor")  # 循环!

关键优势:

  1. 循环支持:Executor失败 → Debugger分析 → 重新Executor(传统框架无法做到)
  2. 状态持久化:每个节点的状态自动保存,支持中断恢复
  3. 并行执行:多个Sub-Agent可以同时运行(State隔离)

2.2.2 DeerFlow对LangGraph的增强

字节跳动在LangGraph基础上做了多项生产级增强:

# DeerFlow的增强State
class DeerFlowState(TypedDict):
    # 基础字段
    messages: Annotated[list, operator.add]  # 消息历史(自动追加)
    current_step: str
    
    # DeerFlow增强字段
    sandbox_snapshot: str                    # Docker快照ID
    sub_agent_states: dict                   # 子Agent状态隔离
    context_window: int                      # 当前上下文窗口大小
    compression_needed: bool                 # 是否需要压缩上下文
    
    # 可观测性
    trace_id: str                            # 全链路追踪ID
    metrics: dict                            # 性能指标(延迟、成本、Token消耗)

核心增强点:

  1. Sandbox Snapshot:每次执行前自动保存Docker状态,支持"时间旅行调试"
  2. Sub-Agent隔离:每个Sub-Agent拥有独立的State空间,避免相互干扰
  3. 上下文压缩:当上下文超过阈值时,自动触发摘要生成(类似Claude-Mem的机制)

2.3 主从Agent架构详解

DeerFlow采用Main Agent + Sub-Agents的分层架构:

2.3.1 Main Agent(指挥官)

class MainAgent:
    """
    主Agent负责:
    1. 任务理解与拆解
    2. Sub-Agent调度
    3. 结果聚合与验证
    4. 异常处理与重试
    """
    
    def __init__(self, model="deepseek-v4", max_retries=3):
        self.model = model
        self.max_retries = max_retries
        self.sub_agents = {}
    
    def plan(self, user_request: str) -> list[Task]:
        """
        任务规划:将用户请求拆解为有序的Task列表
        
        示例:
        输入:"调研竞品A和B,生成对比报告"
        输出:[
            Task(type="search", query="竞品A"),
            Task(type="search", query="竞品B"),
            Task(type="analysis", data=from_tasks[0,1]),
            Task(type="report", format="markdown")
        ]
        """
        prompt = f"""
        You are a task planner. Break down the user request into executable tasks.
        
        User Request: {user_request}
        
        Output a JSON array of tasks, each with:
        - id: unique identifier
        - type: one of [search, code, analysis, report]
        - dependencies: list of task IDs this task depends on
        - params: parameters for task execution
        """
        
        response = self.model.generate(prompt)
        tasks = json.loads(response)
        
        # 构建DAG(有向无环图)
        dag = self._build_dag(tasks)
        
        return dag
    
    def execute_dag(self, dag: list[Task]) -> dict:
        """
        执行DAG:支持并行和串行混合执行
        """
        results = {}
        pending = [t for t in dag if not t.dependencies]
        
        while pending:
            # 并行执行所有无依赖的任务
            with ThreadPoolExecutor() as executor:
                futures = {
                    executor.submit(self._execute_task, task, results): task
                    for task in pending
                }
                
                for future in as_completed(futures):
                    task = futures[future]
                    result = future.result()
                    results[task.id] = result
            
            # 更新pending列表
            pending = [
                t for t in dag
                if t.id not in results and all(d in results for d in t.dependencies)
            ]
        
        return results

2.3.2 Sub-Agent(专业执行者)

class SubAgent:
    """
    Sub-Agent专注于特定类型的任务
    """
    
    def __init__(self, agent_type: str, tools: list):
        self.agent_type = agent_type
        self.tools = tools
        self.sandbox = DockerSandbox()
    
    def execute(self, task: Task) -> dict:
        """
        执行任务,支持工具调用和代码执行
        """
        if task.type == "search":
            return self._search(task.params["query"])
        elif task.type == "code":
            return self._generate_and_run_code(task.params)
        # ...
    
    def _generate_and_run_code(self, params: dict) -> dict:
        """
        生成代码 → 在沙箱中执行 → 返回结果(自动处理错误)
        """
        code = self.model.generate_code(params)
        
        for attempt in range(self.max_retries):
            try:
                result = self.sandbox.run(code)
                return {"success": True, "output": result}
            except Exception as e:
                # 自动调试:让模型分析错误并重写代码
                code = self.model.debug(code, error=str(e))
        
        return {"success": False, "error": "Max retries exceeded"}

3. 核心技术栈详解

3.1 LangChain 0.3:工具调用的基石

DeerFlow 2.0深度依赖LangChain 0.3的**工具调用(Tool Calling)**能力:

from langchain.tools import tool
from langchain.agents import initialize_agent

@tool
def search_web(query: str) -> str:
    """
    搜索互联网
    
    Args:
        query: 搜索关键词
    
    Returns:
        搜索结果摘要
    """
    # 实现搜索逻辑
    results = google_search(query)
    return summarize(results)

@tool
def execute_python(code: str) -> str:
    """
    在沙箱中执行Python代码
    
    Args:
        code: Python代码字符串
    
    Returns:
        执行输出或错误信息
    """
    return sandbox.run(code)

@tool
def create_file(path: str, content: str) -> str:
    """
    在沙箱中创建文件
    
    Args:
        path: 文件路径
        content: 文件内容
    
    Returns:
        操作结果
    """
    return sandbox.write_file(path, content)

# 初始化Agent(绑定工具)
agent = initialize_agent(
    tools=[search_web, execute_python, create_file],
    llm=ChatDeepSeek(model="deepseek-v4"),
    agent=AgentType.OPENAI_FUNCTIONS,  # 使用Function Calling模式
    verbose=True
)

# 执行复杂任务
agent.run("搜索最新的Python异步编程最佳实践,生成一份Markdown报告,保存为report.md")

关键点:

  • @tool装饰器:将任意函数转换为Agent可调用的工具
  • 类型注解:工具参数的类型注解会自动生成JSON Schema,供模型理解工具接口
  • AgentType.OPENAI_FUNCTIONS:支持最新的Function Calling协议(DeepSeek V4也支持)

3.2 Docker沙箱:安全执行的保障

DeerFlow 2.0的Docker沙箱设计是其核心竞争力之一。

3.2.1 沙箱架构

┌──────────────────────────────────────┐
│         Host Machine                │
│  ┌────────────────────────────────┐  │
│  │    Docker Container            │  │
│  │  ┌──────────────────────────┐  │  │
│  │  │  Python/Node.js Runtime  │  │  │
│  │  │  - 用户代码              │  │  │
│  │  │  - 依赖包                │  │  │
│  │  └──────────────────────────┘  │  │
│  │                                │  │
│  │  ❌ 无法访问宿主机网络        │  │
│  │  ❌ 无法访问宿主机文件系统      │  │
│  │  ✅ 只能访问挂载的工作目录    │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘

3.2.2 沙箱实现代码

import docker
import tempfile
import tarfile
from pathlib import Path

class DockerSandbox:
    def __init__(self, image="python:3.12-slim", timeout=300):
        self.client = docker.from_env()
        self.image = image
        self.timeout = timeout
        self.container = None
        self.work_dir = Path("/workspace")
    
    def __enter__(self):
        """
        启动容器(上下文管理器)
        """
        self.container = self.client.containers.run(
            image=self.image,
            # 关键安全配置
            detach=True,
            network_disabled=True,        # 禁用网络(防止恶意请求)
            mem_limit="512m",            # 内存限制
            cpu_period=100000,
            cpu_quota=50000,             # CPU限制(50%单核)
            ulimit=[{"name": "nproc", "soft": 1024, "hard": 2048}],
            volumes={
                "/host/workspace": {
                    "bind": str(self.work_dir),
                    "mode": "rw"
                }
            },
            working_dir=str(self.work_dir),
            command="sleep infinity"      # 保持容器运行
        )
        return self
    
    def run(self, code: str, language="python") -> str:
        """
        执行代码并返回输出
        
        Args:
            code: 代码字符串
            language: python | node | bash
        
        Returns:
            执行输出
        """
        # 1. 将代码写入容器
        script_path = self._write_script(code, language)
        
        # 2. 执行代码
        if language == "python":
            cmd = ["python", str(script_path)]
        elif language == "node":
            cmd = ["node", str(script_path)]
        else:
            cmd = ["bash", str(script_path)]
        
        result = self.container.exec_run(
            cmd,
            workdir=str(self.work_dir),
            demux=True,                     # 分离stdout/stderr
            timeout=self.timeout
        )
        
        stdout, stderr = result.output
        
        if result.exit_code != 0:
            raise RuntimeError(f"Code execution failed:\n{stderr.decode()}")
        
        return stdout.decode()
    
    def _write_script(self, code: str, language: str) -> Path:
        """
        将代码写入容器内的临时文件
        """
        ext = {"python": ".py", "node": ".js", "bash": ".sh"}[language]
        script_name = f"script_{hash(code)}{ext}"
        script_path = self.work_dir / script_name
        
        # 使用tar包上传文件到容器
        tar_data = self._create_tar(str(script_path), code)
        self.container.put_archive(str(self.work_dir), tar_data)
        
        return script_path
    
    def _create_tar(self, name: str, content: str) -> bytes:
        """
        创建tar包(用于上传文件到容器)
        """
        import io
        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
            data = content.encode()
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
        return tar_stream.getvalue()
    
    def snapshot(self) -> str:
        """
        创建容器快照(用于时间旅行调试)
        
        Returns:
            快照ID
        """
        # 提交容器当前状态为新镜像
        snapshot_image = self.container.commit(
            repository="deerflow-snapshots",
            tag=f"snapshot-{int(time.time())}"
        )
        return snapshot_image.id
    
    def restore(self, snapshot_id: str):
        """
        从快照恢复(时间旅行调试)
        """
        # 停止当前容器
        self.container.stop()
        
        # 从快照启动新容器
        self.container = self.client.containers.run(
            snapshot_id,
            detach=True,
            # ... 其他配置同 __enter__
        )
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        清理容器
        """
        if self.container:
            self.container.stop()
            self.container.remove()

# 使用示例
with DockerSandbox() as sandbox:
    result = sandbox.run("""
import pandas as pd
import matplotlib.pyplot as plt

# 生成示例数据
data = pd.DataFrame({
    "x": range(10),
    "y": [i**2 for i in range(10)]
})

# 绘图
plt.plot(data["x"], data["y"])
plt.savefig("/workspace/output.png")
print("Plot saved to /workspace/output.png")
""")
    print(result)  # 输出: Plot saved to /workspace/output.png

安全特性总结:

  1. 网络隔离:容器无法访问外部网络(防止恶意请求)
  2. 资源限制:内存512MB,CPU 50%(防止资源耗尽攻击)
  3. 进程限制:最多2048个进程(防止fork炸弹)
  4. 文件系统隔离:只能访问挂载的工作目录
  5. 快照机制:支持时间旅行调试(复现问题)

3.3 上下文压缩:处理长任务的秘籍

当Agent执行长流程任务时(如"调研竞品并生成50页报告"),上下文会迅速膨胀,导致:

  • Token成本飙升
  • 模型性能下降(注意力机制过载)
  • 响应延迟增加

DeerFlow 2.0采用分层上下文压缩策略

class ContextManager:
    """
    上下文管理器:自动压缩历史消息
    """
    
    def __init__(self, model, max_tokens=128000):
        self.model = model
        self.max_tokens = max_tokens
        self.compression_threshold = 100000  # 超过10万token触发压缩
    
    def manage(self, messages: list) -> list:
        """
        管理上下文:必要时触发压缩
        
        Args:
            messages: 完整消息历史
        
        Returns:
            压缩后的消息列表
        """
        current_tokens = self._count_tokens(messages)
        
        if current_tokens > self.compression_threshold:
            # 触发压缩
            return self._compress(messages)
        else:
            return messages
    
    def _compress(self, messages: list) -> list:
        """
        压缩策略:
        1. 保留最近10条消息(完整)
        2. 保留所有工具调用结果(完整)
        3. 压缩早期对话(生成摘要)
        """
        # 分离消息类型
        recent_messages = messages[-10:]  # 最近10条
        tool_results = [m for m in messages if m.get("role") == "tool"]
        early_messages = [
            m for m in messages[:-10]
            if m.get("role") != "tool"
        ]
        
        # 生成早期消息的摘要
        early_summary = self.model.generate(f"""
        Summarize the following conversation history in 500 words or less:
        
        {early_messages}
        """)
        
        # 重构消息列表
        compressed = [
            {
                "role": "system",
                "content": f"## Conversation History Summary\n{early_summary}"
            },
            *tool_results,
            *recent_messages
        ]
        
        return compressed
    
    def _count_tokens(self, messages: list) -> int:
        """
        计算消息列表的token数(使用tiktoken)
        """
        import tiktoken
        enc = tiktoken.encoding_for_model("gpt-4")
        
        total = 0
        for msg in messages:
            # 简化计算:实际应考虑role和分隔符
            total += len(enc.encode(msg.get("content", "")))
        
        return total

压缩效果对比:

场景压缩前Token数压缩后Token数压缩比
1小时对话85,00012,0007:1
代码生成任务45,0008,0005.6:1
数据分析任务120,00015,0008:1

4. Multi-Agent协作机制

4.1 为什么需要Multi-Agent?

单一Agent处理所有任务的局限性:

  1. 上下文污染:不同任务的上下文混在一起,模型容易"迷失"
  2. 工具冲突:搜索工具和代码执行工具可能需要不同的权限配置
  3. 无法并行:串行执行导致效率低下

Multi-Agent的核心思想分而治之,每个Agent专注于一类任务。

4.2 DeerFlow的Sub-Agent类型

DeerFlow 2.0内置了5种专业Sub-Agent:

class AgentFactory:
    """
    Agent工厂:根据任务类型创建专业Agent
    """
    
    @staticmethod
    def create_agent(agent_type: str) -> SubAgent:
        if agent_type == "search":
            return SearchAgent(
                tools=[search_web, browse_page],
                model="deepseek-v4",
                system_prompt="你是一个专业的搜索助手,擅长从互联网获取信息。"
            )
        elif agent_type == "code":
            return CodeAgent(
                tools=[execute_python, execute_javascript, create_file],
                model="deepseek-coder-v4",  # 使用代码专用模型
                system_prompt="你是一个Python/JS开发专家,擅长编写和调试代码。"
            )
        elif agent_type == "analysis":
            return AnalysisAgent(
                tools=[pandas_analyze, visualize_data],
                model="deepseek-v4",
                system_prompt="你是一个数据分析师,擅长统计分析和可视化。"
            )
        elif agent_type == "report":
            return ReportAgent(
                tools=[create_file, generate_ppt, generate_pdf],
                model="deepseek-v4",
                system_prompt="你是一个技术写作专家,擅长生成结构化文档。"
            )
        elif agent_type == "review":
            return ReviewAgent(
                tools=[static_analysis, security_scan],
                model="deepseek-coder-v4",
                system_prompt="你是一个代码审查专家,擅长发现潜在问题。"
            )
        else:
            raise ValueError(f"Unknown agent type: {agent_type}")

4.3 并行执行与结果聚合

import asyncio
from typing import List, Dict

class MultiAgentOrchestrator:
    """
    Multi-Agent编排器:协调多个Sub-Agent并行工作
    """
    
    def __init__(self):
        self.agents = {}
    
    async def execute_parallel(self, tasks: List[Task]) -> Dict[str, dict]:
        """
        并行执行多个任务
        
        示例:
        任务:调研竞品A和B(两个独立任务,可并行)
        """
        # 创建Agent实例
        agents = [
            AgentFactory.create_agent(task.type)
            for task in tasks
        ]
        
        # 并行执行
        results = await asyncio.gather(*[
            agent.execute(task)
            for agent, task in zip(agents, tasks)
        ])
        
        return dict(zip([t.id for t in tasks], results))
    
    def aggregate_results(self, results: Dict[str, dict]) -> dict:
        """
        聚合多个Agent的结果
        
        示例:
        - SearchAgent返回:{"competitor_a": "..."}
        - AnalysisAgent返回:{"comparison": "..."}
        - ReportAgent返回:{"report_path": "..."}
        
        聚合后:{"competitor_a": "...", "comparison": "...", "report_path": "..."}
        """
        aggregated = {}
        for task_id, result in results.items():
            aggregated.update(result)
        return aggregated

# 使用示例
orchestrator = MultiAgentOrchestrator()

# 定义并行任务
tasks = [
    Task(id="search_a", type="search", params={"query": "竞品A"}),
    Task(id="search_b", type="search", params={"query": "竞品B"}),
]

# 并行执行
results = asyncio.run(orchestrator.execute_parallel(tasks))

# 聚合结果
aggregated = orchestrator.aggregate_results(results)

# 传递给下一个Agent(ReportAgent)
report_task = Task(
    id="generate_report",
    type="report",
    params={"data": aggregated}
)
report_agent = AgentFactory.create_agent("report")
report = report_agent.execute(report_task)

4.4 冲突解决与协商机制

当多个Sub-Agent产生冲突时(如CodeAgent和ReviewAgent对同段代码有不同意见),DeerFlow采用基于投票的协商机制

class AgentNegotiation:
    """
    Agent协商:解决冲突
    """
    
    def __init__(self, max_rounds=3):
        self.max_rounds = max_rounds
    
    def resolve(self, conflict: Conflict) -> dict:
        """
        解决冲突(多轮协商)
        
        Args:
            conflict: 冲突对象(包含两个Agent的不同意见)
        
        Returns:
            协商后的统一结果
        """
        for round in range(self.max_rounds):
            # 让双方Agent阐述理由
            reason_a = conflict.agent_a.explain(conflict.proposal_a)
            reason_b = conflict.agent_b.explain(conflict.proposal_b)
            
            # 引入第三方Agent(裁判)
            judge = AgentFactory.create_agent("review")
            decision = judge.judge(reason_a, reason_b)
            
            if decision.confidence > 0.9:
                return decision.winner
            else:
                # 置信度不足,进入下一轮协商
                conflict = self._refine_conflict(conflict, decision)
        
        # 达到最大轮次,使用默认策略(如"保守优先")
        return self._default_strategy(conflict)

5. Docker沙箱与安全执行环境

(本节在3.2节已详细讲解,此处补充高级特性)

5.1 多语言运行时支持

DeerFlow的Docker沙箱支持多种编程语言

class MultiLanguageSandbox:
    """
    多语言沙箱:根据任务自动选择运行时
    """
    
    IMAGE_MAP = {
        "python": "python:3.12-slim",
        "javascript": "node:22-slim",
        "go": "golang:1.23-slim",
        "rust": "rust:1.78-slim",
        "cpp": "gcc:13-slim"
    }
    
    def run_multi_language(self, code: str, language: str) -> str:
        """
        执行多语言代码
        
        Example:
            code = '''
            // JavaScript代码
            const fs = require('fs');
            fs.writeFileSync('output.txt', 'Hello from Node.js!');
            '''
            language = 'javascript'
        """
        image = self.IMAGE_MAP.get(language)
        if not image:
            raise ValueError(f"Unsupported language: {language}")
        
        with DockerSandbox(image=image) as sandbox:
            return sandbox.run(code, language=language)

5.2 文件系统隔离与持久化

class PersistentSandbox(DockerSandbox):
    """
    持久化沙箱:支持跨任务文件共享
    """
    
    def __init__(self, workspace_id: str):
        super().__init__()
        self.workspace_id = workspace_id
        self.persistent_volume = f"deerflow-workspace-{workspace_id}"
        
        # 创建持久化卷(如果不存在)
        try:
            self.client.volumes.create(name=self.persistent_volume)
        except:
            pass  # 卷已存在
    
    def __enter__(self):
        self.container = self.client.containers.run(
            self.image,
            detach=True,
            volumes={
                self.persistent_volume: {
                    "bind": "/workspace",
                    "mode": "rw"
                }
            },
            # ... 其他配置
        )
        return self
    
    def save_checkpoint(self, name: str):
        """
        保存检查点(用于长任务的分段执行)
        """
        checkpoint_path = f"/workspace/.checkpoints/{name}.json"
        checkpoint_data = {
            "timestamp": time.time(),
            "files": self._list_files("/workspace"),
            "metadata": self._get_container_metadata()
        }
        self._write_file(checkpoint_path, json.dumps(checkpoint_data))
    
    def restore_checkpoint(self, name: str):
        """
        恢复检查点
        """
        checkpoint_path = f"/workspace/.checkpoints/{name}.json"
        checkpoint_data = json.loads(self._read_file(checkpoint_path))
        
        # 恢复文件
        for file_info in checkpoint_data["files"]:
            # 从持久化卷恢复文件
            pass

6. 上下文工程与记忆系统

6.1 长上下文管理的挑战

DeerFlow 2.0需要处理超长任务(如"分析100个竞品并生成报告"),这对上下文管理提出了极高要求:

挑战传统方案DeerFlow 2.0方案
上下文溢出截断早期消息分层压缩 + 外部存储
检索效率全量扫描向量索引 + 时间衰减
多任务干扰单一上下文Sub-Agent独立上下文

6.2 外部记忆存储(Vector Database)

DeerFlow 2.0集成向量数据库(如Chroma、Qdrant),将早期对话存储到外部:

from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

class ExternalMemory:
    """
    外部记忆:使用向量数据库存储历史对话
    """
    
    def __init__(self, persist_directory="./memory"):
        self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        self.vectorstore = Chroma(
            collection_name="deerflow_memory",
            embedding_function=self.embeddings,
            persist_directory=persist_directory
        )
    
    def store(self, messages: list):
        """
        存储消息到向量数据库
        """
        for msg in messages:
            self.vectorstore.add_texts(
                texts=[msg["content"]],
                metadatas=[{
                    "role": msg["role"],
                    "timestamp": time.time()
                }]
            )
    
    def retrieve(self, query: str, k=5) -> list:
        """
        检索相关历史消息
        
        Args:
            query: 当前任务的查询
            k: 返回最相关的k条消息
        
        Returns:
            相关消息列表
        """
        docs = self.vectorstore.similarity_search(query, k=k)
        return [{"content": doc.page_content, **doc.metadata} for doc in docs]

# 集成到MainAgent
class MainAgentWithMemory(MainAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory = ExternalMemory()
    
    def plan(self, user_request: str) -> list[Task]:
        # 检索相关历史
        relevant_history = self.memory.retrieve(user_request)
        
        # 将历史上下文加入prompt
        prompt = f"""
        Relevant History:
        {relevant_history}
        
        User Request: {user_request}
        
        Plan the tasks:
        """
        
        tasks = super().plan(prompt)
        
        # 存储新消息
        self.memory.store([{"role": "user", "content": user_request}])
        
        return tasks

6.3 时间衰减策略

def time_decayed_retrieval(query: str, vectorstore, decay_factor=0.95):
    """
    时间衰减检索:最近的消息权重更高
    
    Args:
        query: 查询字符串
        vectorstore: 向量数据库
        decay_factor: 衰减因子(0-1之间,越小衰减越快)
    """
    # 获取候选文档
    candidates = vectorstore.similarity_search_with_score(query, k=20)
    
    # 应用时间衰减
    current_time = time.time()
    weighted_results = []
    
    for doc, score in candidates:
        timestamp = doc.metadata.get("timestamp", current_time)
        age = current_time - timestamp
        decay = decay_factor ** (age / 86400)  # 按天衰减
        
        weighted_score = score * decay
        weighted_results.append((doc, weighted_score))
    
    # 重新排序
    weighted_results.sort(key=lambda x: x[1], reverse=True)
    
    return [doc for doc, _ in weighted_results[:5]]

7. 代码实战:构建第一个DeerFlow应用

7.1 环境准备

# 1. 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow

# 2. 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 3. 安装依赖
pip install -r requirements.txt

# 4. 配置环境变量
cp config.example.yaml config.yaml

# 编辑config.yaml,填入你的API Key
# api_key: "sk-your-deepseek-api-key"

7.2 示例1:自动化市场调研

任务:调研"2026年AI Agent框架竞争格局",生成Markdown报告。

from deerflow import MainAgent, Task
from deerflow.tools import search_web, create_file

# 1. 初始化Main Agent
agent = MainAgent(
    model="deepseek-v4",
    tools=[search_web, create_file],
    max_retries=3
)

# 2. 定义任务
user_request = """
调研2026年AI Agent框架竞争格局,重点分析:
1. LangChain vs LangGraph vs AutoGen vs CrewAI
2. 各自的核心优势和适用场景
3. GitHub Star数和社区活跃度
4. 企业采用案例

生成一份Markdown报告,保存为report.md
"""

# 3. 执行任务
result = agent.run(user_request)

print(result)
# 输出:
# ✅ 任务完成
# - 搜索到42篇相关文章
# - 分析了5个主流框架
# - 报告已保存:/workspace/report.md

执行流程解析:

用户请求
  ↓
Main Agent规划
  ↓
┌─────────────────┐
│ Sub-Agent 1     │ 搜索"LangChain 2026"(并行)
│ Sub-Agent 2     │ 搜索"LangGraph 2026"(并行)
│ Sub-Agent 3     │ 搜索"AutoGen vs CrewAI"(并行)
└─────────────────┘
  ↓
Main Agent聚合结果
  ↓
Sub-Agent 4(Report Agent)生成报告
  ↓
输出:report.md

7.3 示例2:代码生成与自动化测试

任务:生成一个FastAPI应用,并自动编写测试用例。

from deerflow import MainAgent, Task
from deerflow.tools import execute_python, create_file, static_analysis

# 1. 初始化Agent(启用代码专用模型)
agent = MainAgent(
    model="deepseek-coder-v4",
    tools=[execute_python, create_file, static_analysis],
    enable_sandbox=True  # 启用Docker沙箱
)

# 2. 定义任务
user_request = """
创建一个FastAPI应用:
- GET /users: 返回用户列表(从SQLite数据库查询)
- POST /users: 创建新用户

要求:
1. 使用SQLAlchemy ORM
2. 包含数据验证(Pydantic)
3. 自动生成单元测试(pytest)
4. 代码通过static analysis(flake8)
"""

# 3. 执行任务
result = agent.run(user_request)

# 4. 查看生成的文件
print(result["generated_files"])
# 输出:
# - main.py (FastAPI应用)
# - models.py (SQLAlchemy模型)
# - schemas.py (Pydantic schemas)
# - test_main.py (pytest测试用例)
# - test_coverage_report.html (覆盖率报告)

生成的代码示例(main.py):

from fastapi import FastAPI, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pydantic import BaseModel

# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 数据库模型
class UserDB(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)

Base.metadata.create_all(bind=engine)

# Pydantic schemas
class UserBase(BaseModel):
    name: str
    email: str

class UserCreate(UserBase):
    pass

class User(UserBase):
    id: int
    
    class Config:
        orm_mode = True

# FastAPI应用
app = FastAPI()

@app.get("/users", response_model=list[User])
def get_users():
    db = SessionLocal()
    users = db.query(UserDB).all()
    db.close()
    return users

@app.post("/users", response_model=User)
def create_user(user: UserCreate):
    db = SessionLocal()
    
    # 检查邮箱是否已存在
    db_user = db.query(UserDB).filter(UserDB.email == user.email).first()
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    
    # 创建新用户
    db_user = UserDB(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    db.close()
    
    return db_user

自动生成的测试代码(test_main.py):

import pytest
from fastapi.testclient import TestClient
from main import app
from main import Base, engine

client = TestClient(app)

@pytest.fixture(autouse=True)
def setup_database():
    # 每个测试前重建数据库
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

def test_get_users_empty():
    response = client.get("/users")
    assert response.status_code == 200
    assert response.json() == []

def test_create_user():
    response = client.post("/users", json={
        "name": "Alice",
        "email": "alice@example.com"
    })
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Alice"
    assert data["email"] == "alice@example.com"
    assert "id" in data

def test_create_user_duplicate_email():
    # 创建第一个用户
    client.post("/users", json={
        "name": "Alice",
        "email": "alice@example.com"
    })
    
    # 尝试创建相同邮箱的用户
    response = client.post("/users", json={
        "name": "Bob",
        "email": "alice@example.com"
    })
    assert response.status_code == 400
    assert response.json()["detail"] == "Email already registered"

def test_get_users_after_creation():
    # 创建两个用户
    client.post("/users", json={"name": "Alice", "email": "alice@example.com"})
    client.post("/users", json={"name": "Bob", "email": "bob@example.com"})
    
    response = client.get("/users")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2

7.4 示例3:数据分析与可视化

任务:分析某电商平台的销售数据,生成可视化报告。

from deerflow import MainAgent
from deerflow.tools import execute_python, create_file, visualize_data

# 1. 初始化Agent
agent = MainAgent(
    model="deepseek-v4",
    tools=[execute_python, create_file, visualize_data]
)

# 2. 上传数据文件(通过沙箱)
# 假设用户已上传 sales_data.csv 到 /workspace

# 3. 定义任务
user_request = """
分析/workspace/sales_data.csv:
1. 每月销售额趋势(折线图)
2. 各产品类别的销售额占比(饼图)
3. Top 10客户(条形图)
4. 生成HTML报告(使用Plotly)
"""

# 4. 执行任务
result = agent.run(user_request)

# 5. 查看生成的报告
print(result["generated_files"])
# 输出:
# - sales_analysis.html (交互式Plotly报告)
# - sales_data_cleaned.csv (清洗后的数据)
# - summary_statistics.json (统计摘要)

Agent生成的代码(片段):

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# 1. 加载数据
df = pd.read_csv("/workspace/sales_data.csv")
df["date"] = pd.to_datetime(df["date"])
df["month"] = df["date"].dt.to_period("M")

# 2. 每月销售额趋势
monthly_sales = df.groupby("month")["amount"].sum().reset_index()
fig1 = px.line(
    monthly_sales,
    x="month",
    y="amount",
    title="Monthly Sales Trend",
    labels={"amount": "Sales Amount", "month": "Month"}
)

# 3. 各产品类别的销售额占比
category_sales = df.groupby("category")["amount"].sum()
fig2 = px.pie(
    values=category_sales.values,
    names=category_sales.index,
    title="Sales by Category"
)

# 4. Top 10客户
top_customers = df.groupby("customer_name")["amount"].sum().nlargest(10)
fig3 = px.bar(
    x=top_customers.values,
    y=top_customers.index,
    orientation="h",
    title="Top 10 Customers by Sales",
    labels={"x": "Total Amount", "y": "Customer"}
)

# 5. 生成HTML报告
fig1.write_html("/workspace/fig1.html")
fig2.write_html("/workspace/fig2.html")
fig3.write_html("/workspace/fig3.html")

# 合并到一个报告
with open("/workspace/sales_analysis.html", "w") as f:
    f.write("""
    <html>
    <head><title>Sales Analysis Report</title></head>
    <body>
        <h1>Sales Analysis Report</h1>
        <iframe src="fig1.html" width="100%" height="500px"></iframe>
        <iframe src="fig2.html" width="100%" height="500px"></iframe>
        <iframe src="fig3.html" width="100%" height="500px"></iframe>
    </body>
    </html>
    """)

print("Report generated: /workspace/sales_analysis.html")

8. 高级特性与性能优化

8.1 流式输出(Streaming)

DeerFlow 2.0支持流式输出,让用户实时看到Agent的"思考过程":

from deerflow import MainAgent
import json

# 1. 启用流式输出
agent = MainAgent(
    model="deepseek-v4",
    streaming=True  # 启用流式
)

# 2. 定义流式回调函数
def on_token(token: str):
    """
    每个token生成时调用
    """
    print(token, end="", flush=True)

def on_tool_call(tool_name: str, args: dict):
    """
    工具调用时调用
    """
    print(f"\n[Tool Call] {tool_name}({json.dumps(args)})")

def on_error(error: Exception):
    """
    错误发生时调用
    """
    print(f"\n[Error] {error}")

# 3. 执行任务(流式)
user_request = "搜索最新的Python异步编程最佳实践,生成一份报告"

agent.run(
    user_request,
    callbacks={
        "on_token": on_token,
        "on_tool_call": on_tool_call,
        "on_error": on_error
    }
)

# 输出示例:
# [Tool Call] search_web({"query": "Python async best practices 2026"})
# 正在搜索...
# [Tool Call] create_file({"path": "report.md", "content": "..."})
# 报告已生成:report.md

8.2 成本优化策略

DeerFlow 2.0内置了多项成本优化策略:

class CostOptimizer:
    """
    成本优化器:降低Token消耗
    """
    
    def __init__(self, max_cost_per_task=0.5):
        self.max_cost = max_cost_per_task
        self.token_counter = 0
        self.cost_counter = 0.0
    
    def optimize_prompt(self, prompt: str) -> str:
        """
        优化Prompt:移除冗余内容
        """
        # 策略1:压缩示例代码(移除注释)
        optimized = self._strip_comments(prompt)
        
        # 策略2:使用更短的system prompt
        optimized = self._shorten_system_prompt(optimized)
        
        # 策略3:移除不必要的示例
        optimized = self._remove_redundant_examples(optimized)
        
        return optimized
    
    def select_model(self, task_complexity: int) -> str:
        """
        根据任务复杂度选择模型(成本优化)
        
        Args:
            task_complexity: 1-10(10表示最复杂)
        """
        if task_complexity <= 3:
            return "deepseek-v3"  # 便宜,适合简单任务
        elif task_complexity <= 7:
            return "deepseek-v4"  # 平衡
        else:
            return "deepseek-coder-v4"  # 最贵,但代码能力强
    
    def _strip_comments(self, code: str) -> str:
        """
        移除代码注释(减少Token消耗)
        """
        import re
        # 移除单行注释
        code = re.sub(r'#.*$', '', code, flags=re.MULTILINE)
        # 移除多行注释
        code = re.sub(r'""".*?"""', '', code, flags=re.DOTALL)
        return code

成本对比:

策略优化前成本(每任务)优化后成本节省
Prompt压缩$0.45$0.2838%
动态模型选择$0.52$0.3140%
上下文压缩$0.68$0.2563%

8.3 缓存策略

from functools import lru_cache
import hashlib

class ResultCache:
    """
    结果缓存:避免重复计算
    """
    
    def __init__(self, cache_dir="./cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    
    def get(self, key: str) -> Optional[str]:
        """
        获取缓存结果
        """
        cache_key = self._hash(key)
        cache_path = os.path.join(self.cache_dir, cache_key)
        
        if os.path.exists(cache_path):
            with open(cache_path, "r") as f:
                return f.read()
        return None
    
    def set(self, key: str, value: str):
        """
        设置缓存
        """
        cache_key = self._hash(key)
        cache_path = os.path.join(self.cache_dir, cache_key)
        
        with open(cache_path, "w") as f:
            f.write(value)
    
    def _hash(self, key: str) -> str:
        """
        计算缓存key的哈希值
        """
        return hashlib.md5(key.encode()).hexdigest()

# 集成到工具调用
class CachedToolCaller:
    def __init__(self):
        self.cache = ResultCache()
    
    def call_tool(self, tool_name: str, args: dict) -> str:
        """
        调用工具(带缓存)
        """
        cache_key = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
        
        # 先查缓存
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        
        # 缓存未命中,调用工具
        result = self._execute_tool(tool_name, args)
        
        # 写入缓存
        self.cache.set(cache_key, result)
        
        return result

9. 生产环境部署指南

9.1 Docker Compose部署

# docker-compose.yml
version: '3.8'

services:
  deerflow-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://deerflow:password@postgres:5432/deerflow
    depends_on:
      - redis
      - postgres
    volumes:
      - ./workspace:/workspace
      - ./logs:/logs
  
  deerflow-worker:
    build: .
    command: celery -A deerflow.worker worker --loglevel=info
    depends_on:
      - redis
      - postgres
    volumes:
      - ./workspace:/workspace
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  
  postgres:
    image: postgres:16-alpine
    environment:
      - POSTGRES_USER=deerflow
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=deerflow
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:

9.2 Kubernetes部署

# k8s/deerflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deerflow-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: deerflow-api
  template:
    metadata:
      labels:
        app: deerflow-api
    spec:
      containers:
      - name: deerflow-api
        image: deerflow:latest
        ports:
        - containerPort: 8000
        env:
        - name: DEEPSEEK_API_KEY
          valueFrom:
            secretKeyRef:
              name: deerflow-secrets
              key: deepseek-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: deerflow-api-service
spec:
  selector:
    app: deerflow-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

9.3 监控与可观测性

from prometheus_client import Counter, Histogram, start_http_server
import time

# 定义Prometheus指标
TASK_COUNTER = Counter(
    "deerflow_tasks_total",
    "Total number of tasks executed",
    ["task_type", "status"]
)

TASK_DURATION = Histogram(
    "deerflow_task_duration_seconds",
    "Task execution duration",
    ["task_type"]
)

class MonitoredAgent(MainAgent):
    """
    带监控的Agent
    """
    
    def run(self, user_request: str) -> dict:
        task_type = self._classify_task(user_request)
        
        start_time = time.time()
        try:
            result = super().run(user_request)
            TASK_COUNTER.labels(task_type=task_type, status="success").inc()
            return result
        except Exception as e:
            TASK_COUNTER.labels(task_type=task_type, status="failure").inc()
            raise
        finally:
            duration = time.time() - start_time
            TASK_DURATION.labels(task_type=task_type).observe(duration)

# 启动Prometheus metrics端点
start_http_server(9090)  # 访问 http://localhost:9090/metrics 查看指标

10. 竞品对比与技术选型

10.1 DeerFlow vs 其他Agent框架

维度DeerFlow 2.0LangGraphAutoGenCrewAI
架构模式Main+Sub AgentState GraphMulti-Agent ConversationRole-based
沙箱执行✅ Docker原生
上下文压缩✅ 自动手动手动手动
流式输出
企业级支持✅ (字节跳动)
学习曲线中等中等
适合场景复杂长任务研究/原型多Agent对话简单协作

10.2 技术选型建议

选择DeerFlow 2.0的理由:

  1. 需要安全执行代码(Docker沙箱是独一档的存在)
  2. 任务执行时间超过10分钟(上下文压缩必不可少)
  3. 需要生成成品(PPT/报告/视频,其他框架做不到)
  4. 企业级稳定性要求(字节跳动生产环境验证)

选择LangGraph的理由:

  1. 需要高度定制化的Agent工作流
  2. 研究团队,需要快速验证新算法

选择AutoGen的理由:

  1. 多Agent对话场景(如模拟面试、辩论)
  2. 不需要代码执行

11. 未来展望与社区生态

11.1 Roadmap(2026 H2)

根据DeerFlow官方Roadmap,2026年下半年将推出:

  1. DeerFlow 3.0

    • 支持多模态输入输出(图像/视频理解)
    • 集成Stable DiffusionSora,支持AI生成视频
    • Distributed Mode:多个DeerFlow实例协同工作
  2. DeerFlow Cloud

    • 托管的DeerFlow服务(类似OpenAI API)
    • 按需付费,无需自己部署
  3. DeerFlow Hub

    • 类似GitHub的Agent市场
    • 分享和下载Agent配置

11.2 社区生态

  • GitHub Stars: 46,000+ (截至2026年5月)
  • Discord社区: 12,000+ 成员
  • 官方插件: 30+ (包括Slack、Notion、GitHub、Jira等)
  • 第三方插件: 100+ (社区贡献)

12. 总结

DeerFlow 2.0的发布标志着AI Agent从"玩具"到"生产力工具"的转折点。其核心价值在于:

  1. 真正能"干活":不仅生成代码,还能执行、调试、生成成品
  2. 生产级稳定性:字节跳动内部验证,支撑日均10万+任务
  3. 开放生态:基于LangGraph/LangChain,兼容所有LangChain工具

适用人群:

  • 开发者:自动化代码生成、测试、部署
  • 数据分析师:自动化数据处理、报告生成
  • 产品经理:自动化市场调研、竞品分析
  • 企业:构建内部AI自动化平台

快速开始:

git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
pip install -r requirements.txt
python examples/market_research.py

相关资源:


本文作者:程序员茄子
发布时间:2026年5月18日
字数:约12,000字


附录:完整代码示例

A. 完整的Market Research示例

"""
完整的市场调研示例:调研AI Agent框架竞争格局
"""

from deerflow import MainAgent, Task
from deerflow.tools import search_web, create_file, visualize_data
import json

def main():
    # 1. 初始化Agent
    agent = MainAgent(
        model="deepseek-v4",
        tools=[search_web, create_file, visualize_data],
        max_retries=3,
        enable_streaming=True
    )
    
    # 2. 定义任务
    user_request = """
        调研2026年AI Agent框架竞争格局,生成一份完整的Markdown报告。
        
        要求:
        1. 对比LangChain、LangGraph、AutoGen、CrewAI、DeerFlow
        2. 从以下维度对比:
           - GitHub Star数(2026年5月数据)
           - 社区活跃度(Issues/PR频率)
           - 企业采用案例
           - 核心技术优势
           - 适用场景
        3. 生成对比表格
        4. 给出技术选型建议
        5. 保存为report.md
        """
    
    # 3. 定义回调函数
    def on_token(token: str):
        print(token, end="", flush=True)
    
    # 4. 执行任务
    result = agent.run(
        user_request,
        callbacks={"on_token": on_token}
    )
    
    # 5. 输出结果
    print("\n\n=== 任务完成 ===")
    print(f"生成的文件: {result['generated_files']}")
    print(f"Token消耗: {result['token_usage']}")
    print(f"执行时间: {result['execution_time']}秒")

if __name__ == "__main__":
    main()

B. 配置文件示例(config.yaml)

# DeerFlow 2.0 配置文件

# 模型配置
model:
  provider: "deepseek"
  model_name: "deepseek-v4"
  api_key: "${DEEPSEEK_API_KEY}"
  temperature: 0.7
  max_tokens: 4096

# 沙箱配置
sandbox:
  enabled: true
  image: "python:3.12-slim"
  network_disabled: true
  mem_limit: "512m"
  cpu_quota: 50000
  timeout: 300

# 上下文管理
context:
  max_tokens: 128000
  compression_threshold: 100000
  retention_policy: "sliding_window"

# 日志配置
logging:
  level: "INFO"
  file: "/logs/deerflow.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# 可观测性
observability:
  enable_prometheus: true
  prometheus_port: 9090
  enable_jaeger: false

文章结束

如果你觉得这篇文章对你有帮助,欢迎在GitHub上给DeerFlow点个Star ⭐

推荐文章

Vue3 结合 Driver.js 实现新手指引
2024-11-18 19:30:14 +0800 CST
php使用文件锁解决少量并发问题
2024-11-17 05:07:57 +0800 CST
Vue3中如何进行性能优化?
2024-11-17 22:52:59 +0800 CST
Rust 中的所有权机制
2024-11-18 20:54:50 +0800 CST
JavaScript 上传文件的几种方式
2024-11-18 21:11:59 +0800 CST
程序员出海搞钱工具库
2024-11-18 22:16:19 +0800 CST
JavaScript设计模式:单例模式
2024-11-18 10:57:41 +0800 CST
Vue3中的自定义指令有哪些变化?
2024-11-18 07:48:06 +0800 CST
Linux查看系统配置常用命令
2024-11-17 18:20:42 +0800 CST
程序员茄子在线接单