上篇基于《AI系列研究之一:端到端的动态Alpha模型》理论分析
本篇文章会对其中各部分进行代码研究
大概流程划分为下
数据清洗
目的
- 保证输入数据的质量,剔除不符合要求的样本。
- 减少后续特征工程和模型训练的偏差。
- 过滤掉 ST 和退市股,可避免模型学到噪声或极端异常;空值剔除确保计算指标和归一化不出错。
def get_all_stocks(self, exclude_st=True, exclude_delisted=True):
"""获取所有股票代码(排除ST股票和退市股票)"""
with self.get_connection() as conn:
query = """
SELECT stock_id, stock_name
FROM stock_info
"""
stocks = pd.read_sql_query(query, conn)
if exclude_st:
# 排除名称中包含'ST'的股票
stocks = stocks[~stocks['stock_name'].str.contains('ST', na=False)]
if exclude_delisted:
# 排除名称中包含'退'的退市股票
stocks = stocks[~stocks['stock_name'].str.contains('退', na=False)]
# 排除以'退市'开头的股票
stocks = stocks[~stocks['stock_name'].str.startswith('退市', na=False)]
# 排除以'*'开头的股票(一些交易所使用*标记退市股)
stocks = stocks[~stocks['stock_name'].str.startswith('*', na=False)]
return stocks
def prepare_stock_data(self, stock_id, lookback=10, prediction_days=5):
"""准备单个股票的训练数据"""
# 从数据库获取数据
with self.get_connection() as conn:
# 获取股票价格数据
query = """
SELECT sd.trade_date, sd.open_price, sd.high_price, sd.low_price,
sd.close_price, sd.volume, si.current_market_value
FROM stock_daily sd
JOIN stock_info si ON sd.stock_id = si.stock_id
WHERE sd.stock_id = ?
ORDER BY sd.trade_date
"""
df = pd.read_sql_query(query, conn, params=[stock_id])
if df.empty:
return None
# 确保数据类型正确
df['trade_date'] = pd.to_datetime(df['trade_date'])
numeric_columns = ['open_price', 'high_price', 'low_price', 'close_price', 'volume', 'current_market_value']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 添加技术指标
df = self.add_features(df)
# 计算未来n天的收益率作为目标变量
df['future_return'] = df['close_price'].pct_change(periods=prediction_days).shift(-prediction_days)
# 删除包含NaN的行
df = df.dropna()
if len(df) < lookback + prediction_days:
return None
# 提取特征和目标
features = df.drop(['trade_date', 'future_return'], axis=1).values
target = df['future_return'].values.reshape(-1, 1)
# 创建时间序列数据
X, y = [], []
for i in range(lookback, len(features)):
X.append(features[i-lookback:i])
y.append(target[i])
if not X:
return None
X = np.array(X)
y = np.array(y)
# 将3D数据重塑为2D (样本数, 特征数*lookback)
X = X.reshape(X.shape[0], -1)
return X, y
特征工程
技术指标和因子设计
- 收益率变化 price_change、振幅比 price_range、5 日标准差 price_std
- 均线 ma5、ma10;RSI 相对强弱指标
- 成交量特征:volume_change、volume_ma5、volume_ratio
- 市值变化 market_value_change、价量比 price_volume_ratio
- 五个自定义 Alpha 因子 alpha1~alpha5,如振幅比、成交量与价格变化比等
滚动填充与截面对齐
- 先向前填充再向后填充,最后置零,保证无空值。
- 最终输出:宽表格,每行是一天的全量特征;再按 lookback 切成时序样本。
def add_features(self, df):
"""添加技术指标和特征"""
# 价格特征
df['price_change'] = df['close_price'].pct_change()
df['price_range'] = (df['high_price'] - df['low_price']) / df['close_price']
df['price_std'] = df['close_price'].rolling(window=5).std()
# 移动平均线
df['ma5'] = df['close_price'].rolling(window=5).mean()
df['ma10'] = df['close_price'].rolling(window=10).mean()
# 相对强弱指标 (RSI)
delta = df['close_price'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# 成交量特征
df['volume_change'] = df['volume'].pct_change()
df['volume_ma5'] = df['volume'].rolling(window=5).mean()
df['volume_ratio'] = df['volume'] / df['volume_ma5']
# 市值特征
df['market_value_change'] = df['current_market_value'].pct_change()
# 价格与成交量关系
df['price_volume_ratio'] = df['close_price'] / df['volume']
# Alpha因子
# Alpha1: 收盘价与开盘价的差值除以振幅
df['alpha1'] = (df['close_price'] - df['open_price']) / ((df['high_price'] - df['low_price']) + 1e-5)
# Alpha2: 收盘价与前一天收盘价的差值
df['alpha2'] = df['close_price'].diff()
# Alpha3: 收盘价与5日均线的差值
df['alpha3'] = df['close_price'] - df['ma5']
# Alpha4: 成交量变化率与价格变化率的比值
df['alpha4'] = df['volume_change'] / (df['price_change'] + 1e-5)
# Alpha5: RSI与价格变化的乘积
df['alpha5'] = df['rsi'] * df['price_change']
# 填充NaN值
df = df.fillna(method='bfill').fillna(method='ffill').fillna(0)
return df
模型定义
网络结构
- 输入层:input_dim 维度 = 特征数 × lookback。
- 两层全连接:256 → 128 单元,He 初始化,BatchNorm + Dropout(0.2)。
- “Alpha 因子层”:32 单元、tanh 激活,并加 L1/L2 正则化,保证输出不全为0。
- 最终输出层:1 个神经元,直接回归未来收益率。
自定义损失函数
- IC 损失 (ic_loss):最大化预测值与真实值的皮尔逊相关系数。
- CCC 损失 (ccc_loss):最大化一致性相关系数,兼顾偏移与相关性。
- 默认 MSE:均方误差。
为什么要这样设计?
- 中间的 Alpha 因子层可视为可解释因子,tanh 增强表达;
- 多种损失函数分别关注不同目标:(MSE)最小绝对偏差;(IC/CCC)提高排序能力与一致性;
- 正则化和 BatchNorm 控制过拟合、加快收敛。
# 自定义IC损失函数 (Information Coefficient Loss)
def ic_loss(y_true, y_pred):
"""
计算预测值和真实值之间的相关系数的负值
IC越高越好,所以损失函数取负值
"""
# 去均值
y_true_centered = y_true - K.mean(y_true)
y_pred_centered = y_pred - K.mean(y_pred)
# 计算相关系数
numerator = K.sum(y_true_centered * y_pred_centered)
denominator = K.sqrt(K.sum(K.square(y_true_centered)) * K.sum(K.square(y_pred_centered)) + K.epsilon())
# 返回负相关系数作为损失
return -numerator / denominator
# 自定义CCC损失函数 (Concordance Correlation Coefficient Loss)
def ccc_loss(y_true, y_pred):
"""
计算一致性相关系数的负值
CCC越高越好,所以损失函数取负值
"""
# 计算均值
mean_true = K.mean(y_true)
mean_pred = K.mean(y_pred)
# 计算方差
var_true = K.var(y_true)
var_pred = K.var(y_pred)
# 计算协方差
covar = K.mean((y_true - mean_true) * (y_pred - mean_pred))
# 计算CCC
numerator = 2 * covar
denominator = var_true + var_pred + K.square(mean_true - mean_pred) + K.epsilon()
# 返回负CCC作为损失
return -numerator / denominator
class NonlinearAlphaModel:
def __init__(self, db_path='stock_db.db', model_dir='alpha_models'):
"""初始化非线性Alpha模型"""
self.db_path = db_path
self.model_dir = model_dir
self.scaler_X = MinMaxScaler()
self.scaler_y = MinMaxScaler()
# 创建模型保存目录
if not os.path.exists(model_dir):
os.makedirs(model_dir)
def build_alpha_model(self, input_dim, activation):
"""构建改进的非线性Alpha模型,解决因子输出为0的问题"""
inputs = Input(shape=(input_dim,))
# 第一层 - 使用更多神经元,减小dropout率
x = Dense(256, activation=activation, kernel_initializer='he_normal')(inputs)
x = BatchNormalization()(x)
x = Dropout(0.2)(x) # 降低dropout率
# 第二层
x = Dense(128, activation=activation, kernel_initializer='he_normal')(x)
x = BatchNormalization()(x)
x = Dropout(0.2)(x)
# Alpha因子层 - 使用tanh激活函数确保输出不全为0
# tanh输出范围为[-1,1],可以产生更丰富的因子表示
alpha_factors = Dense(32, activation='tanh', name='alpha_factors',
kernel_regularizer=tf.keras.regularizers.l1_l2(l1=1e-5, l2=1e-4))(x)
# 输出层
outputs = Dense(1)(alpha_factors)
# 创建模型
model = Model(inputs=inputs, outputs=outputs)
return model
训练调优
网格搜索式尝试
- 两种激活函数:ReLU、Sigmoid。
- 三种损失函数:MSE、IC、CCC。
- 组合后共 2×3=6 种实验。
训练细节
- 优化器:Adam,学习率 0.001。
- 早停:EarlyStopping(patience=15),监控验证集损失,防止过拟合。
- 检查点:ModelCheckpoint(save_best_only),保存每次最优模型。
评估与选优
- 在测试集上反归一化后计算 MSE、MAE、R²、IC、CCC。
最佳模型:
- 当损失为 MSE 时,选择最小 MSE;
- 当损失为 IC/CCC 时,选择最大 IC/CCC。
def train_alpha_model(self, X_train, y_train, X_test, y_test):
"""训练非线性Alpha模型"""
print("训练非线性Alpha模型...")
# 定义不同的激活函数
activations = {
'relu': 'relu',
'sigmoid': 'sigmoid'
}
# 定义不同的损失函数
losses = {
'mse': 'mean_squared_error',
'ic': ic_loss,
'ccc': ccc_loss
}
best_model = None
best_loss = float('inf')
best_metrics = None
# 尝试不同的激活函数和损失函数组合
for act_name, activation in activations.items():
for loss_name, loss_fn in losses.items():
print(f"\n尝试 激活函数: {act_name}, 损失函数: {loss_name}")
# 构建模型
model = self.build_alpha_model(X_train.shape[1], activation)
# 编译模型
model.compile(
optimizer=Adam(learning_rate=0.001),
loss=loss_fn
)
# 早停策略
early_stopping = EarlyStopping(
monitor='val_loss',
patience=15,
restore_best_weights=True
)
# 模型检查点
checkpoint_path = os.path.join(
self.model_dir,
f'model_{act_name}_{loss_name}.keras'
)
checkpoint = ModelCheckpoint(
checkpoint_path,
save_best_only=True,
monitor='val_loss'
)
# 训练模型
history = model.fit(
X_train, y_train,
epochs=100,
batch_size=64,
validation_split=0.2,
callbacks=[early_stopping, checkpoint],
verbose=1
)
# 评估模型
y_pred = model.predict(X_test)
# 反标准化预测结果
y_test_orig = self.scaler_y.inverse_transform(y_test)
y_pred_orig = self.scaler_y.inverse_transform(y_pred)
# 计算评估指标
mse = mean_squared_error(y_test_orig, y_pred_orig)
mae = mean_absolute_error(y_test_orig, y_pred_orig)
r2 = r2_score(y_test_orig, y_pred_orig)
# 计算IC值
ic = np.corrcoef(y_test_orig.flatten(), y_pred_orig.flatten())[0, 1]
# 计算CCC值
mean_true = np.mean(y_test_orig)
mean_pred = np.mean(y_pred_orig)
var_true = np.var(y_test_orig)
var_pred = np.var(y_pred_orig)
covar = np.mean((y_test_orig - mean_true) * (y_pred_orig - mean_pred))
ccc = (2 * covar) / (var_true + var_pred + (mean_true - mean_pred)**2)
print(f"测试集MSE: {mse:.6f}")
print(f"测试集MAE: {mae:.6f}")
print(f"测试集R²: {r2:.6f}")
print(f"测试集IC: {ic:.6f}")
print(f"测试集CCC: {ccc:.6f}")
# 保存评估指标
metrics = {
'activation': act_name,
'loss': loss_name,
'mse': mse,
'mae': mae,
'r2': r2,
'ic': ic,
'ccc': ccc,
'y_test': y_test_orig,
'y_pred': y_pred_orig,
'history': history.history
}
# 保存评估结果
results_path = os.path.join(
self.model_dir,
f'results_{act_name}_{loss_name}.pkl'
)
joblib.dump(metrics, results_path)
# 更新最佳模型
if loss_name == 'mse' and mse < best_loss:
best_loss = mse
best_model = model
best_metrics = metrics
elif loss_name == 'ic' and ic > best_metrics.get('ic', -1) if best_metrics else -1:
best_model = model
best_metrics = metrics
elif loss_name == 'ccc' and ccc > best_metrics.get('ccc', -1) if best_metrics else -1:
best_model = model
best_metrics = metrics
print(f"\n最佳模型: 激活函数={best_metrics['activation']}, 损失函数={best_metrics['loss']}")
print(f"最佳MSE: {best_metrics['mse']:.6f}")
print(f"最佳IC: {best_metrics['ic']:.6f}")
print(f"最佳CCC: {best_metrics['ccc']:.6f}")
return best_model, best_metrics
保存/加载模型
保存
- 特征与目标归一化器 scaler_X.pkl、scaler_y.pkl(使用 joblib)。
- 各试验模型及结果:model_{act}{loss}.keras、results{act}_{loss}.pkl。
- 最佳模型:best_alpha_model.keras。
加载
- load_best_model() 会同时恢复模型结构、权重和自定义损失函数。
- 重新加载标准化器,保证预测阶段与训练阶段一致。
# 保存最佳模型
best_model_path = os.path.join(self.model_dir, 'best_alpha_model.keras')
best_model.save(best_model_path)
def load_best_model(self):
"""加载最佳模型"""
model_path = os.path.join(self.model_dir, 'best_alpha_model.keras')
scaler_X_path = os.path.join(self.model_dir, 'scaler_X.pkl')
scaler_y_path = os.path.join(self.model_dir, 'scaler_y.pkl')
if os.path.exists(model_path) and os.path.exists(scaler_X_path) and os.path.exists(scaler_y_path):
# 自定义对象
custom_objects = {
'ic_loss': ic_loss,
'ccc_loss': ccc_loss
}
model = load_model(model_path, custom_objects=custom_objects)
self.scaler_X = joblib.load(scaler_X_path)
self.scaler_y = joblib.load(scaler_y_path)
return model
else:
print("找不到保存的模型或标准化器")
return None
回测与预测/模型解释
批量预测:
- 调用加载好的最佳模型,遍历所有(符合条件的)股票。
- 再次取最新 N+20 天数据,保证足够计算特征。
- 标准化后 model.predict(),反归一化回收益率→预测价。
加入合理性检查:
- 基于历史波动率的 z-score 限制(超过 2σ 则截断);
- 基于市场平均涨幅的上限(如 5 倍市场涨幅)。
- 最终对收益率排序,取 Top N。
def plot_results(self, metrics, title=None):
"""绘制预测结果"""
plt.figure(figsize=(18, 12))
# 1. 散点图:预测值vs实际值
plt.subplot(2, 2, 1)
plt.scatter(metrics['y_test'], metrics['y_pred'], alpha=0.3)
min_val = min(metrics['y_test'].min(), metrics['y_pred'].min())
max_val = max(metrics['y_test'].max(), metrics['y_pred'].max())
plt.plot([min_val, max_val], [min_val, max_val], 'r--')
plt.title(f"预测值 vs 实际值 (IC={metrics['ic']:.4f})")
plt.xlabel('实际收益率')
plt.ylabel('预测收益率')
plt.grid(True)
# 2. 训练历史
plt.subplot(2, 2, 2)
plt.plot(metrics['history']['loss'], label='训练损失')
plt.plot(metrics['history']['val_loss'], label='验证损失')
plt.title('训练和验证损失')
plt.xlabel('轮次')
plt.ylabel('损失')
plt.legend()
plt.grid(True)
# 3. 预测值和实际值的分布
plt.subplot(2, 2, 3)
plt.hist(metrics['y_test'], bins=50, alpha=0.5, label='实际收益率')
plt.hist(metrics['y_pred'], bins=50, alpha=0.5, label='预测收益率')
plt.title('收益率分布')
plt.xlabel('收益率')
plt.ylabel('频率')
plt.legend()
plt.grid(True)
# 4. 预测误差分布
plt.subplot(2, 2, 4)
errors = metrics['y_pred'] - metrics['y_test']
plt.hist(errors, bins=50)
plt.title(f'预测误差分布 (MSE={metrics["mse"]:.6f})')
plt.xlabel('误差')
plt.ylabel('频率')
plt.grid(True)
# 设置总标题
if title:
plt.suptitle(title, fontsize=16)
else:
plt.suptitle(f"非线性Alpha模型结果 (激活函数: {metrics['activation']}, 损失函数: {metrics['loss']})",
fontsize=16)
plt.tight_layout()
plt.subplots_adjust(top=0.9)
# 保存图表
plt.savefig(os.path.join(self.model_dir, f'alpha_results_{metrics["activation"]}_{metrics["loss"]}.png'))
plt.show()
因子提取
目的
- 将模型中学到的“隐含因子”(即中间 layer 输出)拿出来,做进一步分析。
方法
- 用 Keras Model(inputs, outputs=alpha_factors) 抽取中间32维张量。
- 标准化这组因子,再按阈值(|value|<0.01)置零,剔除无效因子。
def extract_alpha_factors(self, model, stock_id, lookback=10):
"""提取股票的Alpha因子并进行后处理,确保因子有意义"""
# 从数据库获取最新数据
with self.get_connection() as conn:
query = """
SELECT sd.trade_date, sd.open_price, sd.high_price, sd.low_price,
sd.close_price, sd.volume, si.current_market_value
FROM stock_daily sd
JOIN stock_info si ON sd.stock_id = si.stock_id
WHERE sd.stock_id = ?
ORDER BY sd.trade_date DESC
LIMIT ?
"""
df = pd.read_sql_query(query, conn, params=[stock_id, lookback + 20])
if df.empty:
print(f"错误: 没有找到股票 {stock_id} 的数据")
return None
# 确保数据按日期升序排列
df = df.sort_values('trade_date')
# 添加特征
df = self.add_features(df)
if len(df) < lookback:
print(f"错误: 数据不足 {lookback} 行")
return None
# 提取特征
features = df.drop(['trade_date'], axis=1).values
# 获取最新的lookback天数据
latest_data = features[-lookback:].reshape(1, -1)
# 标准化特征
latest_data_scaled = self.scaler_X.transform(latest_data)
# 创建一个新模型,用于提取中间层输出
alpha_layer_model = Model(
inputs=model.input,
outputs=model.get_layer('alpha_factors').output
)
# 提取Alpha因子
alpha_factors = alpha_layer_model.predict(latest_data_scaled)
# 对因子进行后处理,确保它们有意义
# 1. 标准化因子值
alpha_factors_normalized = (alpha_factors[0] - np.mean(alpha_factors[0])) / (np.std(alpha_factors[0]) + 1e-8)
# 2. 过滤掉接近0的因子(绝对值小于阈值)
threshold = 0.01
significant_factors = np.where(np.abs(alpha_factors_normalized) > threshold, alpha_factors_normalized, 0)
return significant_factors
因子可视化
目的
- 将模型中学到的“隐含因子”(即中间 layer 输出)拿出来,做进一步分析。
def visualize_alpha_factors(self, alpha_factors, stock_id, stock_name=None):
"""可视化Alpha因子"""
if stock_name is None:
stock_name = stock_id
# 过滤掉为0的因子
non_zero_indices = np.where(alpha_factors != 0)[0]
non_zero_factors = alpha_factors[non_zero_indices]
if len(non_zero_factors) == 0:
print("警告: 所有Alpha因子都为0")
non_zero_indices = np.arange(len(alpha_factors))
non_zero_factors = alpha_factors
# 创建因子标签
factor_labels = [f"因子{i+1}" for i in non_zero_indices]
# 绘制条形图
plt.figure(figsize=(12, 6))
bars = plt.bar(factor_labels, non_zero_factors)
# 为条形图添加颜色
for i, bar in enumerate(bars):
if non_zero_factors[i] > 0:
bar.set_color('green')
else:
bar.set_color('red')
plt.title(f"{stock_name} ({stock_id}) 的Alpha因子")
plt.xlabel("因子")
plt.ylabel("因子值")
plt.xticks(rotation=45)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
# 保存图表
plt.savefig(os.path.join(self.model_dir, f'alpha_factors_{stock_id}.png'))
plt.show()
# 打印非零因子
print(f"\n{stock_name} ({stock_id}) 的显著Alpha因子:")
for i, factor_idx in enumerate(non_zero_indices):
if non_zero_factors[i] != 0:
print(f"因子{factor_idx+1}: {non_zero_factors[i]:.4f}")
总结
上述所示就是整个程序的基本框架 后续就是不断完善每一个环节
比如
数据清洗:
- 考虑缺失值,异常值处理
- 时间序列对齐
- 多源数据融合
特征工程:
- 更多的技术指标
- 因子选取
- 自动化特征生成
模型定义:
- 多目标损失
- 正则化与归纳偏置
- 结构搜索
- 可解释性嵌入
训练与调优:
- 超参数化
- 学习率策略