概述
因为高频数据量非常大,若要进行多年度的回测需要大量的时间计算,所以我采用先计算因子值,计算完之后再执行回测,本篇主要分享可以优化的方向以及一些高频数据预处理的发现。
1.高频因子特点
- 相比低频因子IC半衰期更短
- 需要更高频的调仓(如日频),带来换手率的增加,但基金业绩表明,即使高频因子的换手率更高,但高频因子的信息收益在合适的控制下仍然高于高换手率的成本
- Level 2 级的数据资讯更多但同时也有更大的噪声
- 本系列将依照高频数据低频化的思想构,围绕四个分类构建高频因子,具体细节留到下篇详细介绍
2.因子值计算
本篇研报的高频因子是面向收益率的分布做构建,日内收益率涵盖许多丰富的信息。常见的特征如均值、方差、峰度、偏度等等的统计量都是实证发现有效的因子。这些统计量的优势是非线性的因子,对于泛化能力较强。
2.1 收益率分布面临的问题
-
流动性差的股票并非每分钟都有搓合,缺失的成交价会以前一分钟填补,这样的数据会有更大的自相关性
-
数据并非独立同分布
研报中提到 Barndor-Nielsen 和Shephard (2010)发现已实现波动率可以依照收益率的正负分解为上行波动率(RV+),下行波动率(RV-),上下行波动率之差描述了跳跃的符号变化(signed jump variation),表明正收益率与负收益率的总体分布是不同的
2.2 因子值计算及优化
资料库为clickhouse使用docker部属在wsl上,语法和SQL几乎一样,以下为数据处理的流程,主要分享优化计算的策略。
和另一篇请求数据使用多线程不同,计算因子值为CPU密集计算,而存取数据则是I/O密集型(发出请求等回应),本次的优化方式为多进程,对比不同切分任务的方式的效率差异
注:因子值有无偏的要求这里暂不做,等回测再
def get_data_from_clickhouse(start_date: datetime, end_date: datetime, client: Client) -> pd.DataFrame:
"""从 ClickHouse 数据库中获取指定时间范围内的数据"""
year = start_date.year
if start_date == end_date:
query = f"""
SELECT *
FROM kline_{year}_rebuild
WHERE toDate(date) = '{start_date.strftime('%Y-%m-%d')}'
"""
else:
query = f"""
SELECT *
FROM kline_{year}_rebuild
WHERE date >= '{start_date.strftime('%Y-%m-%d')}' AND date < '{end_date.strftime('%Y-%m-%d')}'
"""
df = pd.DataFrame(client.query(query).named_results())
return df
def calc_n_return(df: pd.DataFrame, freq: int, name: Optional[int] = None) -> pd.DataFrame:
"""计算每只股票 n 个频率的收益率"""
if not name:
name = freq
df = df.sort_values(['code', 'date']).copy()
df[f'rtn{name}'] = df.groupby('code')['close'].transform(lambda x: x.pct_change(freq))
return df
def get_month_range_data(year: int, start_month: int, end_month: int, client: Client):
start_date = datetime(year, start_month, 1)
end_day = monthrange(year, end_month)[1]
end_date = datetime(year, end_month, end_day)
df = get_data_from_clickhouse(start_date, end_date, client) #将开始日期和结束日期传进封装好的请求数据函数
df = df.set_index('date')
df['date_only'] = df.index.date
return df
def porcess_n_min_kline(df: pd.DataFrame, period: int):
if 'date_only' not in df.columns:
raise ValueError(f"'date_only' column not found in DataFrame")
agg_dict = {
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum',
'turnover': 'sum',
'code': 'last',
'date_only': 'last',
'adj_factor': 'last'
}
df_N_min = (
df.groupby(['code', 'date_only'], group_keys=False)
.resample(f'{str(period)}T')
.agg(agg_dict)
.dropna()
.reset_index()
)
return df_N_min
# ======= 單一任務處理器(給子程序)=======
def process_period_data(year: int, start_month: int, end_month: int , period_list:list[int]):
all_dfs = []
# 連接 ClickHouse
client = clickhouse_connect.get_client(
host=local_host,
port=local_port,
username=user,
password=password,
database='cn_stock_1min_data',
)
logging.info(f'開始 {year} {start_month} {end_month}')
month_df = get_month_range_data(year, start_month, end_month, client) #從資料庫拉資料
client.close()
logging.info(f'拉資料 {year} {start_month} {end_month}')
if 'date_only' not in month_df.columns:
raise ValueError(f"'date_only' column not found in DataFrame for {year}/{start_month}-{end_month}")
for period in period_list:
logging.debug(f'开始处理 {year} {start_month}~{end_month}的{period} 分钟')
df_n_min = porcess_n_min_kline(month_df, period=period)
df_n_min = calc_n_return(df_n_min, freq=1 , name = period) #計算n分K的收益率
mean_df = (
df_n_min.groupby(['code', 'date_only'])[f'rtn{period}']
.mean()
.reset_index()
.rename(columns={f'rtn{period}': f'rtn{period}_mean'})
)
stat_df = (
df_n_min.groupby(['code', 'date_only'])[f'rtn{period}']
.agg(
period_var = 'var',
period_skew= lambda x: skew(x.dropna()),
period_kurt= lambda x: kurtosis(x.dropna()),
period_rv_up= lambda x: x[x > 0].var(),
period_rv_down= lambda x: x[x < 0].var()
)
.reset_index()
)
stat_df = stat_df.rename(columns={
'period_var': f'var_{period}m',
'period_skew': f'skew_{period}m',
'period_kurt': f'kurt_{period}m',
'period_rv_up': f'rv_up_{period}m',
'period_rv_down': f'rv_down_{period}m'
})
inner_merged_df = pd.merge(mean_df, stat_df, on=['code' , 'date_only'], how='left')
all_dfs.append(inner_merged_df)
del df_n_min, mean_df, stat_df ,inner_merged_df
gc.collect()
logging.info(f'完成 {year} {start_month}~{end_month}的{period} 分鐘')
# 合併所有的 DataFrame 並儲存
if all_dfs:
merged_df = reduce(
lambda left, right: pd.merge(left, right, on=['code', 'date_only'], how='left'),
all_dfs
)
output_file = f"{year}_{start_month:02d}_{end_month:02d}.csv"
merged_df.to_csv(output_file, index=False)
logging.info(f"📁 已儲存至 {output_file}\n")
return True
else:
logging.info(f"⚠️ 所有 period 任務都失敗,跳過儲存 {year}/{start_month:02d}-{end_month:02d}")
return False
三种方法
- 三个月一起算
- 拆成三个月,三进程
- 拆成每一天,三进程
以下用第三个方法举例,第一个直接调用process_period_data,第二个改改周期
def each_day_thress_process():
period_list = [1, 5, 10, 15, 30, 60]
processes = 3
monitor_thread = threading.Thread(target=monitor_system, args=(5,), daemon=True)
monitor_thread.start()
client = clickhouse_connect.get_client(
host=local_host,
port=local_port,
username=user,
password=password,
database='cn_stock_1min_data',
)
#先获取1-3月有哪几个交易日,写入任务表内
query = """
SELECT DISTINCT toDate(date) AS date_only
FROM kline_2015_rebuild
WHERE date >= '2015-01-01' AND date < '2015-04-01'
ORDER BY date_only
"""
date_df = pd.DataFrame(client.query(query).named_results())
client.close()
date_list = pd.to_datetime(date_df['date_only']).dt.date.tolist()
all_results = []
#开启多进程
with ProcessPoolExecutor(max_workers=processes) as executor:
futures = {executor.submit(process_day_data, day, period_list): day for day in date_list}
for future in as_completed(futures):
day = futures[future]
try:
result_df = future.result()
if isinstance(result_df, pd.DataFrame):
all_results.append(result_df)
logging.info(f"✅ {day} 处理完成,加入结果")
else:
logging.warning(f"⚠️ {day} 返回不是 DataFrame:{type(result_df)}")
except Exception as e:
logging.error(f"❌ {day} 处理失败: {e}")
结果对比从上到下是第1/2/3个做法
毫无疑问的单进程时间最久,其次三进程以天为单位处理较以月为单位处理快了一点,原因是在计算因子值的时候以月为单位可能会有某个月数据较多的情况,需要处理较久导致整体完成时间延后,但以天为单位可以将大任务拆解成小任务,尽量让所有进程跑满。
结论:
建议将大任务尽量拆解成小任务,尽量跑满CPU所有核心,进程数量设定可以先从核心数-1下手,有余裕再往上加,也可以实时存数据,避免大数据跑到一半出现非预期错误从头来过。
3. 异常值处理
实际清洗过程中会遇到几个状况导致计算值呈现0/nan/inf等异常值,通常来自于以下几个状况
- 涨跌停:若涨跌停没打开,甚至打开时间很少,整天会呈现同一个数字,导致前面计算的收益率呈现0,其他统计量也跟着算不出来,本研报有提到涨跌停对于有效性影响很大,采取剃除的方式处理
- 停牌:数据上会呈现同个数字和涨跌停状况相似
- 整天都是同一个收盘价:状况极少,和涨跌状况相似但需要人工排查,一年约有10笔左右
处理方法采用和掘金终端获取当天涨跌停/停牌的状况,填充nan
处理后回传对应的日期和股票代号在原资料填充nan
def process_err_data(err_tuple: Tuple[str, str] ) -> Tuple[str, str]:
global client # 使用进程内全域 client
err_date, err_code = err_tuple
try:
# 查 GM symbol
gm_code = convert_code(err_code, target='gm')#掘金的股票编号和本地不同,转换成掘金样式
stocks_info = get_history_symbol(
symbol=gm_code,
start_date=err_date,
end_date=err_date,
df=True
)
# 查本地资料
local_code = convert_code(err_code, target='local')
current_date_data = get_data_from_clickhouse(
pd.to_datetime(err_date),
pd.to_datetime(err_date),
local_code
)
current_date_data = current_date_data[cols_to_check].round(2)#处理数据精度问题
if current_date_data is None or current_date_data.empty:
return ('ERROR', f"{err_date},{err_code} - No local data")
# 邏輯條件判斷
#停牌
if stocks_info.iloc[0]['is_suspended']:
return err_date, err_code
#涨停
elif (current_date_data[cols_to_check] >= round(stocks_info.iloc[0]['upper_limit'],2)).any().any():
return err_date, err_code
#跌停
elif (current_date_data[cols_to_check] <= round(stocks_info.iloc[0]['lower_limit'],2)).any().any():
return err_date, err_code
else:
# 如果没有命终任何条件,回傳 None
return ('SKIP' , "SKIP")
except Exception as e:
print(f"⚠️ 处理 {err_date}, {err_code} 时发生错误: {e}")
return ('ERROR', f"{err_date},{err_code}")
#主程式
def main(folder ,file_name):
file_path = os.path.join(folder , file_name)
error_pairs ='error_pairs.log'
df=pd.read_csv(file_path)
df = df.astype({'code': 'string'})
#建立过滤条件:
mask_bad = (
df.isna().any(axis=1) | # 含 NaN
df.isin([np.inf, -np.inf]).any(axis=1) | # 含 inf 或 -inf
df.eq(0).all(axis=1) # 全部為 0
)
# 保留异常资料
df_bad_rows = df[mask_bad]
bad_pairs = list(zip(df_bad_rows['date_only'], df_bad_rows['code']))
with ProcessPoolExecutor(max_workers=10, initializer=init_clickhouse_client) as executor:
futures = executor.map(process_err_data, bad_pairs)
results = list(tqdm(futures, total=len(bad_pairs), desc=f"处理{file_name}错误资料"))
# 分类整理
exclude_pairs = [r for r in results if r[0] not in ('SKIP', 'ERROR')]
error_pairs = [r[1] for r in results if r[0] == 'ERROR']
#转成set更高效
exclude_set = set(tuple(item) for item in exclude_pairs)
# 建立布林遮罩
mask_exclude = df.apply(
lambda row: (row['date_only'].strftime('%Y-%m-%d'), row['code']) in exclude_set, axis=1
)
cols_to_nan = [col for col in df.columns if col not in ['code', 'date_only']]
# 填空
df.loc[mask_exclude, cols_to_nan] = np.nan
#清理後二次檢查
df_bad_after_clean = df[mask_bad].copy()
protected_cols = ['code', 'date_only']
other_cols = [col for col in df_bad_after_clean.columns if col not in protected_cols]
# 筛掉那些 "非 protected 欄位全部都是 NaN" 的 row
mask_all_nan = df_bad_after_clean[other_cols].isna().all(axis=1)
# 保留不是全NaN 的 资料
df_cleaned = df_bad_after_clean[~mask_all_nan].copy()
df.to_csv(file_path , index=False)
basename, ext = os.path.splitext(file_name) # 拆成 '2015_1' 和 '.csv'
err_filename = f"{basename}_err{ext}" # 組成 '2015_1_err.csv'
if len(df_cleaned) > 0:
df_cleaned.to_csv(os.path.join(folder, err_filename), index=False)
4. 分布检验
要预测的前提是两数据的分布必须相同或是近似,这里对于分钟级的数据做[1, 5, 10, 15, 30, 60]分钟的融合,再去计算平均收益率/峰度/变异数等等的统计量后随机配对作Cramér–von Mises criterion Test(python 用scipy.stats调用cramervonmises_2samp),用于检验两数据源是否为相似分布。
这里数据太杂乱就不放了,分享几件观察到有趣的事情
- 不只是有报仇率会出现相同分布,也可以看到有峰度分布的相同,有兴趣的读者可以尝试其他数据像是开盘价/成交量等等的数据。
- 原始数据/boxcox变换/zscore这三种类型的数据,即使是频率接近的数据经过zscore的效果显著好很多
- 不少检定结果pvalue接近1,表示两种数据的分布状态非常接进
def Cramer_von_Mises_test(data1, data2, col1, col2, label, method):
result = cramervonmises_2samp(data1, data2)
if result.pvalue > 0.05:
#print(f"✅ {label} {col1} vs {col2} → 分布无显著差异 (p = {result.pvalue:.4f})")
record = {
"period": label,
"col1": col1,
"col2": col2,
"method": method,
"test": "Cramer-von-Mises",
"pvalue": result.pvalue
}
if method == "origin":
origin_pass_Cramer_von_Mises_test.append(record)
elif method == "boxcox":
boxcox_pass_Cramer_von_Mises_test.append(record)
elif method == "zscore":
zscore_pass_Cramer_von_Mises_test.append(record)