Auto-Trading 量化交易系统
  红塔山 4天前 53 0

Auto-Trading 量化交易系统


1. 项目简介

Auto-Trading 是一个专为A股市场设计的、事件驱动的量化交易系统。项目采用现代化的 Python 技术栈和 Monorepo 架构,旨在构建一个从数据采集、处理、存储到交易执行的高性能、高可靠的全流程平台。

目前,系统的核心 market 服务(行情与交易中枢) 已具备生产级的稳定性和性能。


2. 核心亮点 ✨

  • 高性能异步数据流: 整个实时行情处理链路完全基于 asyncio 构建,实现了从数据接收、内部缓冲到数据库写入的全异步流程,具备极高的 I/O 吞吐能力。

  • 健壮的背压与降载机制:

    • 上游缓冲: MiniQMTFeed 内置了基于 asyncio.Queue 的有界队列(100万条),在消费速度跟不上时提供缓冲。
    • 下游降载: FeedEngine 采用“溢出丢弃”策略的并行写入器(Worker Pool)。当数据库成为瓶颈时,它会主动丢弃积压的旧数据,优先保证主循环能持续获取最新行情,确保系统的低延迟和实时性,避免因内存耗尽而崩溃。
  • 数据库端深度优化:

    • 读写分离: 使用 PostgreSQL 存储低频业务数据,使用 TimescaleDB 存储海量时序数据。
    • 高性能写入: TickRepository 采用 UNLOGGED 表作为临时中转,通过 COPY 命令批量载入,并利用 DISTINCT ON 在数据库端高效去重,写入性能远超传统的 INSERT ... ON CONFLICT
    • K线自动聚合: K线生成完全下沉到 TimescaleDB 内部,通过 time_bucket 和存储过程自动完成,应用层代码不参与任何聚合计算,极大降低了复杂度和资源消耗。
  • 生产级的交易接口:

    • 提供 RESTful 和 WebSocket 两种接口。
    • WebSocket 接口实现了全生命周期事件推送超时保护资源自动清理,为程序化交易提供了稳定可靠的执行反馈。
    • 通过 slowapi 实现了基于 IP 的速率限制(20次/秒),有效防止客户端滥用和恶意攻击。
  • 清晰的模块化架构 (Poetry Monorepo):

    • 项目采用 Poetry Workspace 管理,每个服务(如 market)都是一个独立的子包,拥有自己的 pyproject.toml
    • 实现了清晰的依赖隔离,market 服务自己管理 xtquant, akshare, slowapi 等专属依赖,根项目只负责共享库和开发工具,架构清晰,易于维护和扩展。
  • 高可靠的数据同步机制:

    • 通过 kline_eod_task (日终检查) 和 kline_backfill_task (历史回填) 两个任务的协同工作,形成双保险机制。
    • 利用 symbol 表中的 kline_checked_at (JSONB) 字段作为状态机,确保了 K 线数据的最终一致性,并能自动修复任何历史数据空洞。

3. 技术架构

3.1 架构图

+--------------------------+ +----------------------+ | Strategy / UI | | MiniQMT Client | | (Clients / Other Svcs) | | (External Source) | +--------------------------+ +----------------------+ REST/WebSocket Tick Callback v v +----------------------------------------------------------+ | Market Service (src/market) | | +-----------------------+ +--------------------------+ | | | [Real-time Hot Path] | | [Offline Cold Path] | | | | +-------------+ | | +-----------------+ | | | | | MiniQMTFeed | | | | SyncScheduler | | | | | +-------------+ | | +-----------------+ | | | | Raw Tick | | | Scheduled | | | | v | | v | | | | +-------------+ | | +-----------------+ | | | | | BoundedQueue| | | | Sync/Check Tasks| | | | | | (Drop if Full)| | | +-----------------+ | | | | +-------------+ | | | | | | | Batch Tick | | | Read/Write | | | | v | | | Checkpoint | | | | +-------------+ | | v | | | | | FeedEngine | | | +-----------------+ | | | | +-------------+ | | | SymbolRepository| | | | | v | | +-----------------+ | | | | +-------------+ | +--------------------------+ | | | | Worker Pool | | | | | | Async Tasks | | | | | +-------------+ | | | | Parallel Write | | | | v | | | | +-------------+ | | | | |TickRepository| | | | | +-------------+ | | | +-----------------------+ | +--------------------|----------------------|--------------+ | | High-Perf COPY | v v +----------------------------------------------------------+ | Database Layer | | +---------------------+ +---------------------------+ | | | PostgreSQL | | TimescaleDB | | | | (trading DB) | | (trading_ts DB) | | | | +-----------------+ | | +-----------------------+ | | | | | symbol Table | | | | ticks Table | | | | | +-----------------+ | | +-----------------------+ | | | | | | | Internal | | | | | | v Aggregation | | | | | | +-----------------------+ | | | | | | | klines Table | | | | | | | +-----------------------+ | | | +---------------------+ +---------------------------+ | +----------------------------------------------------------+

3.2 技术栈

类别 技术 用途
Web 框架 FastAPI 提供高性能的异步 Web API
接口限流 SlowAPI 为交易接口提供速率限制保护
数据库 PostgreSQL, TimescaleDB 业务数据存储、时序数据存储与处理
异步驱动 asyncio, asyncpg 提供异步 I/O 能力
依赖管理 Poetry 项目依赖与 Monorepo 工作区管理
代码质量 Black, Ruff, MyPy 格式化、Lint检查、静态类型检查
数据源 xtquant, akshare 行情与基础数据获取

4. market 服务深度解析

4.1 实时行情处理 (热路径)

  1. MiniQMTFeed (生产者): 从 xtquant 接收 Tick,放入一个最大容量为100万的内部队列。队列满则丢弃新数据。
  2. FeedEngine (消费者): 主循环永不阻塞,持续从队列获取数据。
  3. Worker Pool (并行写入): FeedEngine 维护一个固定大小(如5个)的“工人池”。
    • 如果工人池有空位,则创建一个后台任务去执行数据库写入。
    • 如果工人池已满,则直接丢弃当前这批数据,保证主循环能继续获取最新行情。
  4. TickRepository (高性能存储): 使用 UNLOGGED 表、COPY 命令和 DISTINCT ON 将数据高效存入 TimescaleDB。

4.2 离线数据同步与核查 (冷路径)

  1. SyncScheduler: 一个定时调度器,负责在每日特定时间触发各类同步任务。
  2. 基础数据同步: 包括 stock_basic_task, financial_task 等,负责每日更新股票列表、财务数据等低频信息。
  3. K线完整性保障:
    • kline_eod_task: 每日收盘后运行,只检查当天数据是否完整,并快速修复。成功后将 kline_checked_atlatest 日期推进到今天。
    • kline_backfill_task: 低频运行,负责修复任何历史数据空洞,保证数据的最终一致性。

5. 当前进度与后续计划

✅ 已完成且稳定

  • market 服务核心功能:
    • 高性能的实时 Tick 数据接收、处理和存储链路。
    • 健壮的 K 线数据日终核查与历史回填机制。
    • 稳定、安全且带有限流功能的 RESTful 和 WebSocket 交易接口。
    • 每日定时同步股票基础信息、财务、分红等数据的能力。
  • 项目架构:
    • 清晰的 Poetry Monorepo 依赖管理结构。
    • 统一的代码质量与测试工具链。
    • 完善的数据库 Schema 设计。

🚀 后续计划

  • strategy 服务开发: 基于 market 服务提供的稳定数据和交易接口,开始开发策略生成、回测和实盘运行的逻辑。
  • agent 服务集成: 将 rd-agent 或其他研究平台与本系统打通,实现研究到执行的自动化。
  • 监控与告警: 为 FeedEngine 的数据丢弃行为、数据同步任务的失败、数据库性能等关键指标添加监控和告警(如 Prometheus + Grafana)。
  • 管理后台 UI: 开发一个前端界面,用于查看系统状态、数据同步进度、账户信息和手动执行交易。
  • 测试覆盖率提升: 为现有代码编写更全面的单元测试和集成测试。
最后一次编辑于 4天前 0

暂无评论