市场状态感知动量因子的完整代码实现及结果对比分析
前言
前两篇文章我们讨论了市场状态感知因子设计的思路和流程框架,本篇我们继续尝试市场状态感知的动量因子的完整实现方法,并附上详细实现代码。
市场状态感知是有意义的探索。本次实验分别以 CSI300、CSI500、CSI1000 股票池为例,使用三组动量-收益窗口配对进行对比研究:
- 5日动量 - 1日收益
- 20日动量 - 5日收益
- 60日动量 - 10日收益
实验结果发现:
- 在所有组别下,我们设计的市场状态感知动量因子均优于原始动量因子。
- 沪深300和中证500上的表现更优。
- 中证1000组提升效果较小。
可能的解释:中证1000股票的动量效应受整体市场走势影响较小。经验上看,小盘股走势往往独立于整体市场,其动量主要来源于个股自身动量,受市场动量影响较小。
说明:
- 除动量因子外,后续将继续对其他市场状态感知因子(如市场波动率、量价因子等)进行探索。
- 理论上基于时序构建的个股因子同样能够应用于指数构建市场因子,进而通过回归系数或者互信息等方法刻画个股因子和市场因子间的关系,进而构造能够感知市场状态的因子。
- 更进一步,我们可以将 alpha101、alpha158 等因子引入市场状态的感知,进一步测试效果。
- 市场状态的刻画的主要思路有两个:一是先刻画市场状态,如构建统一的市场状态因子(合成后的市场因子),再将每个个股因子对市场状态因子进行回归或互信息分析,得到新的状态感知因。但是这种方法可能带来因子间相关性的提升。二是个股因子与其对应的市场状态因子配对并进行回归等刻画操作,每个因子都具备其独特内含逻辑的市场状态感知。
- 本研究仅是个人兴趣探索,非科班研究难免错漏,欢迎指正交流。
状态感知动量因子:方法与实证
- 目标:在截面上构建能“感知市场状态”的动量因子,并验证其在不同股票池、不同动量/预测窗口下的稳健性与相对原始动量的提升效果。
- 核心思路:
- 个股线性动量与指数线性动量满足 CAPM 同形关系:
S_i ≈ a + β · S_m + u
。 - 构造市场状态
M_t
(对指数动量做时间 z-score),并用滚动回归估计β
。 - 因子融合:
StateAware = z(U) + λ · z(β · M_t)
,其中U = S_i - (a + β · S_m)
。
- 个股线性动量与指数线性动量满足 CAPM 同形关系:
- 数据:Qlib CN 日频数据;默认
provider_uri
指向本地qlib_data/cn_data
。 - 指标:RankIC、NW 修正 t 值、分组多空(Top-Bottom)及累计收益曲线。
分组对比分析(Raw vs State 的 RankIC 提升)
- 多股票池 × 多窗口网格:
- 股票池与对应指数:
csi300 ↔ SH000300
,csi500 ↔ SH000905
,csi1000 ↔ SH000852
- 动量/预测窗口组合(W/H):
W5-H1
、W20-H5
、W260-H10
- 股票池与对应指数:
- 对比口径与输出:
- 对每个(股票池,W,H)组合,分别计算 Raw 与 State 的
RankIC_mean
。 - 生成 RankIC 对比表与可视化,并给出提升指标:
- 提升数额:
Delta = IC_state - IC_raw
- 绝对提升数额:
Delta_abs = |IC_state| - |IC_raw|
- 相对提升(%):
Pct = Delta / IC_raw
- 绝对相对提升(%):
Pct_abs = Delta_abs / |IC_raw|
- 提升数额:
- 对每个(股票池,W,H)组合,分别计算 Raw 与 State 的
- 可视化:
- 柱状对比:Raw vs State 的
RankIC_mean
(按组合分组) - 提升柱状图:
Delta
(数额)与Pct / Pct_abs
(百分比)
- 柱状对比:Raw vs State 的
实验流程(快速上手)
- 初始化 Qlib 与全局
CONFIG
(数据路径、时间范围、默认窗口等),确保指数与股票池属于同一region="cn"
。 - 运行单次实验管线:拉取个股/指数收盘价 → 构造
S_i
、S_m
与M_t
→ 滚动估计α/β
→ 生成StateAware
。 - 运行“批量实验封装函数”与“批量网格循环”单元:一次性得到
summary_all
汇总表和details
细节。 - 运行“RankIC 对比与提升可视化”单元:输出 Raw vs State 的 RankIC 对比表与三张图(对比/提升数额/提升百分比)。
关键参数(建议)
mom_window (W)
:动量窗口;常用5/20/60/260
。label_horizon (H)
:预测窗口;建议H ≤ W
,日频上常用W20→H5
、W60→H20
。beta_window
、state_window
:建议较长(如 252)以稳定估计。lambda
:融合权重(默认 1.0);可网格搜索微调。nw_lags
:NW 校正滞后阶,日频常用 5。n_groups
:分组回测组合数,默认 10。
注意事项
- 全流程使用
shift(1)
(或含跳过期的shift
)防止前视;指数状态M_t
也做shift(1)
。 - 个股收盘价做近 10 日有效性过滤,替换无穷为缺失值,并与指数序列对齐后再计算。
- 长窗口与大股票池运行时间较长,可优先试运行少量组合验证流程。
0. 运行前准备
- 安装 Qlib:
pip install pyqlib --upgrade
2:初始化与全局配置
- 目标:设置 Qlib 的数据路径、研究范围与核心参数,并完成初始化。
- 关键配置:
provider_uri
:本地 CN 数据目录;默认从环境变量读取,否则回退至项目内路径。universe
:股票池(如csi300
/csi1000
)。market_index
:用于刻画市场动量/状态的指数代码(如000300.SH
或SH000852
)。mom_window
:动量窗口 W,控制S_i
与S_m_idx
的回看长度。beta_window
、state_window
:用于 β 估计与状态 z-score 的时间窗口,通常取较长以稳定估计。label_horizon
:未来收益的预测视窗 H。- 其他:是否行业中性、IC 滚动窗口、NW 滞后阶、分组数等。
- 结果:调用
qlib.init
完成初始化,控制台输出数据路径确认信息。 - 注意:
- Windows 下路径分隔符与转义,推荐通过环境变量或
os.path.expanduser
/os.path.join
处理。 - 指数与股票池需来自同一区域(
region="cn"
),以保证取数一致性。
- Windows 下路径分隔符与转义,推荐通过环境变量或
# 1) 初始化与参数
import os, warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import qlib
from qlib.data import D
CONFIG = {
"provider_uri": os.environ.get("QLIB_PROVIDER_URI", os.path.expanduser("G:\workspace\qlibtutor\qlib_data\cn_data")),
"universe": "csi300", # 个股池(示例 csi300)
"market_index": "SH000300", # 市场指数(沪深300)
"freq": "day",
"start_time": "2020-01-01",
"end_time": "2025-08-31",
"mom_window": 20, # 20日动量
"beta_window": 252, # 滚动回归窗口
"state_window": 252, # 市场状态 z-score 的时间窗口
"label_horizon": 5, # 未来5日收益
"lambda": 1, # 融合权重
"industry_neutral": False, # 行业中性可选
"n_groups": 10,
"ic_roll_window": 60,
"nw_lags": 5,
}
qlib.init(provider_uri=CONFIG["provider_uri"], region="cn")
print("Qlib initialized:", CONFIG["provider_uri"])
3:加载股票池与指数行情
- 目标:
- 从 Qlib 读取股票池内个股的收盘价,构造成日期×股票的矩阵
close
。 - 同时读取指定市场指数(如
000300.SH
/SH000852
)的收盘价序列close_idx
。
- 从 Qlib 读取股票池内个股的收盘价,构造成日期×股票的矩阵
- 过程:
D.instruments(universe)
获取股票池代码集合。D.features(..., ["$close"])
拉取收盘价,透视为宽表并按日期排序。- 对个股收盘价进行有限有效性过滤(近 10 日有成交),并替换无穷值为
NaN
。 - 对指数同样拉取收盘价并取出单列序列。
- 输出:
close
:个股收盘价矩阵(日期×股票)。close_idx
:指数收盘价(Series)。
- 注意:
- 个股与指数的日期覆盖需尽量一致;后续会通过对齐处理不一致部分。
- 市场指数代码来自
CONFIG["market_index"]
。
# 2) 加载股票池与个股收盘价
instruments = D.instruments(CONFIG["universe"])
fields = ["$close"]
df_close = D.features(instruments, fields, start_time=CONFIG["start_time"], end_time=CONFIG["end_time"], freq=CONFIG["freq"])
tmp = df_close.reset_index()
close = tmp.pivot(index="datetime", columns="instrument", values="$close").sort_index()
close = close.replace([np.inf, -np.inf], np.nan)
valid = close.notna().rolling(10).sum() > 0
close = close.where(valid)
print("Stock close shape:", close.shape)
# 加载指数收盘价(SH000300)
idx_raw = D.features([CONFIG["market_index"]], ["$close"],start_time=CONFIG["start_time"], end_time=CONFIG["end_time"],freq=CONFIG["freq"]).reset_index()
# 对指数数据进行透视与清洗处理
# 1. 将指数数据从长格式转为宽格式(日期×指数代码)
# 2. 按日期排序确保时间序列连续性
# 3. 替换无穷值为NaN以避免计算异常
idx_pivot = (idx_raw.pivot(index="datetime", columns="instrument", values="$close").sort_index())
close_idx = idx_pivot[CONFIG["market_index"]].replace([np.inf, -np.inf], np.nan)
print("Index close (SH000300) points:", close_idx.shape)
4:构建个股与指数的 W 日动量与市场状态
- 目标:
- 计算个股
S_i
与指数S_m_idx
的 W 日动量(此处 W=20/5,取决于CONFIG["mom_window"]
)。 - 基于指数动量构造市场状态
M_t
(时间维度 z-score)。
- 计算个股
- 关键防漏:动量按
t-1
与t-1-W
的收盘价计算,避免使用当期或未来数据。 - 过程:
S_i = close.shift(1)/close.shift(1+W) - 1
。S_m_idx
同理,但以指数收盘序列close_idx
计算。M_t = z_t(S_m_idx)
:用state_window
做滚动均值/标准差并shift(1)
,得到状态的时间 z-score。
- 输出:
S_i
:个股 W 日动量矩阵。S_m_idx
:指数 W 日动量(Series)。M_t
:市场状态(Series)。
- 注意:
state_window
与beta_window
通常取较长窗口以稳定估计;可按数据频率与样本量调参。
# 3) 构建 20 日动量信号(个股 S_i 与 指数 S_m)
W = CONFIG["mom_window"]
# 个股 20D 动量:使用 t-1 与 t-1-W 收盘(避免未来)
S_i = (close.shift(1) / close.shift(1+W) - 1.0)
# 指数 20D 动量(市场动量 S_m):同样使用 t-1 与 t-1-W 收盘
S_m_idx = (close_idx.shift(1) / close_idx.shift(1+W) - 1.0)
# 市场状态 M_t:对 S_m_idx 做时间维度 z-score(以 state_window 滚动,且 shift(1) 防未来)
roll_mean = S_m_idx.rolling(CONFIG["state_window"]).mean().shift(1)
roll_std = S_m_idx.rolling(CONFIG["state_window"]).std().shift(1)
M_t = (S_m_idx - roll_mean) / roll_std
print("Built stock S_i and index-based S_m (SH000300) and M_t.")
5:滚动回归估计 α/β(个股动量对指数动量)
- 目标:对每只股票,使用滚动窗口在时间维度上回归
S_i = a_i + b_i * S_m_idx + u_i
,得到每日的截距α
与斜率β
。 - 输入:
S_i[col]
:个股的 W 日动量时间序列。S_m_idx
:指数(如沪深300)的 W 日动量时间序列。CONFIG["beta_window"]
:滚动窗口长度(默认 252)。
- 实现:
- 自实现
rolling_beta_alpha
,用滚动均值/方差公式计算协方差与方差,从而得到β=cov/var
与α=E[y]-βE[x]
。 - 对每只股票独立计算,填入
alpha
与beta
两个 DataFrame。
- 自实现
- 输出:
alpha
、beta
:与S_i
维度一致,索引为日期、列为股票。
- 注意:
- 回归使用的
S_i
与S_m_idx
均已通过shift(1)
避免未来信息泄漏(在动量构造处),本处直接使用即可。 - 对样本不足窗口期的起始日期,结果为
NaN
属正常现象。
- 回归使用的
# 4) 滚动回归:S_i = a_i + b_i * S_m_idx + u_i
def rolling_beta_alpha(y: pd.Series, x: pd.Series, window: int):
mean_x = x.rolling(window).mean()
mean_y = y.rolling(window).mean()
mean_xy = (x * y).rolling(window).mean()
mean_x2 = (x * x).rolling(window).mean()
cov_xy = mean_xy - mean_x * mean_y
var_x = mean_x2 - mean_x ** 2
beta = cov_xy / var_x
alpha = mean_y - beta * mean_x
return alpha, beta
alpha = pd.DataFrame(index=S_i.index, columns=S_i.columns, dtype=float)
beta = pd.DataFrame(index=S_i.index, columns=S_i.columns, dtype=float)
for col in S_i.columns:
a_i, b_i = rolling_beta_alpha(S_i[col], S_m_idx, window=CONFIG["beta_window"])
alpha[col] = a_i
beta[col] = b_i
print("Rolling alpha/beta estimated against 000300.SH momentum.")
6:残差与状态分量、截面标准化与融合
- 目标:基于回归结果构造个股的残差动量
U
与状态分量F_state
,并与原动量进行截面标准化后按权重融合,得到状态感知因子。 - 输入:
S_i
:个股 W 日动量矩阵(日期×股票)。alpha
、beta
:由滚动回归得到的截距与 β。S_m_idx
:指数的 W 日动量(Series)。M_t
:指数动量的时间 z-score(市场状态)。CONFIG["lambda"]
:融合权重(默认 0.6)。
- 过程:
- 将
S_m_idx
与M_t
依据日期对齐到S_i
,得到S_m_aligned
、M_aligned
。 - 残差动量:
U = S_i - (alpha + beta * S_m_aligned)
。 - 状态分量:
F_state = beta * M_aligned
。 - 对
S_i
、U
、F_state
做逐日截面 z-score(消除横截面尺度差异)。 - 融合:
F_raw = z(S_i)
;F_stateaware = z(U) + lambda * z(F_state)
。
- 将
- 输出:
F_raw
:原始因子的截面标准化版本。F_stateaware
:状态感知融合因子。
- 注意:
- 截面标准化在每日横截面上进行;当某日标准差为 0 会产生
NaN
,代码已做保护。 - 日期对齐是关键,避免广播维度错误与“前视”。
lambda
可根据验证结果调参。
- 截面标准化在每日横截面上进行;当某日标准差为 0 会产生
# 5) 构造残差与状态分量,并融合
# 与 S_i 的日期索引对齐,避免广播形状不匹配
S_m_aligned = S_m_idx.reindex(S_i.index)
M_aligned = M_t.reindex(S_i.index)
# 残差动量:U_i(t) = S_i(t) - a_hat - b_hat * S_m_idx(t)
U = S_i - (alpha + beta.mul(S_m_aligned, axis=0))
# 状态分量:F_state_i(t) = b_hat_i(t) * M_t
F_state = beta.mul(M_aligned, axis=0)
def cs_zscore(df):
mean = df.mean(axis=1)
std = df.std(axis=1)
return (df.sub(mean, axis=0)).div(std.replace(0, np.nan), axis=0)
lambda_ = CONFIG["lambda"]
F_raw = cs_zscore(S_i) # 原始 20D 动量(截面 z)
F_stateaware = cs_zscore(U) + lambda_ * cs_zscore(F_state) # 状态感知融合
print("Built raw and state-aware factors (index-based).")
7:行业中性化(可选)
- 目标:提供一个占位的行业中性化接口,用于剔除行业共同效应的影响。
- 原理:通常做法是逐日将因子对行业哑元做截面回归,取残差作为行业中性的因子暴露。
- 当前实现:
- 若
CONFIG["industry_neutral"]
为False
,直接返回原因子。 - 若为
True
,当前仅打印提示;用户可在此接入行业分类并实现逐日回归(如 OLS/Huber)。
- 若
- 作用:减少行业轮动对因子效果评估的干扰,使比较更聚焦于风格暴露本身。
# 6) 可选:行业中性化(占位实现)
def industry_neutralize_factor(F: pd.DataFrame):
if not CONFIG["industry_neutral"]:
return F
# TODO: 用户在此对接行业哑元矩阵,逐日回归取残差
print("industry_neutral=True 但未提供行业数据,暂跳过。")
return F
F_raw_neu = industry_neutralize_factor(F_raw)
F_sta_neu = industry_neutralize_factor(F_stateaware)
8:标签构造与对齐
- 目标:生成未来
H
日收益标签并与因子矩阵严格按日期对齐。 - 输入:
close
:个股收盘价矩阵(日期×股票)。CONFIG["label_horizon"]
(H
):未来收益的预测视窗,默认 5 日。F_raw_neu
、F_sta_neu
:可能已做行业中性的 Raw/State-aware 因子。
- 过程:
- 计算
label5 = close.shift(-H)/close - 1
,确保是“向前看的”收益。 - 取三者共同的日期索引
common
,得到F1
、F2
与Y
的严格对齐版本。
- 计算
- 输出:打印标签维度,返回对齐后的因子与标签,用于评估阶段。
- 注意:任何对齐不严谨都会造成“前视偏差”。本单元统一以交集日期裁剪,确保有效比较。
# 7) 未来 5 日收益标签(close_{t+5}/close_t - 1)
H = CONFIG["label_horizon"]
label5 = (close.shift(-H) / close - 1.0)
# 对齐
F1 = F_raw_neu.copy()
F2 = F_sta_neu.copy()
common = F1.index.intersection(label5.index)
F1 = F1.loc[common]
F2 = F2.loc[common]
Y = label5.loc[common]
print("Label aligned:", Y.shape)
9:评估工具函数
cs_spearman_ic
:- 逐日期对因子与未来收益做截面 Spearman 相关,得到每日 RankIC。
- 对每个交易日,剔除
NaN
,若样本太少(<5)则记为NaN
。
nw_tstat
:- 对时间序列
x
(如 IC 序列)做 Newey-West 方差校正,考虑自相关(默认滞后 5)。 - 返回
(均值, 标准误, t 值)
,t 值用于显著性判断。
- 对时间序列
group_backtest
:- 按因子截面分位分组(默认 10 组),计算每组下一期的平均收益。
- 记录最高组与最低组的收益以及两者差(多空);构造累计收益曲线。
- 用途:为后续的效果评估与可视化提供独立、可复用的工具。
# 8) 评估函数
from scipy import stats
def cs_spearman_ic(factor_df: pd.DataFrame, label_df: pd.DataFrame):
ics = []
for dt in factor_df.index:
x = factor_df.loc[dt]
y = label_df.loc[dt]
mask = x.notna() & y.notna()
if mask.sum() < 5:
ics.append(np.nan)
else:
rho, _ = stats.spearmanr(x[mask], y[mask])
ics.append(rho)
return pd.Series(ics, index=factor_df.index, name="RankIC")
def nw_tstat(x: pd.Series, lags: int = 5):
x = x.dropna()
if len(x) < 5:
return np.nan, np.nan, np.nan
mean = x.mean(); T = len(x)
eps = (x - mean).values
gamma0 = np.dot(eps, eps) / T
var_nw = gamma0
for L in range(1, min(lags, T-1) + 1):
w = 1 - L / (lags + 1)
cov = np.dot(eps[L:], eps[:-L]) / T
var_nw += 2 * w * cov
se = np.sqrt(var_nw / T)
tval = mean / se if se > 0 else np.nan
return mean, se, tval
10:指标评估与汇总
- 目标:用 RankIC 与 Newey-West 校正的 t 统计量评估因子效果,并进行分组回测对比。
- 输入:
F1
、F2
:分别为 Raw 与 State-aware 因子的截面 z 分数矩阵(股票×日期)。Y
:未来收益标签(与因子对齐)。CONFIG["nw_lags"]
:NW 方差校正的自相关滞后阶数,默认 5。CONFIG["n_groups"]
:分组回测的组合数,默认 10。
- 过程:
- 逐日计算因子与收益的 Spearman 相关,得到
ic_raw
与ic_sta
。 - 使用
nw_tstat
对 IC 序列进行 Newey-West 校正,计算均值、标准误与 t 值。 - 通过
group_backtest
进行分组多空回测,得到多空单期收益及其累计曲线。 - 生成
summary
,包含 RankIC 的均值/标准差/IR、NW t 值,以及多空收益的均值/标准差/IR。
- 逐日计算因子与收益的 Spearman 相关,得到
- 输出:控制台打印 NW 修正后的 IC 统计;返回
summary
表以便在 Notebook 中展示。 - 注意:
- 负的 RankIC 均值意味着反向排序可能更优,可在后续实盘/研究中考虑取负或改造信号。
- NW 校正用于处理序列自相关,避免常规 t 值偏乐观。
# 9) 评估与对比
ic_raw = cs_spearman_ic(F1, Y)
ic_sta = cs_spearman_ic(F2, Y)
mean_raw, se_raw, t_raw = nw_tstat(ic_raw, lags=CONFIG["nw_lags"])
mean_sta, se_sta, t_sta = nw_tstat(ic_sta, lags=CONFIG["nw_lags"])
print("RankIC (NW-corrected):")
print(f"Raw : mean={mean_raw:.4f}, se={se_raw:.4f}, t={t_raw:.2f}")
print(f"State : mean={mean_sta:.4f}, se={se_sta:.4f}, t={t_sta:.2f}")
summary = pd.DataFrame({
"RankIC_mean": [ic_raw.mean(), ic_sta.mean()],
"RankIC_std": [ic_raw.std(), ic_sta.std()],
"RankIC_IR": [ic_raw.mean() / ic_raw.std(), ic_sta.mean() / ic_sta.std()],
"NW_t": [t_raw, t_sta],
}, index=["Raw20D", "StateAware20D"])
summary
RankIC (NW-corrected):
Raw : mean=-0.0140, se=0.0103, t=-1.36
State : mean=-0.0179, se=0.0121, t=-1.48
11:可视化结果
可视化两类因子的RankIC时序表现和滚动均值对比。
# 10) 可视化(本地渲染)
import matplotlib.pyplot as plt
roll_w = CONFIG["ic_roll_window"]
plt.figure()
plt.plot(ic_raw.index, ic_raw.values, label="Raw 20D RankIC")
plt.plot(ic_sta.index, ic_sta.values, label="State-aware 20D RankIC", alpha=0.7)
plt.plot(ic_raw.rolling(roll_w).mean().index, ic_raw.rolling(roll_w).mean().values, linestyle="--", label="Raw 60D roll")
plt.plot(ic_sta.rolling(roll_w).mean().index, ic_sta.rolling(roll_w).mean().values, linestyle="--", label="State 60D roll")
plt.title("RankIC over time"); plt.xlabel("Date"); plt.ylabel("IC"); plt.legend(); plt.show()
12:分组批量对比实验
单次批量实验封装:对给定股票池/指数与窗口 W/H,计算 Raw 与 State 因子并评估。
作用:
- 拉取个股/指数收盘价 → 构造个股动量 S_i 与指数动量 S_m、市场状态 M_t
- 逐股滚动回归得到 α/β;构造残差项 U 与状态项 β·M_t,并做截面标准化与融合
- 生成未来 H 日收益标签,与因子严格对齐后,计算 RankIC、NW t、分组多空等指标
参数:
universe
: 股票池名称,如 “csi300”/“csi500”/“csi1000”market_index
: 对应市场指数代码,如 “SH000300”/“SH000905”/“SH000852”W
: 动量窗口(天)H
: 预测窗口(天),建议 H ≤ Wstart_time/end_time/freq
: 取数范围与频率(默认 “day”)beta_window/state_window
: 回归与状态估计窗口(默认 252)lambda_
: 状态分量融合权重industry_neutral
: 是否行业中性(当前为占位,False 时不处理)n_groups
: 分组回测的组合数(默认 10)nw_lags
: Newey–West 校正滞后阶(默认 5)
返回:
summary
: DataFrame(两行:Raw/State),列包含:- RankIC_mean/RankIC_std/RankIC_IR/NW_t/LS_mean/LS_std/LS_IR
details
: dict,键:- “ic_raw”/“ic_sta”(Series)
- “bt_raw”/“bt_sta”(dict,含 long/short/ls 及累计曲线)
重要说明:
- 防前视:动量与状态均使用 shift(1),标签使用 shift(-H)
- 有效性过滤:近 10 日无效价置 NaN;指数与个股严格按日期对齐后再评估
- 性能:大股票池与长窗口计算耗时更长,可先抽样或缩小网格验证
# 批量实验:单次运行封装
# 说明:依赖前文的 qlib.init、以及后文已定义的 cs_spearman_ic / nw_tstat / group_backtest
import pandas as pd
import numpy as np
from qlib.data import D
def run_state_aware_experiment(
universe: str,
market_index: str,
W: int,
H: int,
start_time: str,
end_time: str,
freq: str = "day",
beta_window: int = 252,
state_window: int = 252,
lambda_: float = 1.0,
industry_neutral: bool = False,
n_groups: int = 10,
nw_lags: int = 5,
):
# 1) 拉数
instruments = D.instruments(universe)
df_close = D.features(instruments, ["$close"], start_time=start_time, end_time=end_time, freq=freq)
tmp = df_close.reset_index()
close = tmp.pivot(index="datetime", columns="instrument", values="$close").sort_index()
close = close.replace([np.inf, -np.inf], np.nan)
valid = close.notna().rolling(10).sum() > 0
close = close.where(valid)
idx_raw = D.features([market_index], ["$close"], start_time=start_time, end_time=end_time, freq=freq).reset_index()
idx_pivot = idx_raw.pivot(index="datetime", columns="instrument", values="$close").sort_index()
close_idx = idx_pivot[market_index].replace([np.inf, -np.inf], np.nan)
# 2) 信号与状态
S_i = close.shift(1) / close.shift(1 + W) - 1.0
S_m_idx = close_idx.shift(1) / close_idx.shift(1 + W) - 1.0
roll_mean = S_m_idx.rolling(state_window).mean().shift(1)
roll_std = S_m_idx.rolling(state_window).std().shift(1)
M_t = (S_m_idx - roll_mean) / roll_std
# 3) 滚动回归得到 alpha, beta(逐列)
def rolling_beta_alpha(y: pd.Series, x: pd.Series, window: int):
mean_x = x.rolling(window).mean()
mean_y = y.rolling(window).mean()
mean_xy = (x * y).rolling(window).mean()
mean_x2 = (x * x).rolling(window).mean()
cov_xy = mean_xy - mean_x * mean_y
var_x = mean_x2 - mean_x ** 2
beta = cov_xy / var_x
alpha = mean_y - beta * mean_x
return alpha, beta
alpha = pd.DataFrame(index=S_i.index, columns=S_i.columns, dtype=float)
beta = pd.DataFrame(index=S_i.index, columns=S_i.columns, dtype=float)
for col in S_i.columns:
a_i, b_i = rolling_beta_alpha(S_i[col], S_m_idx, window=beta_window)
alpha[col] = a_i
beta[col] = b_i
# 4) 残差项与状态项 + 截面标准化
S_m_aligned = S_m_idx.reindex(S_i.index)
M_aligned = M_t.reindex(S_i.index)
U = S_i - (alpha + beta.mul(S_m_aligned, axis=0))
F_state = beta.mul(M_aligned, axis=0)
def cs_zscore(df: pd.DataFrame):
mean = df.mean(axis=1)
std = df.std(axis=1)
return (df.sub(mean, axis=0)).div(std.replace(0, np.nan), axis=0)
F_raw = cs_zscore(S_i)
F_stateaware = cs_zscore(U) + lambda_ * cs_zscore(F_state)
# 可选:行业中性(占位)
if industry_neutral:
pass
# 5) 标签与对齐
label = (close.shift(-H) / close - 1.0)
common = F_raw.index.intersection(label.index)
F1 = F_raw.loc[common]; F2 = F_stateaware.loc[common]; Y = label.loc[common]
# 6) 评估
ic_raw = cs_spearman_ic(F1, Y)
ic_sta = cs_spearman_ic(F2, Y)
mean_raw, se_raw, t_raw = nw_tstat(ic_raw, lags=nw_lags)
mean_sta, se_sta, t_sta = nw_tstat(ic_sta, lags=nw_lags)
bt_raw = group_backtest(F1, Y, n_groups=n_groups)
bt_sta = group_backtest(F2, Y, n_groups=n_groups)
rows = [
{
"Universe": universe, "Index": market_index, "W": W, "H": H, "Factor": "Raw",
"RankIC_mean": ic_raw.mean(), "RankIC_std": ic_raw.std(),
"RankIC_IR": ic_raw.mean() / ic_raw.std() if ic_raw.std() > 0 else np.nan,
"NW_t": t_raw,
"LS_mean": bt_raw["ls"].mean(), "LS_std": bt_raw["ls"].std(),
"LS_IR": bt_raw["ls"].mean() / bt_raw["ls"].std() if bt_raw["ls"].std() > 0 else np.nan,
},
{
"Universe": universe, "Index": market_index, "W": W, "H": H, "Factor": "State",
"RankIC_mean": ic_sta.mean(), "RankIC_std": ic_sta.std(),
"RankIC_IR": ic_sta.mean() / ic_sta.std() if ic_sta.std() > 0 else np.nan,
"NW_t": t_sta,
"LS_mean": bt_sta["ls"].mean(), "LS_std": bt_sta["ls"].std(),
"LS_IR": bt_sta["ls"].mean() / bt_sta["ls"].std() if bt_sta["ls"].std() > 0 else np.nan,
},
]
return pd.DataFrame(rows), {"ic_raw": ic_raw, "ic_sta": ic_sta, "bt_raw": bt_raw, "bt_sta": bt_sta}
# 多股票池 × 多窗口 批量跑与汇总
universe_index_map = {
"csi300": "SH000300",
"csi500": "SH000905",
"csi1000": "SH000852",
}
grids = [(5, 1), (20, 5), (60, 10)]
all_summaries = []
details = {} # 如需取出某一组合的 ic / 回测曲线
for uni, idx in universe_index_map.items():
for W, H in grids:
print(f"Running: {uni} / {idx} | W={W}, H={H}")
summ, det = run_state_aware_experiment(
universe=uni, market_index=idx,
W=W, H=H,
start_time=CONFIG["start_time"], end_time=CONFIG["end_time"], freq=CONFIG["freq"],
beta_window=CONFIG["beta_window"], state_window=CONFIG["state_window"],
lambda_=CONFIG["lambda"], industry_neutral=CONFIG["industry_neutral"],
n_groups=CONFIG["n_groups"], nw_lags=CONFIG["nw_lags"],
)
all_summaries.append(summ)
details[(uni, W, H)] = det
summary_all = pd.concat(all_summaries, ignore_index=True)
summary_all = summary_all.sort_values(["Universe", "W", "H", "Factor"]).reset_index(drop=True)
summary_all
RankIC 对比:Raw vs State(提升数额 / 百分比)+ 可视化
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# 1) 汇总表:各组 Raw/State RankIC 及提升
pivot = summary_all.pivot_table(
index=["Universe", "W", "H"],
columns="Factor",
values="RankIC_mean"
).rename(columns={"Raw": "RankIC_raw", "State": "RankIC_state"}).dropna(subset=["RankIC_raw", "RankIC_state"])
comp = pivot.copy()
# 使用绝对值进行对比
comp["RankIC_raw_abs"] = comp["RankIC_raw"].abs()
comp["RankIC_state_abs"] = comp["RankIC_state"].abs()
comp["Delta_abs"] = comp["RankIC_state_abs"] - comp["RankIC_raw_abs"] # |IC| 提升数额
comp["Pct_abs"] = comp["Delta_abs"] / comp["RankIC_raw_abs"].replace(0, np.nan) # |IC| 相对提升
comp = comp.reset_index().sort_values(["Universe", "W", "H"])
cols = ["Universe","W","H","RankIC_raw","RankIC_state","Delta_abs","Pct_abs"]
comp_display = comp[cols]
comp_display
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
Factor | Universe | W | H | RankIC_raw | RankIC_state | Delta_abs | Pct_abs |
---|---|---|---|---|---|---|---|
0 | csi1000 | 5 | 1 | -0.022639 | -0.023503 | 0.000864 | 0.038186 |
1 | csi1000 | 20 | 5 | -0.039254 | -0.040618 | 0.001364 | 0.034741 |
2 | csi1000 | 60 | 10 | -0.048739 | -0.049627 | 0.000887 | 0.018207 |
3 | csi300 | 5 | 1 | -0.012329 | -0.013711 | 0.001382 | 0.112055 |
4 | csi300 | 20 | 5 | -0.013965 | -0.017881 | 0.003915 | 0.280362 |
5 | csi300 | 60 | 10 | -0.023427 | -0.044194 | 0.020768 | 0.886486 |
6 | csi500 | 5 | 1 | -0.015212 | -0.020445 | 0.005233 | 0.343970 |
7 | csi500 | 20 | 5 | -0.022392 | -0.032622 | 0.010230 | 0.456868 |
8 | csi500 | 60 | 10 | -0.032708 | -0.039037 | 0.006329 | 0.193494 |
# 2) 可视化:Raw vs State 的 RankIC 绝对值对比
labels = comp.apply(lambda r: f'{r["Universe"]}-W{r["W"]}H{r["H"]}', axis=1)
x = np.arange(len(labels)); width = 0.38
plt.figure(figsize=(max(8, len(labels)*0.7), 4.6))
plt.bar(x - width/2, comp["RankIC_raw"].abs(), width, label="Raw (|IC|)")
plt.bar(x + width/2, comp["RankIC_state"].abs(), width, label="State (|IC|)")
plt.xticks(x, labels, rotation=45, ha="right")
plt.ylabel("|RankIC_mean|")
plt.title("RankIC :Raw vs State")
plt.legend(); plt.tight_layout(); plt.show()