Panda Server 模块深度分析
src/panda_server 是 Panda Quantflow 平台的控制中枢,基于 FastAPI 构建。本文档基于对源代码的深度解析,详细阐述其启动流程、模块架构、通信机制及核心业务逻辑。
1. 系统启动与生命周期 (Entry Point & Lifecycle)
入口文件为 src/panda_server/main.py,其启动流程严格遵循以下顺序:
1.1 启动序列
- 环境加载: 优先加载
.env文件,并设置sys.path包含项目根目录及src目录。 - FastAPI 应用初始化: 创建
app实例,配置lifespan上下文管理器。 - Lifespan (Startup 阶段):
- 数据库连接:
mongodb.connect_db()建立异步连接。 - 索引初始化:
mongodb.init_local_db()自动创建必要的 Collection 索引。 - 插件加载:
load_all_nodes()动态扫描并加载所有工作流节点(内置及用户插件)。 - 消息队列 (CLOUD 模式):
- 连接 RabbitMQ (
AsyncRabbitMQ). - 根据
SERVER_ROLE(CONSUMER/ALL) 启动QueueConsumerManager,并行运行WorkflowConsumer和LogConsumer。
- 连接 RabbitMQ (
- 数据库连接:
- 中间件与路由: 配置 CORS,挂载静态资源 (
/quantflow,/charts),注册各模块 Router。 - 服务运行: 通过
uvicorn.run在 8000 端口启动 ASGI 服务。
2. 目录结构与模块映射
src/panda_server/ ├── main.py # 入口:生命周期、依赖注入、中间件配置 ├── config/ # 配置:Database(Motor), Env, MongoDB Indexes ├── routes/ # 控制层 (Controller):解析参数,调用 Logic │ ├── workflow_routes.py # 工作流管理 -> logic/workflow_*.py │ ├── chat_routes.py # LLM 对话 (SSE) -> services/llm │ ├── trading/ # 实盘交易 -> (Inline Logic + Redis) │ └── ... ├── logic/ # 业务层 (Service):纯 Python 业务逻辑,无 HTTP 依赖 │ ├── workflow_run_logic.py # 运行分发 (Local/Cloud 策略) │ ├── userPlugin/ # 插件保存与验证逻辑 │ └── ... ├── services/ # 外部服务集成 │ └── llm/ # LLM 核心架构 (Agent, CodeChecker) ├── messaging/ # 异步消息系统 (RabbitMQ) │ ├── consumer_manager.py # 消费者进程管理 │ ├── workflow_consumer.py# 工作流执行消费者 │ └── log_consumer.py # 日志持久化消费者 ├── utils/ # 核心工具库 │ ├── run_workflow_utils.py # DAG 调度与执行引擎 │ └── userPlugin/ # AST 代码安全检查器 └── models/ # 数据模型 (Pydantic Request/Response)
3. 分模块深度解析
3.1 阶段一:基础架构模块 (Infrastructure)
本阶段包含系统的底层支撑组件,负责环境配置、数据库连接、类型定义及核心工具库。
3.1.1 Config (配置管理)
负责系统基础设施的连接与配置。
- Database (
database.py): 使用motor封装异步 MongoDB 连接。支持ReplicaSet和Single模式(通过MONGO_TYPE切换)。 - Env (
env.py): 集中加载.env环境变量,提供MONGO_URI,RABBITMQ_URI等关键配置的默认值。 - Index Management (
mongodb_index_*.py):- 声明式定义:
mongodb_index_config.py定义了各集合(如workflow,workflow_logs)的索引结构。 - 自动同步:
mongodb_index_manager.py在启动时对比并自动创建缺失索引,确保 Schema 性能优化。
- 声明式定义:
3.1.2 Enums (枚举定义)
定义系统通用的状态与类型常量。
- WorkflowRunStatus: 运行状态枚举 (
PENDING,RUNNING,SUCCESS,FAILED,TERMINATED)。 - FeatureTag: 功能标签枚举,用于区分不同类型的插件或节点功能。
3.1.3 Utils (核心工具)
- Workflow Engine (
run_workflow_utils.py):- 拓扑排序:
determine_workflow_execution_order计算节点执行层级。 - 并发调度: 使用
run_in_threadpool将同步的节点计算任务放入线程池,避免阻塞主循环。
- 拓扑排序:
- Storage (
db_storage.py): 封装了cloudpickle序列化,自动根据数据大小选择存储在普通 Collection 或 GridFS (大文件)。 - Time (
time_utils.py): 统一系统时间为北京时间,处理时区转换。 - UserPlugin Validator (
userPlugin/):- AST 静态分析:
user_plugin_validator.py利用抽象语法树 (AST) 对用户代码进行深度审计,而非简单的正则匹配。 - 安全规则:
user_plugin_rules.py定义了黑名单,禁止导入os,sys,subprocess等危险模块,禁止使用eval,exec等动态执行函数,确保插件在沙箱般的约束下运行。
- AST 静态分析:
3.1.4 Migrations (版本迁移)
位于 migrations/ 目录,提供数据库 Schema 演进能力。该模块的设计理念是原子性和增量更新。
- 通用索引管理 (
index_common_manager.py):- 封装了
create_collection_indexes和drop_collection_indexes,支持原子化的索引变更。 - 提供
verify_indexes_exist机制,确保索引变更真实落地。
- 封装了
- 业务索引优化:
index_workflow_logs.py: 针对日志查询慢的问题,引入了复合索引(如按user_id+workflow_id+time排序),并支持“先建后删”的平滑迁移模式。index_workflow_sequence_counters.py: 通过添加唯一索引 (unique: True),从数据库物理层强制保证了序列计数器的唯一性,解决了并发下的数据一致性问题。
- 数据迁移事务:
workflow_demo_migration.py展示了使用 MongoDB 事务 (async with session.start_transaction()) 进行多集合(Workflow, Run, GridFS)级联更新的最佳实践,确保迁移过程中不会出现数据“断头”或状态不一致。
3.2 阶段二:业务核心模块 (Business Core)
本阶段包含核心业务逻辑与数据模型,实现了工作流编排、回测数据处理及实盘交易控制。
3.2.1 Logic (业务逻辑层)
纯 Python 编写的核心业务层,与 HTTP 协议解耦,便于测试和复用。
-
Workflow Domain (工作流域):
- 执行策略 (
workflow_run_logic.py):- 双模式调度: 依据
RUN_MODE环境变量,将任务分发至 RabbitMQ (CLOUD 模式) 或本地线程池 (LOCAL 模式)。 - 事务初始化: 使用 MongoDB 事务原子性地创建
workflow_run记录并更新主表last_run_id,确保状态强一致。
- 双模式调度: 依据
- 版本控制 (
workflow_save_logic.py):- 克隆机制: 若修改公共模板 (
owner='*'),自动触发“克隆”逻辑,创建归属于当前用户的新副本。 - 权限校验: 严格校验
owner字段,防止越权修改他人工作流。
- 克隆机制: 若修改公共模板 (
- CRUD管理:
workflow_list_logic.py: 支持基于feature_tag的多条件筛选(MongoDB$and查询)和按更新时间倒序的分页查询。workflow_delete_logic.py: 执行批量删除,包含严格的存在性检查和权限验证,确保操作的原子性和安全性。
- 执行状态机:
workflow_run_logic.py初始化 PENDING 状态,run_workflow_utils.py接管后续 RUNNING -> SUCCESS/FAILED 状态流转。支持workflow_terminate_logic.py触发的 MANUAL_STOP 手动终止(软停止)。 - 日志优化 (
workflow_logs_get_logic.py): 采用基于 Sequence 的增量查询机制,替代传统分页,有效解决日志实时滚动场景下的“跳页”问题。 - 大对象处理 (
workflow_output_logic.py/workflow_run_output_by_last_run_logic.py): 集成 GridFS 存储,并提供 Locator 定位器(如data.list.0)支持按需提取深层嵌套数据,同时支持对 List 类型的内存切片分页,避免传输整个大文件。 - 动态反射 (
get_all_plugins_logic.py):- Schema 生成: 利用 Pydantic 的
model_json_schema实时将 Python 节点类转换为前端可用的 JSON Schema,实现“代码即 UI”的动态表单生成。 - 插件树构建: 根据
group字段自动构建多层级的插件选择树。
- Schema 生成: 利用 Pydantic 的
- 执行策略 (
-
Backtest Domain (回测域):
- 核心组件: 包含
backtest_account_get_logic.py(账户),backtest_position_get_logic.py(持仓),backtest_profit_get_logic.py(收益),backtest_trade_get_logic.py(交易) 等模块。 - 查询特性: 所有模块均实现了基于
back_id的标准分页查询,并集成了 Pydantic 模型校验 (model_validate) 以过滤脏数据。 - 日志流:
backtest_user_strategy_log_get_logic.py实现了高效的 Cursor Pagination (游标分页),使用sort字段进行范围查询 ($gt),专为海量日志流设计。
- 核心组件: 包含
-
Trading Domain (交易域):
real_trade_order_logic.py: 提供实盘订单的标准 CRUD,自动维护update_time。real_trade_binding_logic.py: 维护策略 ID 与实盘账户(如期货账户)的绑定关系,是交易指令路由的核心依据。trad_constant.py: 集中定义了 Redis 通信协议的 Key(如FUTURE_TRADE_ROUTE,REAL_TRADE_PROGRESS),确保多进程间的状态同步一致性。
-
UserPlugin Domain (插件域):
plugin_save_logic.py: 负责用户自定义代码的存储。在保存前调用Validator进行安全审计,并强制执行重名检测和所有权校验。
3.2.2 Models (数据模型)
基于 Pydantic 的类型定义,分为请求模型、响应模型和数据库模型三大类。
1. Backtest Domain Models (回测域)
专注于回测数据的标准化输出,采用统一的 items + pagination 列表响应结构。
models/backtest/query_account_response.py: 定义回测账户资金曲线的响应结构。models/backtest/query_position_response.py: 定义回测每日持仓明细的响应结构。models/backtest/query_profit_response.py: 定义回测每日收益统计的响应结构。models/backtest/query_trade_response.py: 定义回测逐笔成交记录的响应结构。models/backtest/query_user_strategy_log_response.py: 定义用户策略日志响应,特别采用了 Cursor Pagination (游标分页) 机制。models/backtest/query_backtest_response.py: 定义单次回测任务的详细元数据及绩效指标响应。
2. UserPlugin Domain Models (插件域)
管理用户自定义代码的生命周期与权限。
models/userPlugin/user_plugin_model.py: 定义用户插件的 DB Schema,存储源代码、名称及所有权。models/userPlugin/published_plugin_model.py: 定义已发布插件的快照 Schema,用于版本回溯。models/userPlugin/plugin_subscription_model.py: 定义插件订阅关系的 Schema,包含有效期管理。models/userPlugin/save_user_plugin_request.py: 定义保存插件的请求体,包含代码内容的输入验证。models/userPlugin/save_user_plugin_response.py: 定义保存插件的响应体,返回plugin_id。
3. Core Workflow Models (核心工作流域)
定义 DAG 图结构及运行时状态,采用后端模型与前端 LiteGraph 双模共存策略。
models/workflow_model.py: 定义完整的工作流 DB Schema。- DAG 结构: 聚合
nodes(节点列表) 和links(连线列表)。 - UI 兼容:
litegraph字段存储前端画布原始 JSON。
- DAG 结构: 聚合
models/work_node_model.py: 定义单个节点的属性,包含static_input_data(静态参数) 和output_db_id(输出结果引用)。models/link_model.py: 定义节点间的连线关系,包含源节点与目标节点的端口映射。models/workflow_run_model.py: 定义工作流运行实例的 DB Schema,独立存储运行状态 (status,progress),实现执行与定义的解耦。models/feature_tag_detail_model.py: 定义功能标签详情,用于对工作流节点进行业务分类。
4. API Request/Response Models (通用交互)
models/save_workflow_request.py: 定义保存工作流的请求体,实现了引用完整性检查(如连线端点必须存在)。models/save_workflow_response.py: 定义保存工作流的响应体,返回生成的workflow_id。models/run_workflow_request.py: 定义触发运行及终止运行的请求体。models/run_workflow_response.py: 定义触发运行的响应体,返回生成的workflow_run_id。models/delete_workflow_request.py: 定义批量删除工作流的请求体。models/query_workflow_run_response.py: 定义工作流运行状态查询的响应体。包含状态(status)、进度(progress)、各节点执行状态(running/success/failed_node_ids)及连线状态,用于前端实时监控工作流执行进度。models/general_code_assistant_response.py: 定义通用代码助手(LLM)的响应结构。包含会话ID(session_id)和消息内容(new_code,explanation),用于标准化 AI 辅助编程接口的返回格式。models/base_api_response.py: 定义所有 API 响应的基类BaseAPIResponse。使用泛型Generic[T]包装code、message和data字段,确保系统 API 接口返回结构的统一性。models/query_workflows_response.py: 定义工作流列表查询的响应结构。models/query_logs_response.py: 定义日志查询的响应结构。models/all_plugins_response.py: 定义系统插件树的响应结构,返回所有可用插件的元数据及参数 Schema。
3.3 阶段三:服务与通信模块整合 (Integration)
本阶段负责系统的外部交互、消息传递及高级服务集成。
3.3.1 Messaging (消息队列)
基于 aio_pika 构建的高性能异步消息处理架构,实现了业务解耦、流量削峰和分布式任务分发。
-
RabbitMQ Client (
rabbitmq_client.py):- 连接管理: 实现了健壮的连接池与自动重连机制 (
connect_robust),支持指数退避重试 (max_retries)。 - 通道复用: 维护全局 Channel 对象,减少 TCP 开销。
- 消息原语:
publish: 声明 Exchange 并发布持久化消息。consume: 封装了队列声明、绑定及 QoS (prefetch_count) 设置。支持自动 ACK/NACK 机制(异常时requeue=False防止死循环)。consume_full_message: 专为日志设计,传递包含 Header 元信息的完整 JSON 包。
- 连接管理: 实现了健壮的连接池与自动重连机制 (
-
Consumer Manager (
consumer_manager.py):- 消费者生命周期的大管家。
- 并发控制: 根据
PANDA_SERVER_WORKFLOW_WORKERS环境变量动态伸缩 Worker 数量。通常日志 Worker 数量设为执行 Worker 的一半 (// 2)。 - 并行启动: 使用
asyncio.create_task并行拉起所有消费者协程。
-
Workflow Consumer (
workflow_consumer.py):- 监听:
WORKFLOW_RUN_QUEUE(绑定到WORKFLOW_EXCHANGE). - 动作: 收到消息后调用
run_workflow_in_background触发 DAG 执行。 - 容错: 捕获 Worker 级异常,并尝试调用
mark_workflow_run_failed将数据库状态置为 FAILED,防止任务假死。
- 监听:
-
Log System (
log_consumer.py&log_processor.py):- 架构: 实现了“生产-消费”模式的日志落库,将高频 IO 操作移出主业务链路。
- 分发:
LogConsumer使用consume_full_message获取完整封包,根据type="insert_workflow_log"路由消息。 - 处理器 (
log_processor.py):- 序列号生成: 这是一个关键设计。使用 MongoDB 的原子操作
find_one_and_update在workflow_sequence_counters集合中维护每个workflow_run_id的自增sequence。这确保了即使在多线程/多进程环境下,日志条目也能严格保持时序,解决了分布式日志乱序问题。 - 持久化: 最终将带序列号的
UserLog对象写入workflow_logs集合。
- 序列号生成: 这是一个关键设计。使用 MongoDB 的原子操作
3.3.2 Routes (路由层)
FastAPI 的 Controller 层,负责请求分发、参数校验和上下文提取(如 uid)。根据业务复杂度,采用三种设计模式:
-
System Infrastructure (系统基础):
- Base (
base_routes.py): 系统健康检查接口。- Root Endpoint:
/路由返回BaseAPIResponse,用于负载均衡器 (Load Balancer) 或监控系统确认 API Server 是否存活。
- Root Endpoint:
- Base (
-
Standard Mode (标准业务模式 - Controller-Service):
- 设计原则: 路由函数仅作为“瘦”入口,立即调用
logic/目录下的纯 Python 业务函数。 - 模块分布:
- Workflow (
workflow_routes.py): 核心编排接口。/save: 委托workflow_save_logic,处理 DAG 存储与校验。/all: 委托workflow_list_logic,处理筛选与分页。
- Plugins (
plugins_routes.py): 插件系统接口。/all: 委托get_all_plugins_logic,构建动态插件树。/save: 委托user_plugin_save_logic,执行 AST 安全检查与存储。
- Backtest (
backtest_route.py): 数据查询接口。- 提供
/account,/position,/trade等标准 REST 接口,严格对应 Logic 层的查询服务,确保回测数据的只读访问安全。
- 提供
- Workflow (
- 设计原则: 路由函数仅作为“瘦”入口,立即调用
-
Streaming Mode (流式交互模式):
- Chat (
chat_routes.py): 专为 LLM 交互设计。- SSE 支持:
/code-assistant-stream使用StreamingResponse返回text/event-stream,配合生成器 (yield) 实现打字机效果。 - 协议封装: 自动将 Logic 层生成的 JSON 对象封装为 SSE 格式 (
data: {...}\n\n)。
- SSE 支持:
- Chat (
-
Inline/Hybrid Mode (内联/混合模式):
- Trading (
trading/trading_routes.py): 实盘交易控制接口。 - IPC 通信: 深度集成
RedisClient,通过 Pub/Sub 向独立的实盘进程发送指令,并直接读写 Redis Hash 维护状态。 - 依赖: 引用了
panda_trading共享包的模型。
- Trading (
3.3.3 Services (外部服务)
services/llm 模块是 PandaAI 的智能核心,深度集成了 OpenAI API、垂直领域智能体 (Agents)、代码静态分析 (Code Checker) 及会话状态管理 (Session Management)。它不仅提供回测策略生成、因子挖掘和通用代码优化服务,还通过 “生成 -> 校验 -> 修正” 的自愈闭环,确保生成的代码既符合业务逻辑又具备安全性。
1. 智能体层 (agents/)
backtest_assistant.py(回测策略专家)- 功能: 专注于生成基于
panda_backtest框架的量化回测策略代码。 - 核心实现: 维护 [
BacktestAssistant]类。它通过 [PromptsProvider]动态读取并注入回测引擎的 API 文档(实现逻辑见 [get_backtest_engine_readme]),并实现了核心的 自愈循环 (Self-Correction Loop)(代码实现请参考 [process_response_and_check_code]及 [process_message_nonstream] 中的重试循环):Agent 生成代码 -> 调用BacktestCodeChecker校验 -> 若失败,将错误信息作为 System Message 反馈给 Agent -> Agent 自我修正 -> 输出最终代码。
- 功能: 专注于生成基于
factor_assistant.py(因子挖掘专家)- 功能: 专注于生成量化因子,支持数学公式表达式 (Formula) 和 Python 类 (Class) 两种形式。
- 核心实现: 维护
FactorAssistant类。内置了RANK,DELAY,CORRELATION等常用金融算子的使用说明,指导 Agent 将自然语言需求转化为严谨的因子逻辑。
code_assistant.py(通用代码助手)- 功能: 处理非特定领域的通用 Python 代码生成、解释与优化任务。
- 核心实现: 维护
CodeAssistant类,提供基础的编程辅助能力,适用于数据清洗、工具函数编写等场景。
prompts_provider.py(提示词工程中心)- 功能: 作为系统的"提示词工厂",集中管理所有的 System Prompts 模板与构建逻辑,确保不同 Agent 的人设统一且专业。
- 核心实现:
- 提示词分层架构 (Prompt Layering): 采用模块化拼接方式构建 System Prompt,确保上下文的完整性与逻辑性。典型构建顺序如下(代码参考
join):- Role & Context: 定义 Agent 的角色设定、任务边界及工作流上下文(如
role_and_context_backtest_assistant)。 - Response Format: 强制约束 LLM 输出严格的 JSON 格式(如
response_format),确保能够被系统程序化解析。 - Code Requirements: 注入特定领域的代码规范、依赖限制及最佳实践(如
backtest_code_requirements)。 - Domain Knowledge (RAG): 动态注入框架文档,赋予 Agent 领域知识。
- Role & Context: 定义 Agent 的角色设定、任务边界及工作流上下文(如
- 动态文档注入技术 (Dynamic RAG):
- 为了让 Agent 能够准确使用系统内部 API,实现了实时文档读取机制(
read_markdown_file)。 - 智能标题降级 (Heading Shift): 在注入外部 Markdown 文档(如
panda_backtest/README.md)时,会自动调整文档中的标题层级(shift_markdown_headings),使其作为子章节自然融入 System Prompt 的整体结构中,避免破坏 Prompt 的逻辑层级。
- 为了让 Agent 能够准确使用系统内部 API,实现了实时文档读取机制(
- 提示词分层架构 (Prompt Layering): 采用模块化拼接方式构建 System Prompt,确保上下文的完整性与逻辑性。典型构建顺序如下(代码参考
💡 架构设计洞察:模板方法 (Template Method)
BacktestAssistant与FactorAssistant共享完全相同的交互骨架(“接收需求 -> 调用LLM -> 代码检查 -> 自动修正 -> 返回结果”),体现了模板方法设计模式。它们的差异仅在于配置注入:- 大脑设定 (Prompt): 通过
PromptsProvider注入不同的专家人设与文档(回测文档 vs 因子文档)。 - 安检标准 (Checker): 调用不同的领域校验器(
BacktestCodeCheckervsFactorCodeChecker)。
#backtest_assistant.py system_prompt = pp.join( pp.role_and_context_backtest_assistant, # 设定为回测专家 pp.response_format, pp.backtest_code_requirements, # 要求使用 panda_backtest API pp.get_backtest_engine_doc(), # 注入 panda_backtest/README.md ) #factor_assistant.py system_prompt = pp.join( pp.role_and_context_factor_assistant, # 设定为因子专家 pp.response_format, pp.factor_code_requirements, # 要求使用 Formula 或 Factor Class pp.get_factor_engine_doc(), # 注入 panda_factor 文档 )
2. 基础服务层 (base/ & Root)
base/llm_service.py(LLM 通信基座)- 功能: 封装与 OpenAI API 的底层通信。
- 核心实现: 初始化
AsyncOpenAI客户端,提供统一的chat_completion(非流式) 和chat_completion_stream(流式) 接口。处理 API 鉴权、重试及异常捕获,屏蔽了底层 HTTP 细节。
utils.py(会话管理工具集)- 功能: 提供会话生命周期管理和消息持久化能力。
- 核心实现:
LLMServiceUtils类。- CRUD: 负责 MongoDB
chat_sessions集合的增删改查。 - 可见性控制: 实现了 Internal Message 机制。例如,代码检查器的报错信息 (
is_internal=True) 仅对 AI 可见,用户无感知,保持了前端对话界面的整洁。
- CRUD: 负责 MongoDB
socketioUtils.py- 功能: 提供 WebSocket 通信支持,作为 SSE (Server-Sent Events) 的备选实时通信方案。
3. 代码静态分析层 (code_checker/)
base_code_checker.py(分析器基类)- 功能: 定义代码检查的通用接口。
- 核心实现: 利用 Python 标准库
ast解析代码生成抽象语法树,提供基础的语法错误检查 (check_syntax)。
backtest_code_checker.py(回测校验器)- 功能: 针对回测策略代码的深度审计。
- 核心实现: 检查
initialize,handle_data等必要回调函数是否定义;验证是否引用了不存在的 API;拦截非法全局变量使用。
factor_code_checker.py(因子校验器)- 功能: 针对因子代码的逻辑审计。
- 核心实现: 验证因子表达式的合法性,或检查 Factor 子类的结构规范 (如必须包含
calculate方法)。
object_usage_checker.py(对象调用检查)- 功能: 防止非法调用未授权的方法或属性。
- 核心实现: 追踪变量类型,确保 Agent 仅调用了白名单内的对象方法 (如
context.order是合法的,但os.system是非法的)。
variable_tracker.py(变量追踪器)- 功能: 辅助分析变量的生命周期与作用域,支持复杂的静态分析逻辑。
rules/目录- 功能: 配置具体的校验规则。
- 核心实现:
backtest_code_rules.py和factor_code_rules.py定义了具体的黑白名单、必需函数列表及 API 签名约束。
4. 业务逻辑层 (logic/)
负责编排 API 请求与 Agent 的交互,分离关注点 (Separation of Concerns)
*_logic.py(核心编排)backtest_assistant_stream_logic.py/_nonstream_logic.pyfactor_assistant_stream_logic.py/_nonstream_logic.pycode_assistant_stream_logic.py/_nonstream_logic.py- 功能: 接收 Route 层传入的参数,初始化对应的 Assistant 实例,调用
process_message或process_message_stream,并处理返回的生成器或结果对象。
- 会话管理逻辑
get_session_list_logic.py: 获取用户的历史会话列表。get_session_detail_visible_logic.py: 获取特定会话的详细消息记录,自动过滤掉is_internal=True的系统中间消息。delete_session_logic.py: 执行会话删除操作。
5. 数据模型层 (models/)
- Request Models (
*_request.py)- 定义 API 输入参数的 Pydantic 模型,如
BacktestAssistantRequest(包含session_id,message,model,original_code),确保输入数据的类型安全。
- 定义 API 输入参数的 Pydantic 模型,如
- Database Models (
chat_session_model.py)- 定义 MongoDB
chat_sessions集合的文档结构,包含user_id,messages列表等。
- 定义 MongoDB
- Message Model (
message_model.py)- 定义单条消息的结构:
role(user/assistant/system),content(内容),reasoning(思维链),is_internal(可见性标记)。
- 定义单条消息的结构:
- Enum Constants (
enums/)llm_model_type.py: 定义支持的模型 (如DeepSeek-V3) 及配置。role_type.py: 定义角色常量。
6. 核心交互流程 (Interaction Flow)
- 请求入口: 用户发送消息 -> FastAPI Route ->
logic层。 - 上下文构建: Logic 层调用
Assistant,通过LLMServiceUtils获取历史会话,结合 System Prompt (由PromptsProvider生成) 构建完整 Context。 - LLM 推理:
LLMService将 Context 发送给 OpenAI API。 - 流式响应与校验:
- Agent 接收 LLM 的流式输出。
- 若生成了代码,立即触发
CodeChecker进行静态分析。 - Pass: 将代码和解释通过 Generator 返回给前端。
- Fail: 生成一条 Internal System Message 描述错误,自动触发 LLM 重试 (用户无感知),直到通过或达到最大重试次数。
- 持久化:
LLMServiceUtils将最终的用户提问和 AI 回答 (包含清洗后的代码) 存入 MongoDB。
4. 通信与并发模型
| 组件 | 通信方式 | 并发模型 | 备注 |
|---|---|---|---|
| API Server | HTTP/REST, SSE | AsyncIO (Single Process) | 高并发 I/O |
| Workflow Exec | RabbitMQ (Msg) | ThreadPool (in Async Worker) | CPU 密集型任务隔离 |
| RealTrade | Redis Pub/Sub | AsyncIO | 实时指令下发 |
| Logs | RabbitMQ (Msg) | Async Consumer | 异步写库,低延迟 |
5. 关键依赖库
- Web:
FastAPI,Uvicorn,Starlette - Database:
Motor(Async MongoDB),Redis(via global client) - Queue:
aio_pika(Async RabbitMQ) - LLM:
openai(Async Client) - Analysis:
ast(Python Standard Lib)