基于《AI系列研究之一:端到端的动态 Alpha模型》实战演练
  Co 24天前 101 4

上篇基于《AI系列研究之一:端到端的动态Alpha模型》理论分析

本篇文章会对其中各部分进行代码研究

大概流程划分为下

1.png

数据清洗

目的

  • 保证输入数据的质量,剔除不符合要求的样本。
  • 减少后续特征工程和模型训练的偏差。
  • 过滤掉 ST 和退市股,可避免模型学到噪声或极端异常;空值剔除确保计算指标和归一化不出错。
def get_all_stocks(self, exclude_st=True, exclude_delisted=True): """获取所有股票代码(排除ST股票和退市股票)""" with self.get_connection() as conn: query = """ SELECT stock_id, stock_name FROM stock_info """ stocks = pd.read_sql_query(query, conn) if exclude_st: # 排除名称中包含'ST'的股票 stocks = stocks[~stocks['stock_name'].str.contains('ST', na=False)] if exclude_delisted: # 排除名称中包含'退'的退市股票 stocks = stocks[~stocks['stock_name'].str.contains('退', na=False)] # 排除以'退市'开头的股票 stocks = stocks[~stocks['stock_name'].str.startswith('退市', na=False)] # 排除以'*'开头的股票(一些交易所使用*标记退市股) stocks = stocks[~stocks['stock_name'].str.startswith('*', na=False)] return stocks def prepare_stock_data(self, stock_id, lookback=10, prediction_days=5): """准备单个股票的训练数据""" # 从数据库获取数据 with self.get_connection() as conn: # 获取股票价格数据 query = """ SELECT sd.trade_date, sd.open_price, sd.high_price, sd.low_price, sd.close_price, sd.volume, si.current_market_value FROM stock_daily sd JOIN stock_info si ON sd.stock_id = si.stock_id WHERE sd.stock_id = ? ORDER BY sd.trade_date """ df = pd.read_sql_query(query, conn, params=[stock_id]) if df.empty: return None # 确保数据类型正确 df['trade_date'] = pd.to_datetime(df['trade_date']) numeric_columns = ['open_price', 'high_price', 'low_price', 'close_price', 'volume', 'current_market_value'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 添加技术指标 df = self.add_features(df) # 计算未来n天的收益率作为目标变量 df['future_return'] = df['close_price'].pct_change(periods=prediction_days).shift(-prediction_days) # 删除包含NaN的行 df = df.dropna() if len(df) < lookback + prediction_days: return None # 提取特征和目标 features = df.drop(['trade_date', 'future_return'], axis=1).values target = df['future_return'].values.reshape(-1, 1) # 创建时间序列数据 X, y = [], [] for i in range(lookback, len(features)): X.append(features[i-lookback:i]) y.append(target[i]) if not X: return None X = np.array(X) y = np.array(y) # 将3D数据重塑为2D (样本数, 特征数*lookback) X = X.reshape(X.shape[0], -1) return X, y

特征工程

技术指标和因子设计

  • 收益率变化 price_change、振幅比 price_range、5 日标准差 price_std
  • 均线 ma5、ma10;RSI 相对强弱指标
  • 成交量特征:volume_change、volume_ma5、volume_ratio
  • 市值变化 market_value_change、价量比 price_volume_ratio
  • 五个自定义 Alpha 因子 alpha1~alpha5,如振幅比、成交量与价格变化比等

滚动填充与截面对齐

  • 先向前填充再向后填充,最后置零,保证无空值。
  • 最终输出:宽表格,每行是一天的全量特征;再按 lookback 切成时序样本。
def add_features(self, df): """添加技术指标和特征""" # 价格特征 df['price_change'] = df['close_price'].pct_change() df['price_range'] = (df['high_price'] - df['low_price']) / df['close_price'] df['price_std'] = df['close_price'].rolling(window=5).std() # 移动平均线 df['ma5'] = df['close_price'].rolling(window=5).mean() df['ma10'] = df['close_price'].rolling(window=10).mean() # 相对强弱指标 (RSI) delta = df['close_price'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['rsi'] = 100 - (100 / (1 + rs)) # 成交量特征 df['volume_change'] = df['volume'].pct_change() df['volume_ma5'] = df['volume'].rolling(window=5).mean() df['volume_ratio'] = df['volume'] / df['volume_ma5'] # 市值特征 df['market_value_change'] = df['current_market_value'].pct_change() # 价格与成交量关系 df['price_volume_ratio'] = df['close_price'] / df['volume'] # Alpha因子 # Alpha1: 收盘价与开盘价的差值除以振幅 df['alpha1'] = (df['close_price'] - df['open_price']) / ((df['high_price'] - df['low_price']) + 1e-5) # Alpha2: 收盘价与前一天收盘价的差值 df['alpha2'] = df['close_price'].diff() # Alpha3: 收盘价与5日均线的差值 df['alpha3'] = df['close_price'] - df['ma5'] # Alpha4: 成交量变化率与价格变化率的比值 df['alpha4'] = df['volume_change'] / (df['price_change'] + 1e-5) # Alpha5: RSI与价格变化的乘积 df['alpha5'] = df['rsi'] * df['price_change'] # 填充NaN值 df = df.fillna(method='bfill').fillna(method='ffill').fillna(0) return df

模型定义

网络结构

  • 输入层:input_dim 维度 = 特征数 × lookback。
  • 两层全连接:256 → 128 单元,He 初始化,BatchNorm + Dropout(0.2)。
  • “Alpha 因子层”:32 单元、tanh 激活,并加 L1/L2 正则化,保证输出不全为0。
  • 最终输出层:1 个神经元,直接回归未来收益率。

自定义损失函数

  • IC 损失 (ic_loss):最大化预测值与真实值的皮尔逊相关系数。
  • CCC 损失 (ccc_loss):最大化一致性相关系数,兼顾偏移与相关性。
  • 默认 MSE:均方误差。

为什么要这样设计?

  • 中间的 Alpha 因子层可视为可解释因子,tanh 增强表达;
  • 多种损失函数分别关注不同目标:(MSE)最小绝对偏差;(IC/CCC)提高排序能力与一致性;
  • 正则化和 BatchNorm 控制过拟合、加快收敛。
# 自定义IC损失函数 (Information Coefficient Loss) def ic_loss(y_true, y_pred): """ 计算预测值和真实值之间的相关系数的负值 IC越高越好,所以损失函数取负值 """ # 去均值 y_true_centered = y_true - K.mean(y_true) y_pred_centered = y_pred - K.mean(y_pred) # 计算相关系数 numerator = K.sum(y_true_centered * y_pred_centered) denominator = K.sqrt(K.sum(K.square(y_true_centered)) * K.sum(K.square(y_pred_centered)) + K.epsilon()) # 返回负相关系数作为损失 return -numerator / denominator # 自定义CCC损失函数 (Concordance Correlation Coefficient Loss) def ccc_loss(y_true, y_pred): """ 计算一致性相关系数的负值 CCC越高越好,所以损失函数取负值 """ # 计算均值 mean_true = K.mean(y_true) mean_pred = K.mean(y_pred) # 计算方差 var_true = K.var(y_true) var_pred = K.var(y_pred) # 计算协方差 covar = K.mean((y_true - mean_true) * (y_pred - mean_pred)) # 计算CCC numerator = 2 * covar denominator = var_true + var_pred + K.square(mean_true - mean_pred) + K.epsilon() # 返回负CCC作为损失 return -numerator / denominator class NonlinearAlphaModel: def __init__(self, db_path='stock_db.db', model_dir='alpha_models'): """初始化非线性Alpha模型""" self.db_path = db_path self.model_dir = model_dir self.scaler_X = MinMaxScaler() self.scaler_y = MinMaxScaler() # 创建模型保存目录 if not os.path.exists(model_dir): os.makedirs(model_dir) def build_alpha_model(self, input_dim, activation): """构建改进的非线性Alpha模型,解决因子输出为0的问题""" inputs = Input(shape=(input_dim,)) # 第一层 - 使用更多神经元,减小dropout率 x = Dense(256, activation=activation, kernel_initializer='he_normal')(inputs) x = BatchNormalization()(x) x = Dropout(0.2)(x) # 降低dropout率 # 第二层 x = Dense(128, activation=activation, kernel_initializer='he_normal')(x) x = BatchNormalization()(x) x = Dropout(0.2)(x) # Alpha因子层 - 使用tanh激活函数确保输出不全为0 # tanh输出范围为[-1,1],可以产生更丰富的因子表示 alpha_factors = Dense(32, activation='tanh', name='alpha_factors', kernel_regularizer=tf.keras.regularizers.l1_l2(l1=1e-5, l2=1e-4))(x) # 输出层 outputs = Dense(1)(alpha_factors) # 创建模型 model = Model(inputs=inputs, outputs=outputs) return model

训练调优

网格搜索式尝试

  • 两种激活函数:ReLU、Sigmoid。
  • 三种损失函数:MSE、IC、CCC。
  • 组合后共 2×3=6 种实验。

训练细节

  • 优化器:Adam,学习率 0.001。
  • 早停:EarlyStopping(patience=15),监控验证集损失,防止过拟合。
  • 检查点:ModelCheckpoint(save_best_only),保存每次最优模型。

评估与选优

  • 在测试集上反归一化后计算 MSE、MAE、R²、IC、CCC。

最佳模型:

  • 当损失为 MSE 时,选择最小 MSE;
  • 当损失为 IC/CCC 时,选择最大 IC/CCC。
def train_alpha_model(self, X_train, y_train, X_test, y_test): """训练非线性Alpha模型""" print("训练非线性Alpha模型...") # 定义不同的激活函数 activations = { 'relu': 'relu', 'sigmoid': 'sigmoid' } # 定义不同的损失函数 losses = { 'mse': 'mean_squared_error', 'ic': ic_loss, 'ccc': ccc_loss } best_model = None best_loss = float('inf') best_metrics = None # 尝试不同的激活函数和损失函数组合 for act_name, activation in activations.items(): for loss_name, loss_fn in losses.items(): print(f"\n尝试 激活函数: {act_name}, 损失函数: {loss_name}") # 构建模型 model = self.build_alpha_model(X_train.shape[1], activation) # 编译模型 model.compile( optimizer=Adam(learning_rate=0.001), loss=loss_fn ) # 早停策略 early_stopping = EarlyStopping( monitor='val_loss', patience=15, restore_best_weights=True ) # 模型检查点 checkpoint_path = os.path.join( self.model_dir, f'model_{act_name}_{loss_name}.keras' ) checkpoint = ModelCheckpoint( checkpoint_path, save_best_only=True, monitor='val_loss' ) # 训练模型 history = model.fit( X_train, y_train, epochs=100, batch_size=64, validation_split=0.2, callbacks=[early_stopping, checkpoint], verbose=1 ) # 评估模型 y_pred = model.predict(X_test) # 反标准化预测结果 y_test_orig = self.scaler_y.inverse_transform(y_test) y_pred_orig = self.scaler_y.inverse_transform(y_pred) # 计算评估指标 mse = mean_squared_error(y_test_orig, y_pred_orig) mae = mean_absolute_error(y_test_orig, y_pred_orig) r2 = r2_score(y_test_orig, y_pred_orig) # 计算IC值 ic = np.corrcoef(y_test_orig.flatten(), y_pred_orig.flatten())[0, 1] # 计算CCC值 mean_true = np.mean(y_test_orig) mean_pred = np.mean(y_pred_orig) var_true = np.var(y_test_orig) var_pred = np.var(y_pred_orig) covar = np.mean((y_test_orig - mean_true) * (y_pred_orig - mean_pred)) ccc = (2 * covar) / (var_true + var_pred + (mean_true - mean_pred)**2) print(f"测试集MSE: {mse:.6f}") print(f"测试集MAE: {mae:.6f}") print(f"测试集R²: {r2:.6f}") print(f"测试集IC: {ic:.6f}") print(f"测试集CCC: {ccc:.6f}") # 保存评估指标 metrics = { 'activation': act_name, 'loss': loss_name, 'mse': mse, 'mae': mae, 'r2': r2, 'ic': ic, 'ccc': ccc, 'y_test': y_test_orig, 'y_pred': y_pred_orig, 'history': history.history } # 保存评估结果 results_path = os.path.join( self.model_dir, f'results_{act_name}_{loss_name}.pkl' ) joblib.dump(metrics, results_path) # 更新最佳模型 if loss_name == 'mse' and mse < best_loss: best_loss = mse best_model = model best_metrics = metrics elif loss_name == 'ic' and ic > best_metrics.get('ic', -1) if best_metrics else -1: best_model = model best_metrics = metrics elif loss_name == 'ccc' and ccc > best_metrics.get('ccc', -1) if best_metrics else -1: best_model = model best_metrics = metrics print(f"\n最佳模型: 激活函数={best_metrics['activation']}, 损失函数={best_metrics['loss']}") print(f"最佳MSE: {best_metrics['mse']:.6f}") print(f"最佳IC: {best_metrics['ic']:.6f}") print(f"最佳CCC: {best_metrics['ccc']:.6f}") return best_model, best_metrics

保存/加载模型

保存

  • 特征与目标归一化器 scaler_X.pkl、scaler_y.pkl(使用 joblib)。
  • 各试验模型及结果:model_{act}{loss}.keras、results{act}_{loss}.pkl。
  • 最佳模型:best_alpha_model.keras。

加载

  • load_best_model() 会同时恢复模型结构、权重和自定义损失函数。
  • 重新加载标准化器,保证预测阶段与训练阶段一致。
# 保存最佳模型 best_model_path = os.path.join(self.model_dir, 'best_alpha_model.keras') best_model.save(best_model_path) def load_best_model(self): """加载最佳模型""" model_path = os.path.join(self.model_dir, 'best_alpha_model.keras') scaler_X_path = os.path.join(self.model_dir, 'scaler_X.pkl') scaler_y_path = os.path.join(self.model_dir, 'scaler_y.pkl') if os.path.exists(model_path) and os.path.exists(scaler_X_path) and os.path.exists(scaler_y_path): # 自定义对象 custom_objects = { 'ic_loss': ic_loss, 'ccc_loss': ccc_loss } model = load_model(model_path, custom_objects=custom_objects) self.scaler_X = joblib.load(scaler_X_path) self.scaler_y = joblib.load(scaler_y_path) return model else: print("找不到保存的模型或标准化器") return None

回测与预测/模型解释

批量预测:

  • 调用加载好的最佳模型,遍历所有(符合条件的)股票。
  • 再次取最新 N+20 天数据,保证足够计算特征。
  • 标准化后 model.predict(),反归一化回收益率→预测价。

加入合理性检查:

  • 基于历史波动率的 z-score 限制(超过 2σ 则截断);
  • 基于市场平均涨幅的上限(如 5 倍市场涨幅)。
  • 最终对收益率排序,取 Top N。
def plot_results(self, metrics, title=None): """绘制预测结果""" plt.figure(figsize=(18, 12)) # 1. 散点图:预测值vs实际值 plt.subplot(2, 2, 1) plt.scatter(metrics['y_test'], metrics['y_pred'], alpha=0.3) min_val = min(metrics['y_test'].min(), metrics['y_pred'].min()) max_val = max(metrics['y_test'].max(), metrics['y_pred'].max()) plt.plot([min_val, max_val], [min_val, max_val], 'r--') plt.title(f"预测值 vs 实际值 (IC={metrics['ic']:.4f})") plt.xlabel('实际收益率') plt.ylabel('预测收益率') plt.grid(True) # 2. 训练历史 plt.subplot(2, 2, 2) plt.plot(metrics['history']['loss'], label='训练损失') plt.plot(metrics['history']['val_loss'], label='验证损失') plt.title('训练和验证损失') plt.xlabel('轮次') plt.ylabel('损失') plt.legend() plt.grid(True) # 3. 预测值和实际值的分布 plt.subplot(2, 2, 3) plt.hist(metrics['y_test'], bins=50, alpha=0.5, label='实际收益率') plt.hist(metrics['y_pred'], bins=50, alpha=0.5, label='预测收益率') plt.title('收益率分布') plt.xlabel('收益率') plt.ylabel('频率') plt.legend() plt.grid(True) # 4. 预测误差分布 plt.subplot(2, 2, 4) errors = metrics['y_pred'] - metrics['y_test'] plt.hist(errors, bins=50) plt.title(f'预测误差分布 (MSE={metrics["mse"]:.6f})') plt.xlabel('误差') plt.ylabel('频率') plt.grid(True) # 设置总标题 if title: plt.suptitle(title, fontsize=16) else: plt.suptitle(f"非线性Alpha模型结果 (激活函数: {metrics['activation']}, 损失函数: {metrics['loss']})", fontsize=16) plt.tight_layout() plt.subplots_adjust(top=0.9) # 保存图表 plt.savefig(os.path.join(self.model_dir, f'alpha_results_{metrics["activation"]}_{metrics["loss"]}.png')) plt.show()

因子提取

目的

  • 将模型中学到的“隐含因子”(即中间 layer 输出)拿出来,做进一步分析。

方法

  • 用 Keras Model(inputs, outputs=alpha_factors) 抽取中间32维张量。
  • 标准化这组因子,再按阈值(|value|<0.01)置零,剔除无效因子。
def extract_alpha_factors(self, model, stock_id, lookback=10): """提取股票的Alpha因子并进行后处理,确保因子有意义""" # 从数据库获取最新数据 with self.get_connection() as conn: query = """ SELECT sd.trade_date, sd.open_price, sd.high_price, sd.low_price, sd.close_price, sd.volume, si.current_market_value FROM stock_daily sd JOIN stock_info si ON sd.stock_id = si.stock_id WHERE sd.stock_id = ? ORDER BY sd.trade_date DESC LIMIT ? """ df = pd.read_sql_query(query, conn, params=[stock_id, lookback + 20]) if df.empty: print(f"错误: 没有找到股票 {stock_id} 的数据") return None # 确保数据按日期升序排列 df = df.sort_values('trade_date') # 添加特征 df = self.add_features(df) if len(df) < lookback: print(f"错误: 数据不足 {lookback} 行") return None # 提取特征 features = df.drop(['trade_date'], axis=1).values # 获取最新的lookback天数据 latest_data = features[-lookback:].reshape(1, -1) # 标准化特征 latest_data_scaled = self.scaler_X.transform(latest_data) # 创建一个新模型,用于提取中间层输出 alpha_layer_model = Model( inputs=model.input, outputs=model.get_layer('alpha_factors').output ) # 提取Alpha因子 alpha_factors = alpha_layer_model.predict(latest_data_scaled) # 对因子进行后处理,确保它们有意义 # 1. 标准化因子值 alpha_factors_normalized = (alpha_factors[0] - np.mean(alpha_factors[0])) / (np.std(alpha_factors[0]) + 1e-8) # 2. 过滤掉接近0的因子(绝对值小于阈值) threshold = 0.01 significant_factors = np.where(np.abs(alpha_factors_normalized) > threshold, alpha_factors_normalized, 0) return significant_factors

因子可视化

目的

  • 将模型中学到的“隐含因子”(即中间 layer 输出)拿出来,做进一步分析。
def visualize_alpha_factors(self, alpha_factors, stock_id, stock_name=None): """可视化Alpha因子""" if stock_name is None: stock_name = stock_id # 过滤掉为0的因子 non_zero_indices = np.where(alpha_factors != 0)[0] non_zero_factors = alpha_factors[non_zero_indices] if len(non_zero_factors) == 0: print("警告: 所有Alpha因子都为0") non_zero_indices = np.arange(len(alpha_factors)) non_zero_factors = alpha_factors # 创建因子标签 factor_labels = [f"因子{i+1}" for i in non_zero_indices] # 绘制条形图 plt.figure(figsize=(12, 6)) bars = plt.bar(factor_labels, non_zero_factors) # 为条形图添加颜色 for i, bar in enumerate(bars): if non_zero_factors[i] > 0: bar.set_color('green') else: bar.set_color('red') plt.title(f"{stock_name} ({stock_id}) 的Alpha因子") plt.xlabel("因子") plt.ylabel("因子值") plt.xticks(rotation=45) plt.grid(axis='y', linestyle='--', alpha=0.7) plt.tight_layout() # 保存图表 plt.savefig(os.path.join(self.model_dir, f'alpha_factors_{stock_id}.png')) plt.show() # 打印非零因子 print(f"\n{stock_name} ({stock_id}) 的显著Alpha因子:") for i, factor_idx in enumerate(non_zero_indices): if non_zero_factors[i] != 0: print(f"因子{factor_idx+1}: {non_zero_factors[i]:.4f}")

总结

上述所示就是整个程序的基本框架 后续就是不断完善每一个环节

比如

数据清洗:

  • 考虑缺失值,异常值处理
  • 时间序列对齐
  • 多源数据融合

特征工程:

  • 更多的技术指标
  • 因子选取
  • 自动化特征生成

模型定义:

  • 多目标损失
  • 正则化与归纳偏置
  • 结构搜索
  • 可解释性嵌入

训练与调优:

  • 超参数化
  • 学习率策略
最后一次编辑于 24天前 0

普洱咖啡

码力很强啊

2025-05-29 14:39:50      回复

Co

就是一把梭

2025-06-03 00:40:30 回复

13636435276

基础行情数据停牌记录要保留还是删除。股池怎么处理(影响截面因子)?

2025-05-13 18:59:18      回复

Co

我觉得都行,看选择,有些停牌是好事 有些停牌不是好事 如果要考虑停牌的因素的话 可能还要将其他方面转成因子来介入处理

2025-05-13 22:04:27 回复

推荐阅读