Panda_quantflow源码分析之Panda Server模块深度分析
  大道至简 2026年02月12日 121 0

Panda Server 模块深度分析

src/panda_server 是 Panda Quantflow 平台的控制中枢,基于 FastAPI 构建。本文档基于对源代码的深度解析,详细阐述其启动流程、模块架构、通信机制及核心业务逻辑。

1. 系统启动与生命周期 (Entry Point & Lifecycle)

入口文件为 src/panda_server/main.py,其启动流程严格遵循以下顺序:

1.1 启动序列

  1. 环境加载: 优先加载 .env 文件,并设置 sys.path 包含项目根目录及 src 目录。
  2. FastAPI 应用初始化: 创建 app 实例,配置 lifespan 上下文管理器。
  3. Lifespan (Startup 阶段):
    • 数据库连接: mongodb.connect_db() 建立异步连接。
    • 索引初始化: mongodb.init_local_db() 自动创建必要的 Collection 索引。
    • 插件加载: load_all_nodes() 动态扫描并加载所有工作流节点(内置及用户插件)。
    • 消息队列 (CLOUD 模式):
      • 连接 RabbitMQ (AsyncRabbitMQ).
      • 根据 SERVER_ROLE (CONSUMER/ALL) 启动 QueueConsumerManager,并行运行 WorkflowConsumerLogConsumer
  4. 中间件与路由: 配置 CORS,挂载静态资源 (/quantflow, /charts),注册各模块 Router。
  5. 服务运行: 通过 uvicorn.run 在 8000 端口启动 ASGI 服务。

2. 目录结构与模块映射

src/panda_server/ ├── main.py # 入口:生命周期、依赖注入、中间件配置 ├── config/ # 配置:Database(Motor), Env, MongoDB Indexes ├── routes/ # 控制层 (Controller):解析参数,调用 Logic │ ├── workflow_routes.py # 工作流管理 -> logic/workflow_*.py │ ├── chat_routes.py # LLM 对话 (SSE) -> services/llm │ ├── trading/ # 实盘交易 -> (Inline Logic + Redis) │ └── ... ├── logic/ # 业务层 (Service):纯 Python 业务逻辑,无 HTTP 依赖 │ ├── workflow_run_logic.py # 运行分发 (Local/Cloud 策略) │ ├── userPlugin/ # 插件保存与验证逻辑 │ └── ... ├── services/ # 外部服务集成 │ └── llm/ # LLM 核心架构 (Agent, CodeChecker) ├── messaging/ # 异步消息系统 (RabbitMQ) │ ├── consumer_manager.py # 消费者进程管理 │ ├── workflow_consumer.py# 工作流执行消费者 │ └── log_consumer.py # 日志持久化消费者 ├── utils/ # 核心工具库 │ ├── run_workflow_utils.py # DAG 调度与执行引擎 │ └── userPlugin/ # AST 代码安全检查器 └── models/ # 数据模型 (Pydantic Request/Response)

3. 分模块深度解析

3.1 阶段一:基础架构模块 (Infrastructure)

本阶段包含系统的底层支撑组件,负责环境配置、数据库连接、类型定义及核心工具库。

3.1.1 Config (配置管理)

负责系统基础设施的连接与配置。

  • Database (database.py): 使用 motor 封装异步 MongoDB 连接。支持 ReplicaSetSingle 模式(通过 MONGO_TYPE 切换)。
  • Env (env.py): 集中加载 .env 环境变量,提供 MONGO_URI, RABBITMQ_URI 等关键配置的默认值。
  • Index Management (mongodb_index_*.py):
    • 声明式定义: mongodb_index_config.py 定义了各集合(如 workflow, workflow_logs)的索引结构。
    • 自动同步: mongodb_index_manager.py 在启动时对比并自动创建缺失索引,确保 Schema 性能优化。

3.1.2 Enums (枚举定义)

定义系统通用的状态与类型常量。

  • WorkflowRunStatus: 运行状态枚举 (PENDING, RUNNING, SUCCESS, FAILED, TERMINATED)。
  • FeatureTag: 功能标签枚举,用于区分不同类型的插件或节点功能。

3.1.3 Utils (核心工具)

  • Workflow Engine (run_workflow_utils.py):
    • 拓扑排序: determine_workflow_execution_order 计算节点执行层级。
    • 并发调度: 使用 run_in_threadpool 将同步的节点计算任务放入线程池,避免阻塞主循环。
  • Storage (db_storage.py): 封装了 cloudpickle 序列化,自动根据数据大小选择存储在普通 Collection 或 GridFS (大文件)。
  • Time (time_utils.py): 统一系统时间为北京时间,处理时区转换。
  • UserPlugin Validator (userPlugin/):
    • AST 静态分析: user_plugin_validator.py 利用抽象语法树 (AST) 对用户代码进行深度审计,而非简单的正则匹配。
    • 安全规则: user_plugin_rules.py 定义了黑名单,禁止导入 os, sys, subprocess 等危险模块,禁止使用 eval, exec 等动态执行函数,确保插件在沙箱般的约束下运行。

3.1.4 Migrations (版本迁移)

位于 migrations/ 目录,提供数据库 Schema 演进能力。该模块的设计理念是原子性增量更新

  • 通用索引管理 (index_common_manager.py):
    • 封装了 create_collection_indexesdrop_collection_indexes,支持原子化的索引变更。
    • 提供 verify_indexes_exist 机制,确保索引变更真实落地。
  • 业务索引优化:
    • index_workflow_logs.py: 针对日志查询慢的问题,引入了复合索引(如按 user_id + workflow_id + time 排序),并支持“先建后删”的平滑迁移模式。
    • index_workflow_sequence_counters.py: 通过添加唯一索引 (unique: True),从数据库物理层强制保证了序列计数器的唯一性,解决了并发下的数据一致性问题。
  • 数据迁移事务:
    • workflow_demo_migration.py 展示了使用 MongoDB 事务 (async with session.start_transaction()) 进行多集合(Workflow, Run, GridFS)级联更新的最佳实践,确保迁移过程中不会出现数据“断头”或状态不一致。

3.2 阶段二:业务核心模块 (Business Core)

本阶段包含核心业务逻辑与数据模型,实现了工作流编排、回测数据处理及实盘交易控制。

3.2.1 Logic (业务逻辑层)

纯 Python 编写的核心业务层,与 HTTP 协议解耦,便于测试和复用。

  • Workflow Domain (工作流域):

    • 执行策略 (workflow_run_logic.py):
      • 双模式调度: 依据 RUN_MODE 环境变量,将任务分发至 RabbitMQ (CLOUD 模式) 或本地线程池 (LOCAL 模式)。
      • 事务初始化: 使用 MongoDB 事务原子性地创建 workflow_run 记录并更新主表 last_run_id,确保状态强一致。
    • 版本控制 (workflow_save_logic.py):
      • 克隆机制: 若修改公共模板 (owner='*'),自动触发“克隆”逻辑,创建归属于当前用户的新副本。
      • 权限校验: 严格校验 owner 字段,防止越权修改他人工作流。
    • CRUD管理:
      • workflow_list_logic.py: 支持基于 feature_tag 的多条件筛选(MongoDB $and 查询)和按更新时间倒序的分页查询。
      • workflow_delete_logic.py: 执行批量删除,包含严格的存在性检查和权限验证,确保操作的原子性和安全性。
    • 执行状态机: workflow_run_logic.py 初始化 PENDING 状态,run_workflow_utils.py 接管后续 RUNNING -> SUCCESS/FAILED 状态流转。支持 workflow_terminate_logic.py 触发的 MANUAL_STOP 手动终止(软停止)。
    • 日志优化 (workflow_logs_get_logic.py): 采用基于 Sequence 的增量查询机制,替代传统分页,有效解决日志实时滚动场景下的“跳页”问题。
    • 大对象处理 (workflow_output_logic.py / workflow_run_output_by_last_run_logic.py): 集成 GridFS 存储,并提供 Locator 定位器(如 data.list.0)支持按需提取深层嵌套数据,同时支持对 List 类型的内存切片分页,避免传输整个大文件。
    • 动态反射 (get_all_plugins_logic.py):
      • Schema 生成: 利用 Pydantic 的 model_json_schema 实时将 Python 节点类转换为前端可用的 JSON Schema,实现“代码即 UI”的动态表单生成。
      • 插件树构建: 根据 group 字段自动构建多层级的插件选择树。
  • Backtest Domain (回测域):

    • 核心组件: 包含 backtest_account_get_logic.py (账户), backtest_position_get_logic.py (持仓), backtest_profit_get_logic.py (收益), backtest_trade_get_logic.py (交易) 等模块。
    • 查询特性: 所有模块均实现了基于 back_id 的标准分页查询,并集成了 Pydantic 模型校验 (model_validate) 以过滤脏数据。
    • 日志流: backtest_user_strategy_log_get_logic.py 实现了高效的 Cursor Pagination (游标分页),使用 sort 字段进行范围查询 ($gt),专为海量日志流设计。
  • Trading Domain (交易域):

    • real_trade_order_logic.py: 提供实盘订单的标准 CRUD,自动维护 update_time
    • real_trade_binding_logic.py: 维护策略 ID 与实盘账户(如期货账户)的绑定关系,是交易指令路由的核心依据。
    • trad_constant.py: 集中定义了 Redis 通信协议的 Key(如 FUTURE_TRADE_ROUTE, REAL_TRADE_PROGRESS),确保多进程间的状态同步一致性。
  • UserPlugin Domain (插件域):

    • plugin_save_logic.py: 负责用户自定义代码的存储。在保存前调用 Validator 进行安全审计,并强制执行重名检测和所有权校验。

3.2.2 Models (数据模型)

基于 Pydantic 的类型定义,分为请求模型、响应模型和数据库模型三大类。

1. Backtest Domain Models (回测域)

专注于回测数据的标准化输出,采用统一的 items + pagination 列表响应结构。

  • models/backtest/query_account_response.py: 定义回测账户资金曲线的响应结构。
  • models/backtest/query_position_response.py: 定义回测每日持仓明细的响应结构。
  • models/backtest/query_profit_response.py: 定义回测每日收益统计的响应结构。
  • models/backtest/query_trade_response.py: 定义回测逐笔成交记录的响应结构。
  • models/backtest/query_user_strategy_log_response.py: 定义用户策略日志响应,特别采用了 Cursor Pagination (游标分页) 机制。
  • models/backtest/query_backtest_response.py: 定义单次回测任务的详细元数据及绩效指标响应。
2. UserPlugin Domain Models (插件域)

管理用户自定义代码的生命周期与权限。

  • models/userPlugin/user_plugin_model.py: 定义用户插件的 DB Schema,存储源代码、名称及所有权。
  • models/userPlugin/published_plugin_model.py: 定义已发布插件的快照 Schema,用于版本回溯。
  • models/userPlugin/plugin_subscription_model.py: 定义插件订阅关系的 Schema,包含有效期管理。
  • models/userPlugin/save_user_plugin_request.py: 定义保存插件的请求体,包含代码内容的输入验证。
  • models/userPlugin/save_user_plugin_response.py: 定义保存插件的响应体,返回 plugin_id
3. Core Workflow Models (核心工作流域)

定义 DAG 图结构及运行时状态,采用后端模型与前端 LiteGraph 双模共存策略。

  • models/workflow_model.py: 定义完整的工作流 DB Schema。
    • DAG 结构: 聚合 nodes (节点列表) 和 links (连线列表)。
    • UI 兼容: litegraph 字段存储前端画布原始 JSON。
  • models/work_node_model.py: 定义单个节点的属性,包含 static_input_data (静态参数) 和 output_db_id (输出结果引用)。
  • models/link_model.py: 定义节点间的连线关系,包含源节点与目标节点的端口映射。
  • models/workflow_run_model.py: 定义工作流运行实例的 DB Schema,独立存储运行状态 (status, progress),实现执行与定义的解耦。
  • models/feature_tag_detail_model.py: 定义功能标签详情,用于对工作流节点进行业务分类。
4. API Request/Response Models (通用交互)
  • models/save_workflow_request.py: 定义保存工作流的请求体,实现了引用完整性检查(如连线端点必须存在)。
  • models/save_workflow_response.py: 定义保存工作流的响应体,返回生成的 workflow_id
  • models/run_workflow_request.py: 定义触发运行及终止运行的请求体。
  • models/run_workflow_response.py: 定义触发运行的响应体,返回生成的 workflow_run_id
  • models/delete_workflow_request.py: 定义批量删除工作流的请求体。
  • models/query_workflow_run_response.py: 定义工作流运行状态查询的响应体。包含状态(status)、进度(progress)、各节点执行状态(running/success/failed_node_ids)及连线状态,用于前端实时监控工作流执行进度。
  • models/general_code_assistant_response.py: 定义通用代码助手(LLM)的响应结构。包含会话ID(session_id)和消息内容(new_code, explanation),用于标准化 AI 辅助编程接口的返回格式。
  • models/base_api_response.py: 定义所有 API 响应的基类 BaseAPIResponse。使用泛型 Generic[T] 包装 codemessagedata 字段,确保系统 API 接口返回结构的统一性。
  • models/query_workflows_response.py: 定义工作流列表查询的响应结构。
  • models/query_logs_response.py: 定义日志查询的响应结构。
  • models/all_plugins_response.py: 定义系统插件树的响应结构,返回所有可用插件的元数据及参数 Schema。

3.3 阶段三:服务与通信模块整合 (Integration)

本阶段负责系统的外部交互、消息传递及高级服务集成。

3.3.1 Messaging (消息队列)

基于 aio_pika 构建的高性能异步消息处理架构,实现了业务解耦、流量削峰和分布式任务分发。

  • RabbitMQ Client (rabbitmq_client.py):

    • 连接管理: 实现了健壮的连接池与自动重连机制 (connect_robust),支持指数退避重试 (max_retries)。
    • 通道复用: 维护全局 Channel 对象,减少 TCP 开销。
    • 消息原语:
      • publish: 声明 Exchange 并发布持久化消息。
      • consume: 封装了队列声明、绑定及 QoS (prefetch_count) 设置。支持自动 ACK/NACK 机制(异常时 requeue=False 防止死循环)。
      • consume_full_message: 专为日志设计,传递包含 Header 元信息的完整 JSON 包。
  • Consumer Manager (consumer_manager.py):

    • 消费者生命周期的大管家。
    • 并发控制: 根据 PANDA_SERVER_WORKFLOW_WORKERS 环境变量动态伸缩 Worker 数量。通常日志 Worker 数量设为执行 Worker 的一半 (// 2)。
    • 并行启动: 使用 asyncio.create_task 并行拉起所有消费者协程。
  • Workflow Consumer (workflow_consumer.py):

    • 监听: WORKFLOW_RUN_QUEUE (绑定到 WORKFLOW_EXCHANGE).
    • 动作: 收到消息后调用 run_workflow_in_background 触发 DAG 执行。
    • 容错: 捕获 Worker 级异常,并尝试调用 mark_workflow_run_failed 将数据库状态置为 FAILED,防止任务假死。
  • Log System (log_consumer.py & log_processor.py):

    • 架构: 实现了“生产-消费”模式的日志落库,将高频 IO 操作移出主业务链路。
    • 分发: LogConsumer 使用 consume_full_message 获取完整封包,根据 type="insert_workflow_log" 路由消息。
    • 处理器 (log_processor.py):
      • 序列号生成: 这是一个关键设计。使用 MongoDB 的原子操作 find_one_and_updateworkflow_sequence_counters 集合中维护每个 workflow_run_id 的自增 sequence。这确保了即使在多线程/多进程环境下,日志条目也能严格保持时序,解决了分布式日志乱序问题。
      • 持久化: 最终将带序列号的 UserLog 对象写入 workflow_logs 集合。

3.3.2 Routes (路由层)

FastAPI 的 Controller 层,负责请求分发、参数校验和上下文提取(如 uid)。根据业务复杂度,采用三种设计模式:

  • System Infrastructure (系统基础):

    • Base (base_routes.py): 系统健康检查接口。
      • Root Endpoint: / 路由返回 BaseAPIResponse,用于负载均衡器 (Load Balancer) 或监控系统确认 API Server 是否存活。
  • Standard Mode (标准业务模式 - Controller-Service):

    • 设计原则: 路由函数仅作为“瘦”入口,立即调用 logic/ 目录下的纯 Python 业务函数。
    • 模块分布:
      • Workflow (workflow_routes.py): 核心编排接口。
        • /save: 委托 workflow_save_logic,处理 DAG 存储与校验。
        • /all: 委托 workflow_list_logic,处理筛选与分页。
      • Plugins (plugins_routes.py): 插件系统接口。
        • /all: 委托 get_all_plugins_logic,构建动态插件树。
        • /save: 委托 user_plugin_save_logic,执行 AST 安全检查与存储。
      • Backtest (backtest_route.py): 数据查询接口。
        • 提供 /account, /position, /trade 等标准 REST 接口,严格对应 Logic 层的查询服务,确保回测数据的只读访问安全。
  • Streaming Mode (流式交互模式):

    • Chat (chat_routes.py): 专为 LLM 交互设计。
      • SSE 支持: /code-assistant-stream 使用 StreamingResponse 返回 text/event-stream,配合生成器 (yield) 实现打字机效果。
      • 协议封装: 自动将 Logic 层生成的 JSON 对象封装为 SSE 格式 (data: {...}\n\n)。
  • Inline/Hybrid Mode (内联/混合模式):

    • Trading (trading/trading_routes.py): 实盘交易控制接口。
    • IPC 通信: 深度集成 RedisClient,通过 Pub/Sub 向独立的实盘进程发送指令,并直接读写 Redis Hash 维护状态。
    • 依赖: 引用了 panda_trading 共享包的模型。

3.3.3 Services (外部服务)

services/llm 模块是 PandaAI 的智能核心,深度集成了 OpenAI API、垂直领域智能体 (Agents)、代码静态分析 (Code Checker) 及会话状态管理 (Session Management)。它不仅提供回测策略生成、因子挖掘和通用代码优化服务,还通过 “生成 -> 校验 -> 修正” 的自愈闭环,确保生成的代码既符合业务逻辑又具备安全性。

1. 智能体层 (agents/)
  • backtest_assistant.py (回测策略专家)
    • 功能: 专注于生成基于 panda_backtest 框架的量化回测策略代码。
    • 核心实现: 维护 [BacktestAssistant]类。它通过 [PromptsProvider]动态读取并注入回测引擎的 API 文档(实现逻辑见 [get_backtest_engine_readme]),并实现了核心的 自愈循环 (Self-Correction Loop)(代码实现请参考 [process_response_and_check_code]及 [process_message_nonstream] 中的重试循环):Agent 生成代码 -> 调用 BacktestCodeChecker 校验 -> 若失败,将错误信息作为 System Message 反馈给 Agent -> Agent 自我修正 -> 输出最终代码。
  • factor_assistant.py (因子挖掘专家)
    • 功能: 专注于生成量化因子,支持数学公式表达式 (Formula) 和 Python 类 (Class) 两种形式。
    • 核心实现: 维护 FactorAssistant 类。内置了 RANK, DELAY, CORRELATION 等常用金融算子的使用说明,指导 Agent 将自然语言需求转化为严谨的因子逻辑。
  • code_assistant.py (通用代码助手)
    • 功能: 处理非特定领域的通用 Python 代码生成、解释与优化任务。
    • 核心实现: 维护 CodeAssistant 类,提供基础的编程辅助能力,适用于数据清洗、工具函数编写等场景。
  • prompts_provider.py (提示词工程中心)
    • 功能: 作为系统的"提示词工厂",集中管理所有的 System Prompts 模板与构建逻辑,确保不同 Agent 的人设统一且专业。
    • 核心实现:
      • 提示词分层架构 (Prompt Layering): 采用模块化拼接方式构建 System Prompt,确保上下文的完整性与逻辑性。典型构建顺序如下(代码参考 join):
        1. Role & Context: 定义 Agent 的角色设定、任务边界及工作流上下文(如 role_and_context_backtest_assistant)。
        2. Response Format: 强制约束 LLM 输出严格的 JSON 格式(如 response_format),确保能够被系统程序化解析。
        3. Code Requirements: 注入特定领域的代码规范、依赖限制及最佳实践(如 backtest_code_requirements)。
        4. Domain Knowledge (RAG): 动态注入框架文档,赋予 Agent 领域知识。
      • 动态文档注入技术 (Dynamic RAG):
        • 为了让 Agent 能够准确使用系统内部 API,实现了实时文档读取机制(read_markdown_file)。
        • 智能标题降级 (Heading Shift): 在注入外部 Markdown 文档(如 panda_backtest/README.md)时,会自动调整文档中的标题层级(shift_markdown_headings),使其作为子章节自然融入 System Prompt 的整体结构中,避免破坏 Prompt 的逻辑层级。

    💡 架构设计洞察:模板方法 (Template Method)

    BacktestAssistantFactorAssistant 共享完全相同的交互骨架(“接收需求 -> 调用LLM -> 代码检查 -> 自动修正 -> 返回结果”),体现了模板方法设计模式。它们的差异仅在于配置注入

    1. 大脑设定 (Prompt): 通过 PromptsProvider 注入不同的专家人设与文档(回测文档 vs 因子文档)。
    2. 安检标准 (Checker): 调用不同的领域校验器(BacktestCodeChecker vs FactorCodeChecker)。
    #backtest_assistant.py system_prompt = pp.join( pp.role_and_context_backtest_assistant, # 设定为回测专家 pp.response_format, pp.backtest_code_requirements, # 要求使用 panda_backtest API pp.get_backtest_engine_doc(), # 注入 panda_backtest/README.md ) #factor_assistant.py system_prompt = pp.join( pp.role_and_context_factor_assistant, # 设定为因子专家 pp.response_format, pp.factor_code_requirements, # 要求使用 Formula 或 Factor Class pp.get_factor_engine_doc(), # 注入 panda_factor 文档 )
2. 基础服务层 (base/ & Root)
  • base/llm_service.py (LLM 通信基座)
    • 功能: 封装与 OpenAI API 的底层通信。
    • 核心实现: 初始化 AsyncOpenAI 客户端,提供统一的 chat_completion (非流式) 和 chat_completion_stream (流式) 接口。处理 API 鉴权、重试及异常捕获,屏蔽了底层 HTTP 细节。
  • utils.py (会话管理工具集)
    • 功能: 提供会话生命周期管理和消息持久化能力。
    • 核心实现: LLMServiceUtils 类。
      • CRUD: 负责 MongoDB chat_sessions 集合的增删改查。
      • 可见性控制: 实现了 Internal Message 机制。例如,代码检查器的报错信息 (is_internal=True) 仅对 AI 可见,用户无感知,保持了前端对话界面的整洁。
  • socketioUtils.py
    • 功能: 提供 WebSocket 通信支持,作为 SSE (Server-Sent Events) 的备选实时通信方案。
3. 代码静态分析层 (code_checker/)
  • base_code_checker.py (分析器基类)
    • 功能: 定义代码检查的通用接口。
    • 核心实现: 利用 Python 标准库 ast 解析代码生成抽象语法树,提供基础的语法错误检查 (check_syntax)。
  • backtest_code_checker.py (回测校验器)
    • 功能: 针对回测策略代码的深度审计。
    • 核心实现: 检查 initialize, handle_data 等必要回调函数是否定义;验证是否引用了不存在的 API;拦截非法全局变量使用。
  • factor_code_checker.py (因子校验器)
    • 功能: 针对因子代码的逻辑审计。
    • 核心实现: 验证因子表达式的合法性,或检查 Factor 子类的结构规范 (如必须包含 calculate 方法)。
  • object_usage_checker.py (对象调用检查)
    • 功能: 防止非法调用未授权的方法或属性。
    • 核心实现: 追踪变量类型,确保 Agent 仅调用了白名单内的对象方法 (如 context.order 是合法的,但 os.system 是非法的)。
  • variable_tracker.py (变量追踪器)
    • 功能: 辅助分析变量的生命周期与作用域,支持复杂的静态分析逻辑。
  • rules/ 目录
    • 功能: 配置具体的校验规则。
    • 核心实现: backtest_code_rules.pyfactor_code_rules.py 定义了具体的黑白名单、必需函数列表及 API 签名约束。
4. 业务逻辑层 (logic/)

负责编排 API 请求与 Agent 的交互,分离关注点 (Separation of Concerns)

  • *_logic.py (核心编排)
    • backtest_assistant_stream_logic.py / _nonstream_logic.py
    • factor_assistant_stream_logic.py / _nonstream_logic.py
    • code_assistant_stream_logic.py / _nonstream_logic.py
    • 功能: 接收 Route 层传入的参数,初始化对应的 Assistant 实例,调用 process_messageprocess_message_stream,并处理返回的生成器或结果对象。
  • 会话管理逻辑
    • get_session_list_logic.py: 获取用户的历史会话列表。
    • get_session_detail_visible_logic.py: 获取特定会话的详细消息记录,自动过滤掉 is_internal=True 的系统中间消息。
    • delete_session_logic.py: 执行会话删除操作。
5. 数据模型层 (models/)
  • Request Models (*_request.py)
    • 定义 API 输入参数的 Pydantic 模型,如 BacktestAssistantRequest (包含 session_id, message, model, original_code),确保输入数据的类型安全。
  • Database Models (chat_session_model.py)
    • 定义 MongoDB chat_sessions 集合的文档结构,包含 user_id, messages 列表等。
  • Message Model (message_model.py)
    • 定义单条消息的结构:role (user/assistant/system), content (内容), reasoning (思维链), is_internal (可见性标记)。
  • Enum Constants (enums/)
    • llm_model_type.py: 定义支持的模型 (如 DeepSeek-V3) 及配置。
    • role_type.py: 定义角色常量。
6. 核心交互流程 (Interaction Flow)
  1. 请求入口: 用户发送消息 -> FastAPI Route -> logic 层。
  2. 上下文构建: Logic 层调用 Assistant,通过 LLMServiceUtils 获取历史会话,结合 System Prompt (由 PromptsProvider 生成) 构建完整 Context。
  3. LLM 推理: LLMService 将 Context 发送给 OpenAI API。
  4. 流式响应与校验:
    • Agent 接收 LLM 的流式输出。
    • 若生成了代码,立即触发 CodeChecker 进行静态分析。
    • Pass: 将代码和解释通过 Generator 返回给前端。
    • Fail: 生成一条 Internal System Message 描述错误,自动触发 LLM 重试 (用户无感知),直到通过或达到最大重试次数。
  5. 持久化: LLMServiceUtils 将最终的用户提问和 AI 回答 (包含清洗后的代码) 存入 MongoDB。

4. 通信与并发模型

组件 通信方式 并发模型 备注
API Server HTTP/REST, SSE AsyncIO (Single Process) 高并发 I/O
Workflow Exec RabbitMQ (Msg) ThreadPool (in Async Worker) CPU 密集型任务隔离
RealTrade Redis Pub/Sub AsyncIO 实时指令下发
Logs RabbitMQ (Msg) Async Consumer 异步写库,低延迟

5. 关键依赖库

  • Web: FastAPI, Uvicorn, Starlette
  • Database: Motor (Async MongoDB), Redis (via global client)
  • Queue: aio_pika (Async RabbitMQ)
  • LLM: openai (Async Client)
  • Analysis: ast (Python Standard Lib)
最后一次编辑于 2026年02月12日 0

暂无评论

推荐阅读