Clickhouse+MongoDB本地部署搭建
  mithea 3天前 20 0

引言

我们在搭建量化数据库时,时常需要考虑到数据库与策略类型是否兼容。数据库的选型直接影响数据存储、查询效率、回测速度和实盘交易的稳定性。根据不同类型数据库可分为以下几类:

1. 时间序列数据库

适用场景:高频交易、Tick级数据、市场深度数据等高频时间序列存储。
推荐选项

  • InfluxDB
    • 优势:专为时间序列优化,高性能写入和压缩,支持连续查询(CQ)和降采样。
    • 缺点:集群版闭源,社区版功能有限。
  • TimescaleDB
    • 优势:基于PostgreSQL的扩展,支持SQL语法,兼容金融生态工具(如Python/R)。
    • 缺点:处理超高频数据时性能略逊于InfluxDB。
  • Kdb+
    • 优势:高频交易领域标杆,极低延迟,内置q语言处理时序数据。
    • 缺点:学习曲线陡峭,商业授权费用高。

2. 关系型数据库 (RDBMS)

适用场景:结构化数据存储(如基本面数据、投资组合信息)、中低频策略回测。
推荐选项

  • PostgreSQL
    • 优势:扩展性强(支持TimescaleDB、PL/Python等),ACID事务完善,适合复杂分析。
  • MySQL/MariaDB
    • 优势:轻量级,适合中小规模数据,运维简单。
  • SQL Server
    • 优势:与Microsoft生态集成好,支持高级分析(如SSAS)。

3. NoSQL数据库

适用场景:非结构化或半结构化数据(如新闻情绪、社交媒体数据)。
推荐选项

  • MongoDB
    • 优势:灵活存储JSON格式数据,适合事件驱动的异构数据(如财报、新闻)。
  • Elasticsearch
    • 优势:全文检索能力强,适合日志分析和文本情感分析。

4. 分布式大数据平台

适用场景:超大规模数据(如全市场历史数据、多资产类别数据)。
推荐选项

  • Apache Cassandra
    • 优势:线性扩展能力,适合写入密集型场景(如存储全球市场Tick数据)。
  • ClickHouse
    • 优势:列式存储,亚秒级聚合查询,适合OLAP分析。
  • DuckDB
    • 优势:轻量级嵌入式OLAP引擎,适合本地化分析(替代SQLite的进阶选择)。

5. 内存数据库

适用场景:低延迟实盘交易、高频回测。
推荐选项

  • Redis
    • 优势:支持数据结构丰富(如有序集合存储L2行情),微秒级延迟。
  • MemSQL (现SingleStore)
    • 优势:兼容SQL的同时支持内存优化,适合混合工作负载。

本期就数据库选型做一次step by step搭建。

I. 为什么Clickhouse+MongoDB

在量化金融中,ClickHouse + MongoDB 的组合可以发挥两者的互补优势,尤其适合需要同时处理 结构化时序数据 和 非结构化/半结构化数据 的场景(如高频行情数据 + 动态修改参数)。以下是具体优势分析:


1. ClickHouse 的核心优势

适用场景:高频时序数据存储、快速聚合分析、OLAP 查询。
量化金融中的典型用途

  • 存储股票/期货的 Tick 数据、分钟/日线行情。
  • 快速计算技术指标(如移动平均、波动率)。
  • 多维度聚合查询(如按行业、市值分组统计收益)。

优势

  • 列式存储:高效压缩,减少 I/O 压力,适合大规模时间序列数据。
  • 向量化执行引擎:利用 CPU SIMD 指令加速计算,聚合查询比传统数据库快 10-100 倍。
  • 实时写入与查询:支持高吞吐写入(百万行/秒)和亚秒级响应。
  • SQL 兼容:学习成本低,可直接用 SQL 实现复杂分析(如窗口函数)。

2. MongoDB 的核心优势

适用场景:非结构化数据存储、灵活模式变更、文本/事件驱动分析。
量化金融中的典型用途

  • 存储新闻、社交媒体、财报文本(如 Reuters/Bloomberg 新闻流)。
  • 记录交易日志、信号事件(如订单执行结果)。
  • 存储异构数据(如不同交易所的原始 JSON 格式行情快照)。

优势

  • 文档模型灵活:无需预定义 Schema,可动态添加字段(如新增情感分析字段)。
  • 全文检索:支持关键词搜索、正则匹配,适合新闻情绪分析。
  • 嵌套数据结构:直接存储 JSON 格式的 L2 订单簿或复杂事件。
  • 水平扩展:分片集群轻松应对海量非结构化数据。

使用这个组合的首要原因,两者都免费,我穷。
其次,clickhouse处理海量时序数据快速预聚合需要计算的数据非常有效,mongoDB可以弥补clickhouse的短板例如动态schema,处理非结构化数据,动态参数调整。方便。
基础的流程我们可以如下设计:
image.png

II. 完整流程:

1. 确定数据接口:

此次实践用的tushare官网的期货与期权接口,
期货合约信息表 接口:fut_basic,
交易日历 接口:trade_cal,
期货历史分钟行情 接口:ft_mins,
结算参数 接口:fut_settle,
期货主力与连续合约 接口:fut_mapping,
期权合约信息 接口:opt_basic,
期权历史分钟行情 接口:opt_mins,

2. 绘制流程图:

image.png

3. 创建MongoDB集合:

根据流程图确定需要在MongoDB创建的集合:
image.png
实际名称根据个人情况而定。

4. Tushare合约信息提取到MongoDB

考虑到合约信息类数据是Reference, 我们将信息类数据放在MongoDB里保存,这里我用了fut_basic(获取期货合约列表数据)接口和fut_mapping(期货主力与连续合约), fut_basic 可以用于区别主力合约与其他合约,我们可以根据tushare官方给的定义来用代码区分:
image.png
这里给出具体代码:

a. fut_basic.py

# -*- coding: utf-8 -*-

"""
期货合约基本信息采集脚本 - 原始数据版本
支持合约类型和上市日期筛选
"""

import os
import sys
import argparse
import logging
import configparser
from datetime import datetime
import pymongo
import pandas as pd
import tushare as ts

# 设置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger('fut_basic_collector')

# 配置管理器
class ConfigManager:
    def __init__(self, config_path='config.ini'):
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        self.load_config()
    
    def load_config(self):
        """加载配置文件"""
        if not os.path.exists(self.config_path):
            logger.error(f"配置文件 {self.config_path} 不存在")
            sys.exit(1)
            
        try:
            self.config.read(self.config_path, encoding='utf-8')
            # 设置日志级别
            log_level = self.config.get('settings', 'log_level', fallback='INFO')
            logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
        except Exception as e:
            logger.error(f"加载配置文件失败: {str(e)}")
            sys.exit(1)
    
    def get_db_config(self):
        """获取数据库配置"""
        return {
            'mongodb_uri': self.config.get('db', 'mongodb_uri'),
            'db_name': self.config.get('db', 'db_name')
        }
    
    def get_tushare_token(self):
        """获取Tushare Token"""
        return self.config.get('tushare', 'token')
    
    def get_available_exchanges(self):
        """获取支持的交易所列表"""
        return [ex.strip() for ex in self.config.get('settings', 'exchanges', fallback='').split(',')]

# 数据处理器
class DataProcessor:
    def __init__(self, config_manager):
        self.config = config_manager
    
    def get_fut_basic(self, exchange, fut_type, list_date=None):
        """从 Tushare 获取期货合约基本信息(原始数据)"""
        try:
            logger.info(f"开始获取 {exchange} 交易所期货合约数据 (合约类型={fut_type})...")
            
            # 初始化 Tushare Pro
            pro = ts.pro_api(self.config.get_tushare_token())
            
            # 获取数据
            df = pro.fut_basic(
                exchange=exchange,
                fields='ts_code,symbol,name,fut_code,exchange,list_date,delist_date,trade_unit,per_unit,quote_unit,per_weight,d_month'
            )
            
            if not df.empty:
                # 添加合约类型字段
                df['fut_type'] = fut_type
                
                # 添加采集时间戳
                df['collected_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                
                # 根据上市日期筛选 (如果提供了list_date)
                if list_date:
                    # 确保list_date是字符串格式 (YYYYMMDD)
                    list_date_str = str(list_date)
                    # 筛选list_date >= 指定日期的记录
                    df = df[df['list_date'] >= list_date_str]
                    logger.info(f"筛选后数据量: {len(df)} 条 (上市日期 >= {list_date_str})")
                
                logger.info(f"成功获取 {len(df)} 条期货合约数据")
            else:
                logger.warning(f"未获取到 {exchange} 交易所的期货合约数据")
                
            return df
        
        except Exception as e:
            logger.error(f"获取数据时出错: {str(e)}")
            raise
    
    def save_to_mongodb(self, df, collection_name="ref_FUT_instrument_raw"):
        """将原始数据保存到 MongoDB"""
        if df.empty:
            logger.warning("没有数据需要保存")
            return
        
        try:
            # 获取数据库配置
            db_config = self.config.get_db_config()
            
            # 连接 MongoDB
            client = pymongo.MongoClient(db_config['mongodb_uri'])
            db = client[db_config['db_name']]
            collection = db[collection_name]
            
            # 创建索引
            collection.create_index([("ts_code", pymongo.ASCENDING)])
            collection.create_index([("fut_type", pymongo.ASCENDING)])
            
            # 转换为字典格式
            records = df.to_dict('records')
            
            # 批量插入/更新数据
            operations = []
            for record in records:
                # 使用 ts_code + fut_type 作为唯一标识
                unique_key = f"{record['ts_code']}_{record['fut_type']}"
                operations.append(pymongo.ReplaceOne(
                    {'_id': unique_key},
                    record,
                    upsert=True
                ))
            
            # 执行批量操作
            result = collection.bulk_write(operations)
            
            logger.info(
                f"数据保存成功: "
                f"插入 {result.upserted_count} 条, "
                f"更新 {result.modified_count} 条, "
                f"匹配 {result.matched_count} 条"
            )
            
            return result
        
        except Exception as e:
            logger.error(f"保存数据到 MongoDB 时出错: {str(e)}")
            raise

# 主控制器
class MainController:
    def __init__(self):
        self.config_manager = ConfigManager()
        self.data_processor = DataProcessor(self.config_manager)
    
    def validate_exchange(self, exchange):
        """验证交易所代码是否有效"""
        available_exchanges = self.config_manager.get_available_exchanges()
        if exchange not in available_exchanges:
            raise ValueError(f"无效的交易所代码: {exchange}。可用值: {', '.join(available_exchanges)}")
    
    def validate_fut_type(self, fut_type):
        """验证合约类型是否有效"""
        if fut_type not in [1, 2]:
            raise ValueError("合约类型无效,应为 1 (普通合约) 或 2 (主力与连续合约)")
    
    def run(self):
        # 设置命令行参数
        parser = argparse.ArgumentParser(description='期货合约基础信息采集脚本 - 原始数据版本')
        parser.add_argument('--exchange', type=str, required=True, 
                            help='交易所代码: CFFEX, SHFE, DCE, CZCE, INE, GFEX')
        parser.add_argument('--fut_type', type=int, required=True, choices=[1, 2],
                            help='合约类型: 1 (普通合约), 2 (主力与连续合约)')
        parser.add_argument('--list_date', type=str,
                            help='筛选上市日期之后的合约 (格式: YYYYMMDD)')
        
        args = parser.parse_args()
        
        try:
            # 参数验证
            self.validate_exchange(args.exchange)
            self.validate_fut_type(args.fut_type)
            
            # 获取数据
            df = self.data_processor.get_fut_basic(
                exchange=args.exchange,
                fut_type=args.fut_type,
                list_date=args.list_date
            )
            
            # 保存到 MongoDB
            if not df.empty:
                self.data_processor.save_to_mongodb(df)
            
            logger.info("脚本执行完成")
            return 0
        
        except Exception as e:
            logger.exception("脚本执行失败")
            return 1

if __name__ == "__main__":
    controller = MainController()
    sys.exit(controller.run())


5. 配置clickhouse及分钟数据导入

这一部分也是对我来说耗时最长的部分。本人代码薄弱,对我而言clickhouse的相关问题偏难,只能参考网上的帖子以及ds问答式生产。最终,我想把我的经验分享给大家:

a. windows上的clickhouse

有clickhouse使用经验的朋友可以跳到下一小节。首先,我是通过docker部署的clickhouse,参考官网的这一份文档:https://clickhouse.com/docs/zh/install/docker。

有集群需求的朋友可以看看这一篇:https://www.cnblogs.com/liugp/p/17501491.html。

b. 数据持久化的部署方式

为了使docker上clickhouse的数据持久化docker创建的容器不与本地绑定的话删除容器则会消失,我们需要做本地挂载卷处理,及将数据库目录与本地目录绑定。
在windows部署clickhouse并实现数据持久化有几种方法:

  1. 原生安装:在wsl2安装clickhouse并挂载数据到/var/lib/clickhouse,这样最接近原生性能但会遇到Windows (NTFS) 和 Linux (ext4) 的权限模型不同,导致 ClickHouse 无法访问数据目录的问题,我尝试过格式化磁盘为ext4格式,权限问题依然存在,故放弃,有理解的大神欢迎指点。
  2. 云服务器部署:在本地创建一个虚拟linux服务器,需要用到wsl1,感兴趣的朋友可以参考这篇文章:https://mp.weixin.qq.com/s/ezUENAlDBDI57gaGKb9OSQ,由于是个人本地部署,不涉及集群工作,所以放弃,需要的朋友可以考虑,或者买一台设备做linux服务器更直接。
  3. docker部署:简易,灵活,非高强度使用环境下稳定性达标,用docker volume自动管理权限,也可以避免在windows下的权限问题,可以优化的地方在于,迁移wsl的地址到ssd盘,然后将数据挂载在wsl目录而非本地目录,既可以充分利用clickhouse兼容ext4格式的性能,也解决了数据持久化的问题。

具体操作如下:

  1. 迁移wsl路径
    通过修改 WSL2 全局配置文件 %USERPROFILE%.wslconfig,可指定所有新发行版的默认存储路径:
[wsl2]
memory=4GB
swap=2GB
kernel=C:\\temp\\myCustomKernel
root = Q:\\qdb\\unknown\\wsl #替换为你指定路径
注意:此方法仅对新安装的发行版生效,现有发行版仍需通过导出/导入迁移。
  1. 部署docker-compose文件
`version: '3.8'

  

services:

  clickhouse-server:

    image: clickhouse/clickhouse-server:latest  # 使用最新LTS版本

    container_name: ck

    hostname: clickhouse-server

    restart: unless-stopped

    ulimits:

      nofile:

        soft: 262144

        hard: 262144

    environment:

      TZ: Asia/Shanghai

    volumes:

      - ./config:/etc/clickhouse-server/config.d

      - ./users:/etc/clickhouse-server/users.d

      - /root/clickhouse/data:/var/lib/clickhouse:z  # 更新为新的数据目录

    ports:

      - "8123:8123"

      - "9000:9000"

      - "9009:9009"

    networks:

      clickhouse-network:

        aliases:

          - clickhouse

  
  

networks:

  clickhouse-network:

    driver: bridge

比较重要的点:
i. 推荐选择最新稳定版镜像或者2.5以上版本

image: clickhouse/clickhouse-server:latest

新出的功能lazy materialization对于数据庞大且复杂的查询(排序)十分迅速,这对于我们对于实时数据及时性有要求的场景(比如计算并列出在某一时刻全品种的特征排序)可能会起到一定作用,还需测试;

ii:

volumes:

      - ./config:/etc/clickhouse-server/config.d

      - ./users:/etc/clickhouse-server/users.d

      - /root/clickhouse/data:/var/lib/clickhouse:z

前两者是clickhouse的config和users文件,挂载到主机方便修改,- /root/clickhouse/data:/var/lib/clickhouse:z 则是实现挂载clickhouse导入的数据到wsl目录下,即物理存储在我们指定的ssd磁盘路径,兼顾性能与稳定的数据持久化。

c. config & users

在创建容器时就需要绑定本地目录以及log和config文件(如上)。如果没有默认的config和users文件,可以通过镜像拉取默认config的users到./config和./users. 为了使clickhouse在后续写入分钟级数据及后续聚合能力平衡,我们需要改动config.xml和users.xml文件,具体改动为:

i. clickhouse级里的


        <max_suspicious_broken_parts>1000</max_suspicious_broken_parts>

        <max_bytes_to_merge_at_max_space_in_pool>1073741824</max_bytes_to_merge_at_max_space_in_pool>

        <ttl_only_drop_parts>1</ttl_only_drop_parts>

        <parts_to_delay_insert>3000</parts_to_delay_insert>

        <parts_to_throw_insert>3000</parts_to_throw_insert>

        <max_delay_to_insert>1000</max_delay_to_insert>

    </merge_tree>

  

    <mongodb>

        <connections_pool_size>20</connections_pool_size>

        <connection_timeout>300</connection_timeout>

    </mongodb>

ii. users.xml里default级


            <max_memory_usage_for_user>40000000000</max_memory_usage_for_user>

            <max_bytes_before_external_group_by>20000000000</max_bytes_before_external_group_by>

            <max_bytes_before_external_sort>20000000000</max_bytes_before_external_sort>

            <max_partitions_per_insert_block>5000</max_partitions_per_insert_block>


这里指的是内存,可以按自己硬件调整。

iii. 新版本clickhouse(>2.5)权限问题

到这里又是一个坑要避:新版clickhouse权限需要用default用户先创建一个用户然后在给这个用户所有权限,所以需要在users文件:

		<default>
		... 
		<access_management>1</access_management>
		<named_collection_control>1</named_collection_control>
		<show_named_collections>1</show_named_collections>
		<show_named_collections_secrets>1</show_named_collections_secrets>
		</default> 

这个时候default用户就可以通过sql创建其他用户和给新用户赋权的能力,具体在下方会说明。

d. clickhouse初期建表

不同于MongoDB,clickhouse在导入数据之前创建表单结构,这取决于使用者给予clickhouse的目的是什么。就我而言,clickhouse会作为原始分钟数据的数据库,并可以帮助我更有效的从不同维度做研究。那么,就会涉及到以下几点:

i. 表单设计:
由于是分钟级数据也是原始数据,我的想法是尽量保证数据的原始性,同时尽可能兼顾后续聚合或者做分析时的便捷性:

CREATE TABLE IF NOT EXISTS {full_table_name}

        (

            ts_code String COMMENT '合约代码(如P2207.DCE)',

            trade_time DateTime64(3, 'Asia/Shanghai') COMMENT '交易时间',

            open Float32 COMMENT '开盘价',

            high Float32 COMMENT '最高价',

            low Float32 COMMENT '最低价',

            close Float32 COMMENT '收盘价',

            vol UInt32 COMMENT '成交量',

            amount Float64 COMMENT '成交额',

            oi UInt32 COMMENT '持仓量',

            prt LowCardinality(String) MATERIALIZED splitByChar('.', ts_code)[1] COMMENT '品种代码',

            trade_date Date MATERIALIZED toDate(trade_time) COMMENT '交易日期',

            month UInt32 MATERIALIZED toYYYYMM(trade_time) COMMENT '月份分区键',

            _version UInt8 DEFAULT 1 COMMENT '数据版本'

        )

        ENGINE = MergeTree()

        PARTITION BY month

        ORDER BY (trade_time, prt, ts_code)

        SETTINGS

            min_bytes_for_wide_part = 10485760

        """

ts_code-oi字段是tushare接口的返回值,prt,trade_date,month,则是基于原始数据衍生字段,MATERIALIZED代表着不会显示在表单里,但是可以在查询时引入,这里纳入表格当中是为了后续在创建投影和物化视图做准备,也方便做简单查询。

ENGINE = MergeTree是因为Mergetree用于原始表相比于其他引擎具有性能优势,也正是因为我们选择Mergetree,添加version为了区别导入数据的批次以免混乱。

选择 PARTITION BY month 因为过多和过小的分区都会降低clickhouse性能(会报错too many parts),且以时间维度作为分区键方便后续需要每天更新或者修改的场景(clickhouse尽量不要精修,整行整区替换性能影响最低),ORDER BY (trade_time, prt, ts_code)则是模拟了后续查询需求的频率选择的排序键。

ii. 后续兼容
对于现阶段对数据的理解,这样设置字段满足我的需求。
1). 投影
投影是clickhouse的一项功能,也是我选择clickhouse作为分钟级数据库的原因之一。clickhouse作为列式数据库,擅长处理时序数据和矩阵化因子计算,投影则是加强clickhouse在处理时序数据上的优势,原理是在主表上预聚合数据,适合做日内的计算和监控等。

2). 物化视图
物化视图则是弥补clickhouse在截面上的不足,虽及时性不如投影,但是由于投影依赖主表排序键,物化视图在需要分组聚合场景下非常有效。

两者都是为了避免全盘扫描的情况做性能上的补偿,由于策略上的差异化,投影和物化视图可以用在不同地方,这里不做具体实施,大家可以自行研究,放一个图片来大家可以代入下:
image.png

不熟悉docker部署的,尽量按照我的yaml来,因为会很麻烦。
docker-compose.yaml建好以后,管理员运行Ubuntu 输入命令:
1. cd (docker-compose在ubuntu的位置)
2. 输入: docker-compose up -d
3. 然后docker ps检查status是否为UP状态(多刷新几次,避免restarting和starting状态)
4. 无误后输入: docker exec -it ck bash \ clickhouse-client --user default
5. CREATE USERS clickhouse_admin(用户名) IDENTIFIED BY ‘(用户的密码)’
6. image.png
clickhouse官方文档:https://clickhouse.com/docs/operations/access-rights
目的是运行docker-compose文件创建容器和拉取镜像,然后创建一个具有所有权限的用户方便使用,例如:
CREATE USERS dbeaver IDENTIFIED BY ‘123’

至此,clickhouse则是成功通过docker部署在本地。

e. 导入数据到clickhouse

建议大家自行根据实际MongoDB集合和对照API文档编写导入逻辑,下面这段导入代码跑完用了2天半,你猜我怎么知道的(IT这一块/.)。

import time
from datetime import datetime
import pandas as pd
from pymongo import MongoClient
import tushare as ts
from clickhouse_driver import Client as ClickHouseClient
from concurrent.futures import ThreadPoolExecutor
from collections import deque
import threading

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('data_import.log', encoding='utf-8'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# MongoDB 配置
MONGO_URI = '你的url'
MONGO_DB_NAME = 'QR_raw'
MONGO_COLLECTION = 'fut_name'

# Tushare 配置
TS_TOKEN = '你的token'
ts.set_token(TS_TOKEN)
pro = ts.pro_api()

# ClickHouse 配置
CLICKHOUSE_HOST = 'localhost'  # 与 docker-compose 网络别名一致
CLICKHOUSE_PORT = 9000  # 与 docker-compose 端口映射一致
CLICKHOUSE_USER = '你的用户'  # 请确保与 users.d 配置中的用户匹配
CLICKHOUSE_PASSWORD = '你的密码'  # 请确保与 users.d 配置中的密码匹配
CLICKHOUSE_DB = 'quantDB'
CLICKHOUSE_TABLE = 'FUTBAR_1M_RAW'

# 常量
BATCH_SIZE = 100000
MAX_WORKERS = 4
MAX_CALLS_PER_MINUTE = 480

# 连接MongoDB
def connect_mongo():
    try:
        client = MongoClient(MONGO_URI)
        db = client[MONGO_DB_NAME]
        logger.info("成功连接到MongoDB")
        return db
    except Exception as e:
        logger.error(f"连接MongoDB失败: {e}")
        raise

# 连接ClickHouse并创建数据库和表
def connect_clickhouse():
    try:
        # 初始连接到default数据库
        client = ClickHouseClient(
            host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, user=CLICKHOUSE_USER,
            password=CLICKHOUSE_PASSWORD, database='default'
        )
        # 测试连接
        client.execute('SELECT 1')
        logger.info("成功连接到ClickHouse服务器")
        # 创建数据库(如果不存在)
        client.execute(f'CREATE DATABASE IF NOT EXISTS {CLICKHOUSE_DB}')
        # 切换到目标数据库
        client = ClickHouseClient(
            host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, user=CLICKHOUSE_USER,
            password=CLICKHOUSE_PASSWORD, database=CLICKHOUSE_DB
        )
        # 创建表(如果不存在)
        client.execute(f'''
            CREATE TABLE IF NOT EXISTS {CLICKHOUSE_DB}.{CLICKHOUSE_TABLE}
                (
                    ts_code String COMMENT '合约代码(如P2207.DCE)',
                    trade_time DateTime64(3, 'Asia/Shanghai') COMMENT '交易时间',
                    open Float32 COMMENT '开盘价',
                    high Float32 COMMENT '最高价',
                    low Float32 COMMENT '最低价',
                    close Float32 COMMENT '收盘价',
                    vol Int32 COMMENT '成交量(手)',
                    amount Float64 COMMENT '成交额(万元)',
                    oi Int32 COMMENT '持仓量',
                    prt LowCardinality(String) MATERIALIZED splitByChar('.', ts_code)[1] COMMENT '品种代码',
                    trade_date Date MATERIALIZED toDate(trade_time) COMMENT '交易日期',
                    month UInt32 MATERIALIZED toYYYYMM(trade_time) COMMENT '月份分区键',
                    _version UInt8 DEFAULT 1 COMMENT '数据版本'
                )
                ENGINE = MergeTree()
                PARTITION BY month
                ORDER BY (trade_time, prt, ts_code)
                SETTINGS 
                    min_bytes_for_wide_part = 10485760
                /* 表结构优化用于金融时间序列数据 */
        ''')
        logger.info("成功确保ClickHouse数据库和表存在")
        return client
    except Exception as e:
        logger.error(f"连接ClickHouse或创建数据库/表失败: {e}")
        raise

# 日期格式转换
def convert_date(date_str, is_start=True):
    if not date_str:
        return '2025-06-30 19:00:00' if not is_start else None
    date = datetime.strptime(date_str, '%Y%m%d')
    time_suffix = '09:00:00' if is_start else '19:00:00'
    formatted_date = date.strftime(f'%Y-%m-%d {time_suffix}')
    if not is_start and datetime.strptime(formatted_date, '%Y-%m-%d %H:%M:%S') > datetime(2025, 6, 30, 19, 0, 0):
        return '2025-06-30 19:00:00'
    return formatted_date

# API 调用频率限制
api_call_timestamps = deque()
api_lock = threading.Lock()

def rate_limiter():
    with api_lock:
        now = time.time()
        while api_call_timestamps and api_call_timestamps[0] < now - 60:
            api_call_timestamps.popleft()
        if len(api_call_timestamps) >= MAX_CALLS_PER_MINUTE:
            wait_time = 60 - (now - api_call_timestamps[0])
            if wait_time > 0:
                time.sleep(wait_time)
        api_call_timestamps.append(time.time())

# 获取Tushare数据
def fetch_tushare_data(ts_code, start_date, end_date):
    data_list = []
    current_start = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    max_rows_per_call = 8000

    while current_start < end_date:
        current_end = min(current_start + pd.Timedelta(days=1), end_date)
        try:
            rate_limiter()
            df = pro.ft_mins(
                ts_code=ts_code, freq='1min',
                start_date=current_start.strftime('%Y-%m-%d %H:%M:%S'),
                end_date=current_end.strftime('%Y-%m-%d %H:%M:%S')
            )
            if not df.empty:
                data_list.append(df)
        except Exception as e:
            logger.error(f"{ts_code} 获取数据失败: {e}")
            time.sleep(60)
        current_start = current_end

    if data_list:
        return pd.concat(data_list, ignore_index=True)
    return pd.DataFrame()

# 插入ClickHouse
def insert_to_clickhouse(client, data):
    if data.empty:
        return 0

    columns = ['ts_code', 'trade_time', 'open', 'high', 'low', 'close', 'vol', 'amount', 'oi']
    try:
        data = data[columns].copy()
        for col in ['vol', 'oi']:
            data[col] = data[col].astype(float)
            non_int_rows = data[data[col].apply(lambda x: x != int(x) if pd.notnull(x) else False)]
            if not non_int_rows.empty:
                logger.warning(f"列 '{col}' 包含非整数值,以下是前 10 行异常数据:")
                for idx, row in non_int_rows.head(10).iterrows():
                    logger.warning(f"异常行: {row.to_dict()}")
            data[col] = data[col].apply(lambda x: int(x) if pd.notnull(x) and x == int(x) else None)
        data = data.dropna(subset=['vol', 'oi'])
        data['trade_time'] = pd.to_datetime(data['trade_time'])
        data = data.to_dict(orient='records')
        client.execute(f'INSERT INTO {CLICKHOUSE_TABLE} ({", ".join(columns)}) VALUES', data, types_check=True)
        return len(data)
    except Exception as e:
        logger.error(f"插入ClickHouse失败: {e}")
        return 0

# 处理fut_name
def process_fut_name(fut):
    ts_code = fut['ts_code']
    list_date = convert_date(fut.get('list_date'), is_start=True)
    delist_date = convert_date(fut.get('delist_date'), is_start=False)

    if not list_date:
        logger.warning(f"{ts_code} 缺少list_date,跳过")
        return pd.DataFrame()

    start_time = time.time()
    logger.info(f"开始处理 {ts_code}")

    data = fetch_tushare_data(ts_code, list_date, delist_date)

    end_time = time.time()
    elapsed_time = end_time - start_time
    rows_fetched = len(data) if not data.empty else 0

    logger.info(
        f"完成 {ts_code} 处理,起止时间: {list_date} 至 {delist_date},"
        f"获取行数: {rows_fetched},耗时: {elapsed_time:.2f}秒"
    )
    return data


# 主函数
def main(incremental=False):
    mongo_db = connect_mongo()
    clickhouse_client = connect_clickhouse()

    query = {} if not incremental else {'collected_at': {'$gt': '2025-01-01'}}
    fut_names = list(mongo_db[MONGO_COLLECTION].find(query, {'ts_code': 1, 'list_date': 1, 'delist_date': 1}))
    total_fut_names = len(fut_names)
    processed = 0
    data_buffer = []
    total_rows_inserted = 0  # 新增:总插入行数统计
    recent_inserts = deque(maxlen=10)  # 新增:跟踪最近10次插入量

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_fut_name, fut): fut for fut in fut_names}
        for future in futures:
            try:
                data = future.result()
                if not data.empty:
                    data_buffer.append(data)
                    if sum(len(df) for df in data_buffer) >= BATCH_SIZE:
                        merged_data = pd.concat(data_buffer, ignore_index=True)
                        logger.info(
                            f"准备批量插入,列名: {merged_data.columns.tolist()},行数: {len(merged_data)}"
                        )
                        rows_inserted = insert_to_clickhouse(clickhouse_client, merged_data)
                        total_rows_inserted += rows_inserted  # 新增:累加总插入行数
                        recent_inserts.append(rows_inserted)  # 新增:记录最近插入量
                        if rows_inserted > 0:
                            logger.info(f"成功插入 {rows_inserted} 行数据到ClickHouse")
                        else:
                            logger.warning("插入失败,未插入任何数据")
                        data_buffer = []
                processed += 1
                if processed % 10 == 0:
                    recent_sum = sum(recent_inserts)  # 新增:最近10次插入量合计
                    logger.info(
                        f"总体进度: 已处理 {processed} / {total_fut_names} 个ts_code,"
                        f"总插入行数: {total_rows_inserted},最近10次插入量合计: {recent_sum}"
                    )
            except Exception as e:
                logger.error(f"处理fut_name记录时出错: {e}")

    if data_buffer:
        merged_data = pd.concat(data_buffer, ignore_index=True)
        logger.info(
            f"准备批量插入剩余数据,列名: {merged_data.columns.tolist()},行数: {len(merged_data)}"
        )
        rows_inserted = insert_to_clickhouse(clickhouse_client, merged_data)
        total_rows_inserted += rows_inserted  # 新增:累加总插入行数
        recent_inserts.append(rows_inserted)  # 新增:记录最近插入量
        if rows_inserted > 0:
            logger.info(f"成功插入 {rows_inserted} 行数据到ClickHouse")
        else:
            logger.warning("插入失败,未插入任何数据")

    logger.info(f"完成所有数据处理,共处理 {processed} 个ts_code,总插入行数: {total_rows_inserted}")

if __name__ == "__main__":
    main(incremental=False)

期权数据的获取方式与期货数据的获取方式相同,故不再做过多叙述。

因为clickhouse与windows兼容性问题,以及自己对数据库知识的缺乏导致整个过程缓慢。

文章的主要目的是以实践的方式尽可能的规避clickhouse+mongoDB在本地部署时遇到的一些问题,生产环境时也可修改docker-compose文件结合Prometheus和grafana实现对数据的监控,希望对各位读者有所帮助。

最后一次编辑于 3天前 1

暂无评论