编程 DeerFlow 2.0 深度解析:63K Star 的超级智能体执行底座,从架构哲学到生产实战

2026-04-26 13:13:17 +0800 CST views 4

DeerFlow 2.0 深度解析:63K Star 的超级智能体执行底座,从架构哲学到生产实战

写在前面

2026 年 2 月 28 日,字节跳动把 DeerFlow 2.0 推向开源社区,24 小时内冲上 GitHub Trending 第一名,三个月后 Star 数突破 6.3 万,Fork 超 8300。这不是又一个套壳聊天机器人——它是一个从底层重新定义"AI Agent 该怎么干活"的超级智能体执行底座(Super Agent Harness)。

作为一个从 1.0 时代就在关注 DeerFlow 的开发者,我想说的不是"又一个框架",而是一个更本质的问题:当 AI 从"对话工具"走向"执行系统",我们到底需要什么样的基础设施?

这篇文章不搬运 README,不罗列功能清单。我会从架构设计的底层逻辑出发,拆解 DeerFlow 2.0 的每一个关键决策,配合完整的代码实战,让你真正理解它为什么能成为 2026 年最火的 AI Agent 框架。


一、从 Deep Research 到 Super Agent Harness:一次范式跃迁

1.1 1.0 的局限:研究助手的天花板

DeerFlow 1.0 的定位很清晰——深度研究助手(Deep Exploration and Efficient Research Flow)。它能把一个研究问题拆成子任务,自动搜索、汇总、生成带引用的研究报告。在 2025 年 5 月开源后,社区跑出了很多意想不到的用法:数据管道、幻灯片生成、仪表盘搭建、内容自动化工作流……

这暴露了一个核心矛盾:DeerFlow 1.0 是一个"你把它拼起来"的框架,不是一个"开箱即用"的运行时。

开发者不得不自己处理:

  • 子任务之间的状态传递和错误恢复
  • 代码执行的安全隔离
  • 长时程任务的上下文管理
  • 跨会话的记忆持久化

这些问题不是 DeerFlow 独有的,而是整个 AI Agent 行业的通病。

1.2 2.0 的决断:零代码共享的全量重构

2025 年 9 月,字节团队启动 2.0 版本的全量重构。和 1.x 分支共享的代码为零,整个架构基于 LangGraph 1.0 + LangChain 技术栈完全重写。

这个决策背后的哲学转变才是关键:

维度DeerFlow 1.0DeerFlow 2.0
定位深度研究框架超级智能体执行底座
架构链式管道(Pipeline)编排式运行时(Harness)
能力扩展改代码加 Skill
执行环境宿主机直接运行Docker 沙箱隔离
上下文管理单会话全量分层压缩 + 文件系统卸载
记忆无持久化跨会话长期记忆
子任务串行执行并行子智能体

从 Pipeline 到 Harness,不只是换个词。Pipeline 是"按顺序执行步骤",Harness 是"给 Agent 一台真正的计算机,让它自己决定怎么干活"。

1.3 为什么是"执行优先"

传统 AI 对话框架的核心假设是:用户发一条消息,模型回一条消息。交互模型是请求-响应。

DeerFlow 2.0 的核心假设是:用户提一个目标,Agent 自主规划、拆解、执行、汇报,全程可能持续几分钟到几小时。 交互模型是目标-执行-结果。

这个转变要求底层架构从根本上支持:

  1. 自主闭环工作流——从任务拆解、信息收集、代码执行到报告生成,无需人工干预
  2. 渐进式技能加载——技能仅在任务需要时才被加载到上下文中,而非一次性全部载入
  3. 安全隔离执行——每个任务运行在独立的 Docker 容器中,Agent 拥有完整的文件系统和 Bash 执行能力

这三个能力构成了 DeerFlow 2.0 的架构三角:自主性、高效性、安全性


二、架构深度拆解:五层设计哲学

2.1 整体架构

DeerFlow 2.0 的架构可以分成五个清晰的层次:

┌──────────────────────────────────────────────────┐
│                   前端 UI 层                      │
│         Next.js + React (端口 3000)              │
├──────────────────────────────────────────────────┤
│                  API 网关层                       │
│         Gateway API (端口 8001)                   │
│    ┌──────────┐ ┌──────────┐ ┌──────────┐       │
│    │ IM 通道  │ │ REST API │ │ WebSocket│       │
│    └──────────┘ └──────────┘ └──────────┘       │
├──────────────────────────────────────────────────┤
│               智能体运行时层                      │
│         LangGraph Server (端口 2024)             │
│    ┌──────────────────────────────────┐          │
│    │        Lead Agent(主智能体)     │          │
│    │  ┌─────┐ ┌─────┐ ┌─────┐       │          │
│    │  │Sub- │ │Sub- │ │Sub- │ ...   │          │
│    │  │Agent│ │Agent│ │Agent│       │          │
│    │  └─────┘ └─────┘ └─────┘       │          │
│    └──────────────────────────────────┘          │
├──────────────────────────────────────────────────┤
│                基础设施层                         │
│    ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐         │
│    │沙箱  │ │记忆  │ │技能  │ │文件  │         │
│    │Sandbox│ │Memory│ │Skills│ │Files │         │
│    └──────┘ └──────┘ └──────┘ └──────┘         │
├──────────────────────────────────────────────────┤
│                 反向代理层                        │
│              Nginx (端口 2026)                   │
└──────────────────────────────────────────────────┘

这五层各司其职,但最值得关注的是中间三层:网关层、运行时层、基础设施层。它们之间的交互方式决定了 DeerFlow 的核心能力。

2.2 网关层:Agent 的 HTTP 入口

Gateway API 不仅仅是一个 REST 代理,它是整个系统的控制平面:

# Gateway API 的核心路由
# backend/packages/harness/deerflow/gateway/

# 线程管理
POST   /api/threads                    # 创建对话线程
GET    /api/threads/{thread_id}        # 获取线程详情
DELETE /api/threads/{thread_id}        # 删除线程

# 消息交互
POST   /api/threads/{thread_id}/chat   # 发送消息
POST   /api/threads/{thread_id}/stream # 流式响应

# 文件操作
POST   /api/threads/{thread_id}/files  # 上传文件
GET    /api/threads/{thread_id}/files  # 列出文件

# 配置管理
GET    /api/models                     # 列出可用模型
GET    /api/skills                     # 列出技能
PUT    /api/skills/{name}              # 更新技能配置

网关层的一个重要设计是 IM 通道集成。DeerFlow 原生支持 Telegram、Slack、飞书、企业微信、微信五个 IM 平台,而且配置极其简单——只需在 config.yaml 中启用对应通道,填入凭证即可,无需公网 IP:

# config.yaml - IM 通道配置
channels:
  langgraph_url: http://localhost:2024
  gateway_url: http://localhost:8001

  telegram:
    enabled: true
    bot_token: ${TELEGRAM_BOT_TOKEN}
    allowed_users: []    # 空 = 允许所有

  feishu:
    enabled: true
    app_id: ${FEISHU_APP_ID}
    app_secret: ${FEISHU_APP_SECRET}

  wecom:
    enabled: true
    bot_id: ${WECOM_BOT_ID}
    bot_secret: ${WECOM_BOT_SECRET}

这种设计意味着你可以用 Telegram 机器人的方式给 DeerFlow 下达任务,它会在沙箱里执行完再把结果推回来。一个"随叫随到的研究助手"就这样变成了现实。

2.3 运行时层:LangGraph 驱动的智能体编排

DeerFlow 2.0 选择 LangGraph 作为编排引擎,这不是偶然。LangGraph 的核心优势在于**状态图(StateGraph)**模型——把 Agent 的工作流定义为一个有向图,节点是处理步骤,边是条件转换。

Lead Agent:项目经理

Lead Agent 是整个系统的"项目经理"。它负责:

  1. 接收用户的目标
  2. 分析目标,决定是否需要拆分
  3. 如果需要,派生子智能体
  4. 收集子智能体的结果
  5. 综合输出最终结果
# Lead Agent 的核心逻辑(简化版)
# 实际代码在 backend/packages/harness/deerflow/agents/

from langgraph.graph import StateGraph, END

class LeadAgent:
    """主智能体:接收目标、规划任务、编排子智能体"""

    def __init__(self, config):
        self.config = config
        self.sub_agent_manager = SubAgentManager(config)
        self.memory_manager = MemoryManager(config)
        self.skill_loader = SkillLoader(config)

    def build_graph(self):
        """构建工作流状态图"""
        graph = StateGraph(AgentState)

        # 定义节点
        graph.add_node("analyze", self.analyze_task)
        graph.add_node("plan", self.create_plan)
        graph.add_node("execute", self.execute_plan)
        graph.add_node("synthesize", self.synthesize_results)
        graph.add_node("respond", self.generate_response)

        # 定义边
        graph.set_entry_point("analyze")
        graph.add_conditional_edges(
            "analyze",
            self.should_plan,
            {
                True: "plan",      # 复杂任务 → 规划
                False: "respond",  # 简单问题 → 直接回答
            }
        )
        graph.add_edge("plan", "execute")
        graph.add_edge("execute", "synthesize")
        graph.add_edge("synthesize", "respond")
        graph.add_edge("respond", END)

        return graph.compile()

    async def analyze_task(self, state: AgentState):
        """分析任务复杂度,决定执行策略"""
        task = state["messages"][-1]

        # 渐进式技能加载:只在需要时加载相关技能
        relevant_skills = self.skill_loader.find_relevant(task)
        if relevant_skills:
            state["loaded_skills"] = relevant_skills

        # 评估是否需要子智能体
        complexity = self.assess_complexity(task)
        state["complexity"] = complexity

        return state

    async def execute_plan(self, state: AgentState):
        """执行计划:并行派生子智能体"""
        plan = state["plan"]
        sub_tasks = plan["sub_tasks"]

        # 并行派生子智能体
        results = await self.sub_agent_manager.dispatch_parallel(
            sub_tasks,
            parent_context=state["context"]
        )

        state["sub_results"] = results
        return state

Sub-Agent:专业执行者

子智能体是 DeerFlow 2.0 最核心的创新之一。每个子智能体拥有:

  • 隔离的上下文——看不到主智能体和其他子智能体的上下文,避免信息干扰
  • 独立的工具集——只拥有完成任务所需的最小工具集
  • 独立的终止条件——完成后自动退出,释放资源
# 子智能体管理器
class SubAgentManager:
    """子智能体管理器:创建、调度、监控子智能体"""

    async def dispatch_parallel(self, sub_tasks, parent_context):
        """并行派生多个子智能体"""
        tasks = []
        for sub_task in sub_tasks:
            # 创建隔离上下文
            isolated_context = self.create_isolated_context(
                parent_context,
                sub_task
            )

            # 选择合适的工具集
            tools = self.select_tools(sub_task)

            # 创建子智能体
            agent = SubAgent(
                task=sub_task,
                context=isolated_context,
                tools=tools,
                termination=self.create_termination(sub_task)
            )

            tasks.append(agent.run())

        # 并行执行,收集结果
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理异常
        processed = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed.append({
                    "task": sub_tasks[i],
                    "status": "failed",
                    "error": str(result)
                })
            else:
                processed.append({
                    "task": sub_tasks[i],
                    "status": "completed",
                    "result": result
                })

        return processed

    def create_isolated_context(self, parent_context, sub_task):
        """创建隔离的子智能体上下文

        关键设计:子智能体只能看到自己的任务描述,
        不能看到主智能体的完整上下文或其他子智能体的信息。
        这样做有两个好处:
        1. 减少上下文窗口占用
        2. 避免信息干扰,提高执行质量
        """
        return {
            "task": sub_task["description"],
            "constraints": sub_task.get("constraints", []),
            "parent_thread_id": parent_context["thread_id"],
            "workspace": self.allocate_workspace(),
        }

这种"Lead Agent + Sub-Agent"的模式本质上是一种分布式计算思想在 AI Agent 上的应用:把大任务分解成小任务,分配给独立的执行单元,最后聚合结果。和 MapReduce 的哲学一脉相承。

2.4 基础设施层:沙箱、记忆、技能、文件

沙箱系统:给 Agent 一台真正的计算机

这是 DeerFlow 2.0 和普通聊天机器人最本质的区别。

DeerFlow 的沙箱系统提供三种执行模式:

  1. Local Execution——直接在宿主机运行(开发调试用)
  2. Docker Execution——在隔离的 Docker 容器中运行(推荐)
  3. Kubernetes Execution——在 K8s Pod 中运行(生产环境)
# 沙箱提供者接口
class SandboxProvider(ABC):
    """沙箱提供者基类"""

    @abstractmethod
    async def create_sandbox(self, thread_id: str) -> SandboxInstance:
        """创建沙箱实例"""
        pass

    @abstractmethod
    async def execute_code(self, sandbox_id: str, code: str) -> ExecutionResult:
        """在沙箱中执行代码"""
        pass

    @abstractmethod
    async def read_file(self, sandbox_id: str, path: str) -> str:
        """读取沙箱中的文件"""
        pass

    @abstractmethod
    async def write_file(self, sandbox_id: str, path: str, content: str):
        """写入文件到沙箱"""
        pass

    @abstractmethod
    async def destroy_sandbox(self, sandbox_id: str):
        """销毁沙箱"""
        pass


class AioSandboxProvider(SandboxProvider):
    """基于 Docker 的异步沙箱提供者(推荐)"""

    def __init__(self, config):
        self.docker_client = docker.DockerClient()
        self.sandbox_image = config.get("sandbox_image", "deerflow/sandbox:latest")
        self.provisioner_url = config.get("provisioner_url")

    async def create_sandbox(self, thread_id: str) -> SandboxInstance:
        """为每个线程创建独立的 Docker 容器

        文件系统结构:
        /mnt/user-data/
        ├── uploads/      ← 用户上传的文件
        ├── workspace/    ← Agent 的工作目录
        └── outputs/      ← 最终输出物

        /mnt/skills/
        ├── public/       ← 内置技能
        └── custom/       ← 自定义技能
        """
        container = await self.docker_client.containers.run(
            self.sandbox_image,
            detach=True,
            name=f"deerflow-sandbox-{thread_id}",
            # 资源限制
            mem_limit="2g",
            cpu_count=2,
            # 网络隔离
            network_mode="bridge",
            # 只读根文件系统 + 可写数据卷
            read_only=True,
            tmpfs={"/tmp": "size=500m"},
            volumes={
                f"deerflow-data-{thread_id}": {
                    "bind": "/mnt/user-data",
                    "mode": "rw"
                }
            },
            # 安全选项
            security_opt=["no-new-privileges"],
            cap_drop=["ALL"],
        )

        return SandboxInstance(
            id=container.id,
            thread_id=thread_id,
            container=container
        )

    async def execute_code(self, sandbox_id: str, code: str) -> ExecutionResult:
        """在容器中安全执行代码

        执行流程:
        1. 将代码写入容器内的临时文件
        2. 在容器内执行 python3 /tmp/task.py
        3. 收集 stdout、stderr、返回码
        4. 设置超时保护(默认 300s)
        """
        container = self.docker_client.containers.get(sandbox_id)

        # 写入代码文件
        exec_id = container.exec_run(
            f"bash -c 'cat > /tmp/task.py << \"EOF\"\n{code}\nEOF'",
            workdir="/mnt/user-data/workspace"
        )

        # 执行代码
        result = container.exec_run(
            "python3 /tmp/task.py",
            workdir="/mnt/user-data/workspace",
            demux=True,
            stdout=True,
            stderr=True
        )

        return ExecutionResult(
            exit_code=result.exit_code,
            stdout=result.output[0].decode() if result.output[0] else "",
            stderr=result.output[1].decode() if result.output[1] else "",
        )

沙箱内的文件系统是三层结构:

/mnt/user-data/
├── uploads/          ← 用户上传的文件
├── workspace/        ← Agent 的工作目录(中间产物)
└── outputs/          ← 最终输出物(报告、图表、代码等)

/mnt/skills/
├── public/           ← 内置技能
│   ├── research/SKILL.md
│   ├── report-generation/SKILL.md
│   ├── slide-creation/SKILL.md
│   ├── web-page/SKILL.md
│   └── image-generation/SKILL.md
└── custom/           ← 你的自定义技能
    └── your-skill/SKILL.md

这个设计把"输入-加工-输出"的流转关系映射到了文件系统层级上,非常清晰。

记忆系统:让 Agent 真正"认识"你

大多数 Agent 的记忆是对话历史的简单回放。DeerFlow 2.0 的记忆系统是分层的:

  1. 短期记忆(Working Memory)——当前会话的上下文,存储在 LangGraph 的状态图中
  2. 中期记忆(Session Memory)——跨对话的用户偏好、技术栈、工作习惯
  3. 长期记忆(Long-term Memory)——持久化的知识积累,存储在本地文件中
# 记忆管理器(简化版)
class MemoryManager:
    """分层记忆管理器"""

    def __init__(self, config):
        self.memory_store = LocalMemoryStore(config["memory_path"])

    async def apply_memory(self, user_id: str, thread_id: str):
        """在会话开始时加载用户记忆

        去重设计:重复的偏好和上下文不会无限累积,
        apply 时会跳过已存在的相同事实条目。
        """
        memory = await self.memory_store.load(user_id)

        # 去重
        existing_facts = set(memory.get("facts", []))
        new_facts = []
        for fact in memory.get("pending_facts", []):
            if fact not in existing_facts:
                new_facts.append(fact)
                existing_facts.add(fact)

        memory["facts"] = list(existing_facts)
        memory["pending_facts"] = []

        await self.memory_store.save(user_id, memory)
        return memory

    async def update_memory(self, user_id: str, new_info: dict):
        """在会话中更新记忆

        不是简单的追加,而是智能合并:
        - 新偏好覆盖旧偏好
        - 新事实跳过已有重复
        - 上下文信息按相关性排序
        """
        current = await self.memory_store.load(user_id)

        # 合并偏好(新值覆盖旧值)
        for key, value in new_info.get("preferences", {}).items():
            current.setdefault("preferences", {})[key] = value

        # 合并事实(跳过重复)
        existing = set(current.get("facts", []))
        for fact in new_info.get("facts", []):
            if fact not in existing:
                current.setdefault("facts", []).append(fact)
                existing.add(fact)

        await self.memory_store.save(user_id, current)

记忆系统的另一个巧妙之处:它和沙箱文件系统是协作的。大块的中间数据(比如分析结果、生成的图表)不会被塞进上下文窗口,而是被卸载到文件系统,记忆中只保存文件路径和摘要。这是一个典型的"用空间换时间"策略——用磁盘空间换取上下文窗口的可用性。

技能系统:Markdown 即能力

DeerFlow 的技能系统是它最优雅的设计之一。一个技能就是一个 Markdown 文件,定义了能力的工作流、最佳实践和参考资源。

skills/
├── public/
│   ├── research/
│   │   └── SKILL.md          ← 深度研究技能
│   ├── report-generation/
│   │   └── SKILL.md          ← 报告生成技能
│   ├── slide-creation/
│   │   └── SKILL.md          ← 幻灯片创建技能
│   ├── web-page/
│   │   └── SKILL.md          ← 网页生成技能
│   └── image-generation/
│       └── SKILL.md          ← 图片生成技能
└── custom/
    └── your-skill/
        └── SKILL.md          ← 你的自定义技能

渐进式加载是关键设计:技能只在任务需要时才被加载到上下文中。这意味着:

  • 简单问答:零技能加载,上下文极小
  • 研究任务:只加载 research 技能
  • 生成报告:加载 research + report-generation
  • 创建 PPT:加载 research + slide-creation + image-generation
# 技能加载器(简化版)
class SkillLoader:
    """渐进式技能加载器"""

    def __init__(self, skills_dir: str):
        self.skills_dir = Path(skills_dir)
        self.loaded_skills = {}
        self._skill_registry = self._build_registry()

    def _build_registry(self):
        """扫描所有技能,建立索引"""
        registry = {}
        for skill_dir in self.skills_dir.rglob("SKILL.md"):
            skill_name = skill_dir.parent.name
            with open(skill_dir) as f:
                content = f.read()
            # 提取技能元数据(frontmatter)
            metadata = self._parse_frontmatter(content)
            registry[skill_name] = {
                "path": skill_dir,
                "metadata": metadata,
                "keywords": metadata.get("keywords", []),
                "token_count": len(content) // 4,  # 粗估
            }
        return registry

    def find_relevant(self, task: str) -> list[str]:
        """根据任务描述找到相关技能

        不是简单的关键词匹配,而是基于语义相关性排序:
        1. 提取任务中的关键意图
        2. 与技能的 keywords 和 description 做语义匹配
        3. 按相关性和 token 开销排序
        4. 在上下文预算内选择最优组合
        """
        relevant = []
        for name, info in self._skill_registry.items():
            score = self._compute_relevance(task, info)
            if score > 0.5:  # 相关性阈值
                relevant.append((name, score, info["token_count"]))

        # 按相关性降序排序
        relevant.sort(key=lambda x: x[1], reverse=True)

        # 在 token 预算内贪心选择
        selected = []
        total_tokens = 0
        max_tokens = 8000  # 技能上下文预算
        for name, score, tokens in relevant:
            if total_tokens + tokens <= max_tokens:
                selected.append(name)
                total_tokens += tokens

        return selected

    async def load_skill(self, skill_name: str) -> str:
        """加载技能内容到上下文"""
        if skill_name in self.loaded_skills:
            return self.loaded_skills[skill_name]

        info = self._skill_registry[skill_name]
        with open(info["path"]) as f:
            content = f.read()

        self.loaded_skills[skill_name] = content
        return content

这种设计让 DeerFlow 在 token 敏感的模型上也能高效运行。你不需要把所有技能一次性塞进上下文,而是按需加载,用完即卸。


三、代码实战:从零搭建一个自动化研究系统

理论够了,现在上手。

3.1 快速启动

# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow

# 运行安装向导(约2分钟)
make setup

# 验证安装
make doctor

# 启动开发服务
make dev

make setup 会启动一个交互式向导,引导你选择 LLM 提供商、可选的 Web 搜索、执行/安全偏好(沙箱模式、Bash 访问、文件写入工具)。它会生成一个最小化的 config.yaml 并把密钥写入 .env

3.2 配置文件深度解读

DeerFlow 的配置中心是 config.yaml,这里有一个生产级别的配置示例:

# config.yaml - 生产级配置

# LLM 提供商配置
llm:
  default_provider: openai
  providers:
    openai:
      api_key: ${OPENAI_API_KEY}
      model: gpt-4o
      temperature: 0.7
      max_tokens: 4096
    deepseek:
      api_key: ${DEEPSEEK_API_KEY}
      model: deepseek-chat
      base_url: https://api.deepseek.com/v1
    kimi:
      api_key: ${KIMI_API_KEY}
      model: moonshot-v1-128k
      base_url: https://api.moonshot.cn/v1

# 沙箱配置
sandbox:
  use: deerflow.community.aio_sandbox:AioSandboxProvider
  image: deerflow/sandbox:latest
  # 资源限制
  memory_limit: "2g"
  cpu_limit: 2
  # 执行超时
  execution_timeout: 300
  # K8s 部署时的 Provisioner
  # provisioner_url: http://sandbox-provisioner:8080

# 记忆配置
memory:
  enabled: true
  storage_path: ./.deer-flow/memory
  # 记忆更新策略
  dedup: true          # 跳过重复事实
  max_facts: 1000      # 最大事实条目数
  max_preferences: 100 # 最大偏好条目数

# 技能配置
skills:
  public_dir: ./skills/public
  custom_dir: ./skills/custom
  # 渐进式加载预算
  max_context_tokens: 8000

# IM 通道
channels:
  langgraph_url: http://localhost:2024
  gateway_url: http://localhost:8001

  telegram:
    enabled: true
    bot_token: ${TELEGRAM_BOT_TOKEN}
    allowed_users: ["123456789"]

# 可观测性
tracing:
  langsmith:
    enabled: true
    project: deerflow-prod
  langfuse:
    enabled: false

3.3 自定义技能开发

DeerFlow 的技能就是一个 SKILL.md 文件。我们来开发一个"竞品分析"技能:

---
name: competitor-analysis
version: 1.0.0
author: your-name
keywords: competitor, analysis, market-research, benchmark
description: 竞品分析技能 - 自动收集竞争对手信息并生成结构化对比报告
---

# 竞品分析技能

## 概述

当用户需要进行竞品分析时,使用此技能。该技能会:
1. 识别竞品范围
2. 从多个维度收集数据
3. 生成结构化对比表格
4. 提供战略建议

## 工作流

### Step 1: 确定分析框架

在开始搜索之前,先和用户确认分析框架:

- 目标产品/服务
- 竞争对手列表(至少3个)
- 分析维度:功能、定价、技术栈、用户评价、市场份额

### Step 2: 并行信息收集

对每个竞品,并行执行以下搜索:

搜索关键词模板:

  • "{产品名} 功能介绍 2026"
  • "{产品名} 定价方案"
  • "{产品名} 技术架构"
  • "{产品名} 用户评价 优缺点"
  • "{产品名} 市场份额 数据"

### Step 3: 数据整理与对比

将收集到的数据整理为结构化表格:

| 维度 | 我方 | 竞品A | 竞品B | 竞品C |
|------|------|-------|-------|-------|
| 核心功能 | ... | ... | ... | ... |
| 定价 | ... | ... | ... | ... |
| 技术栈 | ... | ... | ... | ... |
| 用户口碑 | ... | ... | ... | ... |

### Step 4: 战略建议

基于对比结果,提供:
- 差异化优势
- 需要追赶的短板
- 建议的功能优先级
- 定价策略建议

## 输出格式

使用 Markdown 格式,包含:
1. 执行摘要(200字内)
2. 详细对比表格
3. 雷达图(用 Mermaid 语法生成)
4. 战略建议
5. 数据来源列表

## 注意事项

- 数据必须标注来源和日期
- 主观评价需标注"基于公开信息"
- 定价信息需标注"截至 YYYY-MM-DD"
- 不确定的数据标注"待验证"

把这个文件放到 skills/custom/competitor-analysis/SKILL.md,DeerFlow 就会自动发现并注册这个技能。下次你让它做竞品分析时,它会自动加载这个技能到上下文中。

3.4 嵌入式 Python 客户端

DeerFlow 不一定要跑完整的服务端,你也可以把它作为 Python 库嵌入到自己的应用中:

from deerflow.client import DeerFlowClient

# 创建客户端(直连模式,无需 HTTP 服务)
client = DeerFlowClient()

# 同步聊天
response = client.chat(
    "帮我分析一下 React 19 和 Vue 4 的 Server Components 方案差异",
    thread_id="tech-comparison"
)
print(response)

# 流式响应
for event in client.stream("分析一下最近的 GitHub Trending 项目"):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        print(event.data["content"], end="", flush=True)

# 上传文件进行分析
client.upload_files("thread-1", ["./report.pdf", "./data.csv"])

# 管理技能
skills = client.list_skills()
print(f"可用技能: {skills}")

# 启用/禁用技能
client.update_skill("competitor-analysis", enabled=True)

# 列出模型
models = client.list_models()
print(f"可用模型: {models}")

这个嵌入式客户端的 API 完全对齐 Gateway HTTP API 的响应模型,在 CI 中有 TestGatewayConformance 测试确保两者始终一致。这意味着你可以先在嵌入模式下开发调试,无缝切换到 HTTP 服务模式部署生产。

3.5 实战:用 DeerFlow 构建自动化技术雷达

让我用一个完整案例展示 DeerFlow 的端到端能力:构建一个自动化技术雷达系统,定期扫描技术趋势,生成可视化报告。

# tech_radar_agent.py
"""
自动化技术雷达:用 DeerFlow 定期扫描技术趋势
"""

import asyncio
import json
from datetime import datetime
from deerflow.client import DeerFlowClient

class TechRadarAgent:
    """技术雷达 Agent"""

    # 雷达四象限
    QUADRANTS = {
        "techniques": "技术方法",
        "platforms": "平台与基础设施",
        "tools": "工具链",
        "languages": "语言与框架"
    }

    # 环形分级
    RINGS = {
        "adopt": "采纳",      # 建议立即采用
        "trial": "试验",      # 值得尝试
        "assess": "评估",     # 值得关注
        "hold": "暂缓"        # 暂不建议
    }

    def __init__(self):
        self.client = DeerFlowClient()
        self.thread_id = f"tech-radar-{datetime.now().strftime('%Y-%m-%d')}"

    async def scan_trends(self):
        """扫描技术趋势"""
        prompt = """
        请执行以下技术雷达扫描任务:

        1. 搜索过去一周的 GitHub Trending 项目
        2. 搜索最近的技术博客和新闻
        3. 对每个发现的技术/项目进行分类:
           - 象限:techniques/platforms/tools/languages
           - 环形:adopt/trial/assess/hold
        4. 生成 Markdown 格式的技术雷达报告

        报告格式:
        ## 采纳(Adopt)
        - **[技术名]** - 一句话说明为什么建议采纳

        ## 试验(Trial)
        - **[技术名]** - 一句话说明为什么值得尝试

        ## 评估(Assess)
        - **[技术名]** - 一句话说明为什么值得关注

        ## 暂缓(Hold)
        - **[技术名]** - 一句话说明为什么暂不建议

        要求:
        - 每个分类至少3个条目
        - 必须有具体的版本号或时间信息
        - 判断依据必须基于事实而非主观偏好
        """

        result = await self.client.chat(
            prompt,
            thread_id=self.thread_id
        )
        return result

    async def generate_radar_chart(self, report: str):
        """生成雷达可视化(Mermaid 格式)"""
        prompt = f"""
        基于以下技术雷达报告,生成一个 Mermaid mindmap 图:

        {report}

        要求:
        1. 根节点为"技术雷达 YYYY-MM-DD"
        2. 四个象限作为一级子节点
        3. 环形分级作为二级子节点
        4. 具体技术作为三级子节点

        只输出 Mermaid 代码,不要其他内容。
        """

        result = await self.client.chat(
            prompt,
            thread_id=self.thread_id
        )
        return result

    async def run(self):
        """运行完整流程"""
        print("🔍 正在扫描技术趋势...")
        report = await self.scan_trends()

        print("📊 正在生成雷达可视化...")
        chart = await self.generate_radar_chart(report)

        # 保存结果
        output = {
            "date": datetime.now().isoformat(),
            "report": report,
            "chart": chart
        }

        with open(f"tech-radar-{datetime.now().strftime('%Y%m%d')}.json", "w") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)

        print("✅ 技术雷达生成完成!")
        return output


if __name__ == "__main__":
    radar = TechRadarAgent()
    asyncio.run(radar.run())

四、上下文工程:长时程任务的生命线

4.1 问题:上下文窗口的"内存泄漏"

AI Agent 执行长时程任务时,最大的敌人不是模型能力不够,而是上下文窗口溢出

一个典型的场景:你让 Agent 做一份深度研究报告,它需要搜索 10 个网页,每个网页的内容都塞进上下文,再加上中间的分析步骤、子智能体的汇报……很快,128K 的上下文窗口就满了。结果就是:Agent 忘了最初的目标,开始胡说八道。

DeerFlow 2.0 的上下文工程(Context Engineering)是解决这个问题的系统方案。

4.2 三大策略

策略一:隔离式子智能体上下文

每个子智能体运行在完全隔离的上下文中。它看不到主智能体的完整上下文,也看不到其他子智能体的信息。

这意味着:

  • 10 个子智能体并行工作,每个只消耗自己的上下文窗口
  • 主智能体的上下文窗口只保留任务描述和最终结果
  • 总的 token 消耗是 O(n × k) 而不是 O(n × n),其中 n 是子任务数,k 是每个子任务的上下文大小

策略二:渐进式上下文压缩

在一个会话内,DeerFlow 会主动管理上下文:

class ContextCompressor:
    """上下文压缩器"""

    async def compress(self, state: AgentState) -> AgentState:
        """压缩已完成子任务的上下文

        策略:
        1. 已完成的子任务 → 只保留摘要
        2. 中间产物 → 卸载到文件系统,上下文只保留路径
        3. 搜索结果 → 只保留关键信息,丢弃原始 HTML
        """
        completed = state.get("completed_sub_tasks", [])

        for task in completed:
            # 子任务结果卸载到文件
            if task.get("result_size", 0) > 2000:  # 超过2K字符
                file_path = await self.offload_to_file(
                    thread_id=state["thread_id"],
                    task_id=task["id"],
                    content=task["result"]
                )
                # 上下文中只保留摘要和文件路径
                task["result_summary"] = self.summarize(task["result"])
                task["result_file"] = file_path
                task["result"] = None  # 清空完整结果

        return state

    def summarize(self, text: str, max_length: int = 200) -> str:
        """生成摘要(使用模型或规则)"""
        if len(text) <= max_length:
            return text

        # 简单规则:取首句 + 关键数据
        first_sentence = text.split("。")[0] + "。"
        # 提取数字和百分比
        key_data = re.findall(r'\d+\.?\d*%?', text)
        summary = first_sentence
        if key_data:
            summary += f" 关键数据: {', '.join(key_data[:5])}"
        return summary[:max_length]

策略三:严格工具调用恢复

这是一个很多人忽略的工程细节。当 LLM 提供商或中间件强制中断一个工具调用循环时,上下文中会留下"悬挂"的工具调用——模型已经发出了 tool_call,但还没有收到对应的 tool_result。

DeerFlow 的处理方式是:在下一个模型调用之前,剥离提供商级别的原始工具调用元数据,并为悬挂的调用注入占位符工具结果

class ToolCallRecovery:
    """工具调用恢复器

    解决的问题:OpenAI 兼容模型会严格验证
    tool_call_id 序列,如果上下文中有悬挂的
    tool_call(没有对应的 tool_result),
    下一次模型调用会报错。
    """

    def recover(self, messages: list) -> list:
        """恢复悬挂的工具调用"""
        recovered = []
        pending_tool_calls = {}

        for msg in messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                # 记录所有工具调用 ID
                for tc in msg["tool_calls"]:
                    pending_tool_calls[tc["id"]] = tc

            elif msg.get("role") == "tool":
                # 工具结果已经收到,移除对应的 pending
                tc_id = msg.get("tool_call_id")
                if tc_id in pending_tool_calls:
                    del pending_tool_calls[tc_id]

            recovered.append(msg)

        # 为悬挂的工具调用注入占位符结果
        for tc_id, tc in pending_tool_calls.items():
            recovered.append({
                "role": "tool",
                "tool_call_id": tc_id,
                "content": f"[工具调用被中断,结果不可用] 函数: {tc['function']['name']}"
            })

        return recovered

这个看似小的细节,实际上是生产环境中 Agent 稳定性的关键。没有它,Agent 在长时间运行中会因为格式错误而崩溃。


五、部署与性能优化

5.1 部署模式选择

DeerFlow 提供两种部署模式:标准模式(Standard)和网关模式(Gateway)。

维度标准模式网关模式
架构Gateway + LangGraph (4 进程)Gateway 嵌入运行时 (3 进程)
并发受限于 LangGraph worker 许可--workers × 异步任务(无上限)
资源更高(两个 Python 运行时)更低(单一 Python 运行时)
冷启动较慢(两个服务初始化)更快
LangGraph 许可生产镜像需要不需要
# 标准模式
make up

# 网关模式(推荐生产环境)
make up-pro
# 或
./scripts/deploy.sh --gateway

5.2 资源规划

官方给出的部署规模参考:

场景最低配置推荐配置
本地评估4 vCPU, 8 GB RAM8 vCPU, 16 GB RAM
Docker 开发4 vCPU, 8 GB RAM8 vCPU, 16 GB RAM
长期运行服务8 vCPU, 16 GB RAM16 vCPU, 32 GB RAM

注意:这些配置是给 DeerFlow 本身的,如果你还要在本地跑 LLM,需要额外规划。

5.3 Docker 生产部署

# 一步构建+启动
deploy.sh --gateway

# 分步:先构建,后启动(支持切换模式)
deploy.sh build
deploy.sh start --gateway

# 停止
deploy.sh down

Docker 生产部署的一个关键优化:国内镜像加速

# 设置国内镜像(构建前)
export UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
export NPM_REGISTRY=https://registry.npmmirror.com

# 然后再构建
deploy.sh build

5.4 可观测性

DeerFlow 内置了 LangSmith 和 Langfuse 双追踪支持:

# .env
# LangSmith
LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT=https://api.smith.langchain.com
LANGSMITH_API_KEY=lsv2_pt_xxxxxxxxxxxxxxxx
LANGSMITH_PROJECT=deerflow-prod

# Langfuse
LANGFUSE_TRACING=true
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxxxxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxxxxxxxxxx
LANGFUSE_BASE_URL=https://cloud.langfuse.com

两个可以同时启用,DeerFlow 会把相同的模型活动同时上报给两个系统。

5.5 K8s 沙箱:生产级安全隔离

对于需要高安全性的生产环境,DeerFlow 支持 Kubernetes 沙箱模式。每个任务运行在独立的 Pod 中,通过 Provisioner 服务管理 Pod 的生命周期:

# K8s Provisioner 部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sandbox-provisioner
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sandbox-provisioner
  template:
    metadata:
      labels:
        app: sandbox-provisioner
    spec:
      containers:
      - name: provisioner
        image: deerflow/sandbox-provisioner:latest
        ports:
        - containerPort: 8080
        env:
        - name: KUBECONFIG_PATH
          value: /etc/kubernetes/kubeconfig
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "512Mi"
            cpu: "1"

DeerFlow 的最新版本还支持为沙箱卷配置可选的 PVC(PersistentVolumeClaim),让沙箱的文件持久化更灵活。


六、Claude Code 集成:终端里的 AI 团队

DeerFlow 提供了一个 claude-to-deerflow 技能,让你直接在 Claude Code 终端里和运行中的 DeerFlow 实例交互:

# 安装技能
npx skills add https://github.com/bytedance/deer-flow --skill claude-to-deerflow

# 使用(确保 DeerFlow 在 http://localhost:2026 运行)
# 在 Claude Code 中输入 /claude-to-deerflow

支持的操作:

  • 发送研究任务并获取流式响应
  • 选择执行模式:flash(快速)、standard、pro(规划)、ultra(子智能体)
  • 检查 DeerFlow 健康状态、列出模型/技能/智能体
  • 管理线程和对话历史
  • 上传文件进行分析

这意味着你可以在 Claude Code 的终端里完成"写代码 → 让 DeerFlow 做研究 → 获取结果 → 继续写代码"的完整工作流,无需切换窗口。


七、安全:不只是沙箱

DeerFlow 默认绑定 127.0.0.1,只在本地可访问。如果你要部署到公网,官方给出了三个安全建议:

  1. IP 白名单——用 iptables 或防火墙 ACL,只允许授权 IP 访问
  2. 认证网关——在前面放一个 Nginx 反向代理,开启强认证
  3. 网络隔离——把 Agent 和可信设备放在同一个 VLAN

最新版本还有一个安全增强:Gateway 对 HTML/SVG 等活跃 Web 内容类型强制下载而非内联渲染,降低 XSS 攻击面。这是一个很容易被忽视但非常重要的安全细节——Agent 生成的 HTML 文件如果直接在浏览器里渲染,可能包含恶意脚本。


八、与同类框架的对比

2026 年的 AI Agent 框架市场已经相当拥挤。DeerFlow、Hermes Agent、OpenClaw 是三个定位不同但经常被拿来比较的项目。

维度DeerFlow 2.0Hermes AgentOpenClaw
核心定位超级智能体编排框架自进化 AI 代理框架个人 AI Agent 运行时
开发者字节跳动NousResearch社区
架构LangGraph 编排 + 沙箱 + 技能自进化 + 技能树多通道 + 技能 + 节点
适用场景企业级复杂任务个人 AI 助手个人/小型团队
部署复杂度中高
沙箱隔离Docker/K8s容器可选
长期记忆
IM 集成5 平台多平台
自托管

三者不是直接竞品,而是覆盖了 AI Agent 从个人轻量使用到企业级复杂任务的全场景。


九、总结与展望

DeerFlow 2.0 的核心贡献

  1. 重新定义了 AI Agent 的定位——从"对话工具"到"执行系统"
  2. 解决了长时程任务的工程难题——上下文压缩、子智能体隔离、文件系统卸载
  3. 提供了生产级的安全隔离——Docker/K8s 沙箱、严格的访问控制
  4. 实现了渐进式能力加载——按需加载技能,token 敏感模型也能高效运行
  5. 打通了 IM 通道——从 Telegram 到飞书,一个配置搞定

我的看法

DeerFlow 2.0 不是银弹。它最擅长的是长时间、多步骤、需要代码执行和分析能力的任务——比如深度研究、竞品分析、技术雷达、自动化报告。对于简单的问答或单步工具调用,它有点杀鸡用牛刀。

但它代表了一个正确的方向:AI Agent 不应该只是一个更聪明的聊天机器人,它应该是一个真正能干活的执行系统。 这需要的不只是更强的模型,更需要更完善的工程基础设施——沙箱、记忆、技能、编排、安全。DeerFlow 2.0 在这些方面做出了业界领先的设计。

适合你的场景

  • ✅ 需要自动化深度研究工作流

  • ✅ 需要长时间运行的 Agent 任务

  • ✅ 需要安全的代码执行环境

  • ✅ 需要多 IM 平台集成

  • ✅ 有 Docker/K8s 运维经验

  • ❌ 只需要简单的对话式 AI

  • ❌ 没有服务器资源

  • ❌ 需要极度轻量级的方案


参考

推荐文章

15 个你应该了解的有用 CSS 属性
2024-11-18 15:24:50 +0800 CST
curl错误代码表
2024-11-17 09:34:46 +0800 CST
如何优化网页的 SEO 架构
2024-11-18 14:32:08 +0800 CST
Vue3中如何实现状态管理?
2024-11-19 09:40:30 +0800 CST
Vue3中如何进行异步组件的加载?
2024-11-17 04:29:53 +0800 CST
JavaScript 流程控制
2024-11-19 05:14:38 +0800 CST
js迭代器
2024-11-19 07:49:47 +0800 CST
程序员茄子在线接单