DeerFlow 核心系统实现分析文档
本文档提供了对 DeerFlow 代理编排系统核心组件的全面技术分析,包括主代理 (Lead Agent) 架构、中间件管道、状态管理和记忆系统。该分析基于对代码库结构和实现模式的深入探索。
1. 主代理 (Lead Agent) 实现
Section titled “1. 主代理 (Lead Agent) 实现”主代理是 DeerFlow 系统的中央编排者,作为一个基于 LangGraph 的代理实现,并支持动态配置。
核心文件: /backend/packages/harness/deerflow/agents/lead_agent/agent.py
入口点: make_lead_agent(config: RunnableConfig)
| 特性 | 实现细节 |
|---|---|
| 动态模型解析 | 基于配置自动选择合适的模型,并包含针对缺失模型的降级逻辑。模型优先级:请求参数 model_name > 自定义代理配置的模型 > 全局默认模型 |
| 自定义代理支持 | 支持加载自定义代理配置(通过 agent_name 参数),具有独立的工具集、模型和记忆存储 |
| 思考模式支持 | 解析模型能力,为支持的模型启用扩展的思考 (Thinking) 功能 |
| 计划模式集成 | 在启用计划模式时,激活 TodoMiddleware 用于复杂的多步骤任务跟踪 |
| 子代理编排 | 支持将任务委托给子代理,并可配置并发限制 |
| 引导模式 (Bootstrap Mode) | 提供用于自定义代理创建工作流的最小代理实例 |
| 动态工具加载 | 基于模型能力、子代理启用状态和代理配置来加载工具 |
主代理通过严格排序的中间件链(详见第 2 节)在调用 LLM 前后执行请求,以确保所有交互处理的一致性。
2. 中间件系统实现
Section titled “2. 中间件系统实现”所有中间件都继承自 AgentMiddleware 基类,并实现 before_agent() 和/或 after_agent() 钩子。它们在共享的 ThreadState 对象上运行,可以修改状态、中断执行或触发副作用。
执行顺序:
- LLM 调用前: 中间件按定义的顺序运行,以在生成提示前对状态进行预处理
- LLM 调用后: 中间件按相反的顺序运行,以在状态持久化前对响应进行后处理
- 中断支持: 中间件可以返回
Command(goto=END)来提前停止执行(例如用于请求澄清)
完整中间件清单(执行顺序)
Section titled “完整中间件清单(执行顺序)”中间件按以下严格顺序执行。如果基于配置有条件加载,则标记为 [可选]:
| 顺序 | 中间件 | 文件路径 | 核心目的 | 启用条件 |
|---|---|---|---|---|
| 1 | ThreadDataMiddleware | /agents/middlewares/thread_data_middleware.py | 创建每线程的 workspace、uploads 和 outputs 目录,并将路径注入状态 | 始终启用 |
| 2 | UploadsMiddleware | /agents/middlewares/uploads_middleware.py | 跟踪新上传的文件并将其注入对话上下文 | 始终启用 |
| 3 | SandboxMiddleware | /sandbox/middleware.py | 获取执行沙盒实例并将 sandbox_id 存储在状态中 | 始终启用 |
| 4 | DanglingToolCallMiddleware | /agents/middlewares/dangling_tool_call_middleware.py | 为缺少对应结果的工具调用注入占位符响应 | 始终启用 |
| 5 | GuardrailMiddleware | /guardrails/middleware.py | 通过可插拔的 GuardrailProvider 协议提供工具调用前的授权 | [可选] 当配置中设置了 guardrails.enabled 时启用 |
| 6 | ToolErrorHandlingMiddleware | /agents/middlewares/tool_error_handling_middleware.py | 将工具执行异常转换为格式化的 ToolMessages 供 LLM 处理 | 始终启用 |
| 7 | SummarizationMiddleware | LangChain 內置 | 当接近 Token 限制时执行自动上下文缩减 | [可选] 当配置中设置了 summarization.enabled 时启用 |
| 8 | TodoMiddleware | /agents/middlewares/todo_middleware.py | 为计划模式实现在 write_todos 工具中的任务跟踪 | [可选] 当激活计划模式时启用 |
| 9 | TokenUsageMiddleware | /agents/middlewares/token_usage_middleware.py | 跟踪和报告所有请求的 Token 消耗 | [可选] 当配置了 token_usage 跟踪时启用 |
| 10 | TitleMiddleware | /agents/middlewares/title_middleware.py | 在第一次完整对话交流后自动生成描述性的线程标题 | 始终启用 |
| 11 | MemoryMiddleware | /agents/middlewares/memory_middleware.py | 将完成的对话排队以进行异步的长期记忆更新 | 始终启用 |
| 12 | ViewImageMiddleware | /agents/middlewares/view_image_middleware.py | 为支持视觉的模型将 base64 图像数据注入状态 | [可选] 仅当所选模型支持视觉能力时启用 |
| 13 | DeferredToolFilterMiddleware | /agents/middlewares/deferred_tool_filter_middleware.py | 启用工具搜索时对模型隐藏延迟的工具 Schema | [可选] 当配置了 tool_search 特性时启用 |
| 14 | SubagentLimitMiddleware | /agents/middlewares/subagent_limit_middleware.py | 通过截断多余的并行任务调用来强制执行最大并发子代理限制 | [可选] 当开启子代理特性时启用 |
| 15 | LoopDetectionMiddleware | /agents/middlewares/loop_detection_middleware.py | 检测并阻断重复的工具调用循环,以防止无限执行 | 始终启用 |
| 16 | ClarificationMiddleware | /agents/middlewares/clarification_middleware.py | 拦截 ask_clarification 工具调用并中断执行以请求用户输入 | 始终启用(必须是链中的最后一个中间件) |
3. 状态管理系统实现
Section titled “3. 状态管理系统实现”线程状态 Schema
Section titled “线程状态 Schema”系统使用扩展的 LangChain AgentState 来存储所有与对话相关的数据。
核心文件: /backend/packages/harness/deerflow/agents/thread_state.py
class ThreadState(AgentState): sandbox: NotRequired[SandboxState] # 执行沙盒标识符 (可选字段) thread_data: NotRequired[ThreadDataState] # Workspace/uploads/outputs 路径 (可选字段) title: NotRequired[str] # 自动生成的线程标题 (可选字段) artifacts: Annotated[list[str], merge_artifacts] # 生成的输出文件 todos: NotRequired[list] # 计划模式的任务列表 (可选字段) uploaded_files: NotRequired[list[dict]] # 当前会话中上传的文件 (可选字段) viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # 缓存的图像数据自定义 Reducer
Section titled “自定义 Reducer”merge_artifacts: 合并和去重 Artifact,同时保持顺序merge_viewed_images: 合并图像字典,支持通过空字典清理缓存
Checkpointer (状态持久化)
Section titled “Checkpointer (状态持久化)”Checkpointer 系统为对话状态提供持久化存储,支持多种后端选项。
核心文件:
- 同步 Provider:
/backend/packages/harness/deerflow/agents/checkpointer/provider.py - 异步 Provider:
/backend/packages/harness/deerflow/agents/checkpointer/async_provider.py
| 后端 | 使用场景 |
|---|---|
| In-Memory | 用于测试/开发的默认选项,非持久化 |
| SQLite | 用于单节点部署的基于文件的持久化 |
| PostgreSQL | 用于多节点生产部署的分布式持久化 |
- 单例模式用于同步使用 (跨调用复用,在进程退出时关闭)
- 上下文管理器模式用于具有确定性清理的一次性操作
- 首次使用时自动设置数据库 Schema
- 通过
config.yaml的checkpointer部分进行配置 - 当没有配置持久化 Checkpointer 时自动降级到 InMemorySaver
4. 记忆系统实现
Section titled “4. 记忆系统实现”记忆系统实现了双层架构:
- 短期记忆: 由 Checkpointer 系统管理,存储完整的对话状态
- 长期记忆: 对用户上下文、事实和历史信息的持久化存储
长期记忆组件
Section titled “长期记忆组件”记忆存储 (Memory Storage)
Section titled “记忆存储 (Memory Storage)”核心文件: /backend/packages/harness/deerflow/agents/memory/storage.py
- 抽象接口:
MemoryStorage为存储提供商定义了标准接口 - 默认实现:
FileMemoryStorage将记忆作为 JSON 文件存储在磁盘上 - 可插拔架构: 通过配置支持自定义存储提供商
- 缓存: 带文件修改时间检查的内存缓存,以保证数据新鲜度
- 原子写入: 使用 临时文件 + 重命名 模式来防止数据损坏
- 多租户支持: 为自定义代理提供分离的记忆存储
记忆数据结构
Section titled “记忆数据结构”{ "version": "1.0", "lastUpdated": "ISO 8601 时间戳", "user": { "workContext": {"summary": "", "updatedAt": ""}, "personalContext": {"summary": "", "updatedAt": ""}, "topOfMind": {"summary": "", "updatedAt": ""} }, "history": { "recentMonths": {"summary": "", "updatedAt": ""}, "earlierContext": {"summary": "", "updatedAt": ""}, "longTermBackground": {"summary": "", "updatedAt": ""} }, "facts": [ { "id": "fact_xxxx", "content": "事实内容", "category": "preference/knowledge/context/behavior/goal", "confidence": 0.0-1.0, "createdAt": "", "source": "thread_id" } ]}记忆更新器 (Memory Updater)
Section titled “记忆更新器 (Memory Updater)”核心文件: /backend/packages/harness/deerflow/agents/memory/updater.py
- 使用 LLM 从对话上下文中提取有意义的更新
- 更新用户上下文部分 (work, personal, top of mind)
- 更新历史上下文部分 (recent months, earlier, long term)
- 提取具有置信度分数的离散事实
- 基于标准化内容对事实进行去重
- 实施最大事实数限制(保留置信度最高的事实)
- 自动剥离文件上传相关的提及,以避免过期的引用
- 执行原子更新以防止记忆损坏
记忆队列 (Memory Queue)
Section titled “记忆队列 (Memory Queue)”核心文件: /backend/packages/harness/deerflow/agents/memory/queue.py
- 防抖更新队列,以避免频繁的 LLM 调用
- 在可配置的窗口(默认:30秒)内批处理多个对话更新
- 线程去重:每个线程只保留最新的对话
- 后台线程处理,以避免阻塞代理执行
- 通过 flush() 方法支持优雅关机
记忆中间件 (Memory Middleware)
Section titled “记忆中间件 (Memory Middleware)”核心文件: /backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py
- 在代理执行后运行,将对话排队以进行记忆更新
- 过滤消息,仅保留用户输入和最终的助手响应
- 从记忆中移除中间工具调用/结果
- 从用户消息中剥离临时的上传块
- 跳过仅有上传而没有真正用户文本的轮次
- 仅将至少包含一条用户消息和一条助手消息的对话排队
5. 组件交互架构
Section titled “5. 组件交互架构”- 请求入口: 用户请求进入系统并被包装在 ThreadState 中
- 中间件链构建: 主代理基于配置动态构建中间件链,仅包含启用的可选中间件
- 中间件预处理: 中间件按定义的顺序运行,以在生成提示前对状态进行预处理
- 主代理执行: LLM 处理准备好的状态并生成响应/工具调用
- 工具执行: 如果生成了工具调用,ToolErrorHandlingMiddleware 会带有错误处理地执行它们
- 中间件后处理: 中间件按相反顺序运行,以在状态持久化之前处理响应
- 状态持久化: 更新后的 ThreadState 被保存到 Checkpointer 中
- 记忆更新: 完成的对话被排队进行异步长期记忆更新
关键依赖关系
Section titled “关键依赖关系”| 组件交互 | 描述 |
|---|---|
| 主代理 ↔ 中间件 | 主代理在初始化期间构建中间件链,基于配置动态地仅包含启用的可选中间件 |
| 中间件 ↔ 状态管理 | 所有中间件从共享的 ThreadState 对象读取并进行修改 |
| ToolErrorHandlingMiddleware ↔ 工具执行 | 捕获工具执行异常并将其转换为格式化的 ToolMessages 供 LLM 处理,避免执行中断 |
| 记忆中间件 ↔ 记忆系统 | MemoryMiddleware 将过滤后的对话排队到 MemoryUpdateQueue 中进行异步处理 |
| Checkpointer ↔ 线程状态 | Checkpointer 会在每个执行步骤后自动持久化完整的 ThreadState |
| 主代理 ↔ 子代理 | 主代理使用子代理工具委托任务,并通过 SubagentLimitMiddleware 强制实施并发限制 |
所有组件都遵循严格的依赖注入模式,并且完全可以通过中心的 config.yaml 文件进行配置,而无需修改代码。