上一篇文章我们介绍了高频因子的波动率类因子,这一篇继续介绍高阶特征因子,并在因子分析的基础上加入策略回测。
研究环境利用聚宽因子分析API,构建因子函数类;研究在日内高频分钟级数据中挖掘构建高频因子,对该因子进行有效性检验,并利用回测平台进行回测。
一 高阶特征因子
1.1 构建方法
第三大类因子为高阶特征因子。高阶特征利用股票高阶矩与其未来收益建立联系,刻画日内价格分布以及快速变化的特征,能够有效反映价格的除动量和波动率这样一阶和二阶特征外更高阶的特征。
将分钟级收益率、成交量的偏度和峰度进行组合,可以构建 6 个高频高阶特征因子。shape_skew_m 和 shape_skratio_m 因子在大多数选股中的表现较好。我们将高阶特征因子在全市场、沪深 300、中证 500 和中证 1000 分别进行月度和周度调仓的有效性检验。根据因子间的相关性统计,上述三个因子间均存在较强的相关性。
1.2 检验结果
全市场范围内,shape_skew_m、shape_skew_std 和 shape_skratio_m 因子具有良好有效性,月度调仓频率下,它们的 ICIR 表现均超过 1.0,且 shape_skew_m 因子的年化多空收益达到了 22.6%。在沪深 300 中,因子表现有所下降,其中相对表现较好的是 shape_skew_std和 shape_kurt_std 因子,它们的 ICIR 均超过了 0.6,shape_skew_std 因子的多空收益达到了14.9%,夏普比率为 1.6。
高阶特征因子整体有效性良好,其中 shape_skew_m 和 shape_skratio_m 因子在多个选股范围内均具有不错的预测性和稳定性,可以关注。
高阶特征因子之间的相关性统计如下表:
1.3 收益统计
全市场范围内,shape_skew_m 因子的多空收益跑赢基线且远超其他同类因子,具有良好的单调性,与常见因子不存在显著相关性;而 shape_skratio_m 因子的多头超额收益表现型良好,在 2020 年收益水平有所放缓。所选择的 5 个表现较好的高频特征因子整体有效性较好,分组年化超额收益区分度较为明显。
二 因子复现
下面我们选取一个具体因子来进行构建和回测,使用聚宽研究环境。选取因子为shape_skratio_m,构建方式为日内分钟k线收益率的偏度和峰度比值,最后的后缀“_m”表式对调仓周期取当期均值。
使用因子分析API,在研究环境中调用以进行因子分析。API使用详情可参阅聚宽技术文档。
# 构建因子分析框架
from jqfactor import Factor,analyze_factor
from jqdata.apis import *
import datetime
import pandas as pd
from jqfactor import neutralize
from jqfactor import winsorize_med
from jqfactor import standardlize
# 构建高阶特征高频因子计算函数
class shape_skratio_m(Factor):
# 设置因子名称
name = 'shape_skratio_m'
# 设置获取数据的时间窗口长度
max_window = 1
# 设置依赖的数据
dependencies = ['close'] #参考标的
# 计算因子的函数, 需要返回一个 pandas.Series, index 是股票代码,value 是因子值
def calc(self, data):
# 获取日期,股票池
date= data['close'].index[0] #获取(上一)交易日 实际上的逻辑是用上一交易日的数据在今天进行交易,时间戳是昨天的时间
security_list = data['close'].columns.tolist()
# 以下为获取更多数据
#获取分钟级数据,我们取前20日平均值作为当期的因子值
stock_bars = get_price(security_list,count =240 * 20,start_date=None, end_date=date,
frequency='1m', fields=['close'], skip_paused=False,
fq='none')
stock_bars['close'] = stock_bars['close'].fillna(method='ffill')
stock_bars['return'] = np.log(stock_bars['close']/stock_bars['close'].shift(1))
stock_bars['return'] = stock_bars['return'].fillna(0)
stock_close = stock_bars['return']
#以日为单位迭代计算当日因子值
stock_close['time'] = [i.time() for i in stock_close.index]
stock_close.index = [i.date() for i in stock_close.index]
stock_close['day'] = stock_close.index
GroupBy = stock_close.groupby(by='day')
vol_return = pd.DataFrame()
vol_return_temp = pd.DataFrame()
for day, group in GroupBy:
group = group.drop(['day','time'], axis=1)
temp = self.cal_factor(group)
vol_return[day] = temp['factor']
vol_return_temp['factor'] = vol_return.apply(np.mean, axis=1)
return vol_return_temp['factor']
## 计算因子值
def cal_factor(self,stock_bar):
rtn_vol = pd.DataFrame()
rtn_vol['factor']=stock_bar.skew()/stock_bar.kurt()
return rtn_vol
理论上峰度总为正,偏度可正可负。通过因子计算结果,可以看到有些股票左偏,有些股票右偏。
三 因子分析
3.1 引擎初始化
下面我们对上面构建的高阶特征因子进行回测分析,从各个方面考察因子的有效性。为保持与研报的一致性,我们主要考察周度和月度调仓频率下的表现。
参数设置如下:
(1)测试时间:2022-01-01 至 2025-06-05;
(2)分位数:十分位数;
(3)调仓周期:5日、30日;
(4)仓位配置:等权配置;
(5)股票池:沪深300
start = datetime.datetime.today()
far = analyze_factor(
shape_skratio_m(), start_date='2022-01-01', end_date='2025-06-25', universe='000300.XSHG',
quantiles=10,
periods=(5,30),
industry='jq_l1',
weight_method='avg',
use_real_price=True,
skip_paused=False,
max_loss=0.2
)
end = datetime.datetime.today()
print('程序耗时:', end - start)
3.2 因子IC分析
我们使用IC信息系数法对趋势强度因子进行检验。我们排除因子值大小的影响,使用RankIC来替代IC值进行分析。RankIC为当期因子值的排名与下一期因子值排名的相关性。
下面我们绘制因子5日、30日的RankIC时间序列图和月度均值图。
在 2022-01到2025-06这三年中,5、30日IC均值分别为为-0.01、-0.009,说明因子值与下月收益率呈现一定的负相关性,即因子值越低,下月预期收益率越高。
我们考察5、30日因子的IR值,分别为0.097、0.101。可见,5日换仓和30日换仓因子稳定性相近,5日更好。
进一步的计算IC_IR值为:-0.103、-0.089。因此,在单位风险下,5日换仓因子具有最佳的有效性。
四 策略回测
根据上述分析,我们构建因子的回测组合,用于考察因子实际的选股能力。这一环节我们使用真实费率。
股票池:沪深300,剔除 ST、停牌、上市时间小于 6 个月的股票
回测时间:2020-01-01 至 2025-05-31
调仓期:每30天
费率:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5元
选股步骤可分为以下三部分:
(1)在每个调仓日第一天计算因子值
(2)对因子值根据从小到大的顺序进行排序,并将其等分为 10 组
(3)每个调仓日选取第n组(前n*10%)股票池进行调仓交易
# 导入函数库
from jqdata import *
import datetime
import pandas as pd
import statsmodels.api as sm
from statsmodels import regression
from statsmodels.formula.api import ols
from jqfactor import Factor, calc_factors
from jqfactor import neutralize
from jqfactor import winsorize_med
from jqfactor import standardlize
# 初始化函数,设定基准等等
def initialize(context):
# 设定沪深300作为基准
set_benchmark('000300.XSHG')
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
# 输出内容到日志 log.info()
log.info('初始函数开始运行且全局只运行一次')
# 过滤掉order系列API产生的比error级别低的log
# log.set_level('order', 'error')
### 股票相关设定 ###
# 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.00025, close_commission=0.00025, min_commission=5),type='stock')
## 运行函数:每月第一天开盘后进行调仓
run_monthly(market_open, monthday=1, time='open', reference_security='000300.XSHG')
#run_daily(market_open, time='open', reference_security='000300.XSHG')
### 变量初始化 ###
g.index = ['000300.XSHG']#['000852.XSHG','000906.XSHG']
g.N = 6
g.num = 10 #选取第num组
g.R = 20
g.timer = 1
'''
######################策略的交易逻辑######################
每周计算因子值, 并买入前 20 支股票
'''
# 每周或每月开盘运行一次, 按照上一交易日的因子值进行调仓
def market_open(context):
if g.timer % 10 == 1:
log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
# 1. 定义计算因子的 universe,
# 建议使用与 benchmark 相同的指数,方便判断选股带来的 alpha
#universe = get_index_stocks('000300.XSHG')
security_list = get_qualified_stocks(context)
# 2. 获取因子值
# get_factor_values 有三个参数,context、因子列表、股票池,
# 返回值是一个 dict,key 是因子类的 name 属性,value 是 pandas.Series
# Series 的 index 是股票代码,value 是当前日期能看到的最新因子值
factor_values = get_factor_values(context, [shape_skratio_m()], security_list)
# 3. 对因子做线性加权处理, 并将结果进行排序。您在这一步可以研究自己的因子权重模型来优化策略结果。
# 对因子做 rank 是因为不同的因子间由于量纲等原因无法直接相加,这是一种去量纲的方法。
#final_factor = vol_return1min.rank(ascending=False)
shape_skratio = factor_values['shape_skratio_m']
# 4. 由因子确定每日持仓的股票列表:
# 采用因子值由大到小排名前 10% 股票作为目标持仓
stock_list, buy_stock_count = select_stock_universe(context, shape_skratio, g.num)
# 5. 根据股票列表进行调仓:
# 这里采取所有股票等额买入的方式,您可以使用自己的风险模型自由发挥个股的权重搭配
rebalance_position(context, stock_list, buy_stock_count)
g.timer += 1
'''
######################下面是策略中使用的因子######################
可以先使用因子分析功能生产出理想的因子, 再加入到策略中
因子分析:https://www.joinquant.com/algorithm/factor/list
'''
# 构建高阶特征高频因子计算函数
class shape_skratio_m(Factor):
# 设置因子名称
name = 'shape_skratio_m'
# 设置获取数据的时间窗口长度
max_window = 1
# 设置依赖的数据
dependencies = ['close'] #参考标的
# 计算因子的函数, 需要返回一个 pandas.Series, index 是股票代码,value 是因子值
def calc(self, data):
# 获取日期,股票池
date= data['close'].index[0] #获取(上一)交易日 实际上的逻辑是用上一交易日的数据在今天进行交易,时间戳是昨天的时间
security_list = data['close'].columns.tolist()
# 以下为获取更多数据
#获取分钟级数据,我们取前20日平均值作为当期的因子值
stock_bars = get_price(security_list,count =240 * 20,start_date=None, end_date=date,
frequency='1m', fields=['close'], skip_paused=False,
fq='none')
stock_bars['close'] = stock_bars['close'].fillna(method='ffill')
stock_bars['return'] = np.log(stock_bars['close']/stock_bars['close'].shift(1))
stock_bars['return'] = stock_bars['return'].fillna(0)
stock_close = stock_bars['return']
#以日为单位迭代计算当日因子值
stock_close['time'] = [i.time() for i in stock_close.index]
stock_close.index = [i.date() for i in stock_close.index]
stock_close['day'] = stock_close.index
GroupBy = stock_close.groupby(by='day')
vol_return = pd.DataFrame()
vol_return_temp = pd.DataFrame()
for day, group in GroupBy:
group = group.drop(['day','time'], axis=1)
temp = self.cal_factor(group)
vol_return[day] = temp['factor']
vol_return_temp['factor'] = vol_return.apply(np.mean, axis=1)
return vol_return_temp['factor']
## 计算因子值
def cal_factor(self,stock_bar):
rtn_vol = pd.DataFrame()
rtn_vol['factor']=stock_bar.skew()/stock_bar.kurt()
return rtn_vol
"""
###################### 工具 ######################
"""
## 开盘前运行函数
def before_trading_start(context):
# 输出运行时间
log.info('函数运行时间(before_market_open):'+str(context.current_dt.time()))
# 给微信发送消息(添加模拟交易,并绑定微信生效)
# send_message('美好的一天~')
## 收盘后运行函数
def after_trading_end(context):
log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
#得到当天所有成交记录
trades = get_trades()
for _trade in trades.values():
log.info('成交记录:'+str(_trade))
log.info('一天结束')
log.info('##############################################################')
## 选股函数
def get_qualified_stocks(context):
#获取前一个交易日日期
previous_date = context.previous_date
#获取全体A股
stock_list = []
for index in g.index:
stock_list += get_index_stocks(index_symbol=index, date=previous_date)
#剔除ST
st_list = get_extras('is_st', stock_list, count=1, end_date=previous_date)
stock_list = [stock for stock in stock_list if not st_list[stock][0]]
#剔除上市不足N个月新股
stock_list_fn = []
for stock in stock_list:
start_date = get_security_info(stock).start_date
if start_date < previous_date - datetime.timedelta(days=g.N*30):
stock_list_fn.append(stock)
#剔除停牌股票
paused_series = get_price(stock_list_fn,end_date=context.current_dt,count=1,fields='paused')['paused'].iloc[0]
stock_list_fn = paused_series[paused_series==False].index.tolist()
return stock_list_fn
## 待选股票池
def select_stock_universe(context, factor_df, num):
length = int(len(factor_df.index) / 10)
stock_list = factor_df.index[length*(num-1):length*num]
return (stock_list.tolist(),length)
"""
调仓:
先卖出持仓中不在 stock_list 中的股票
再等价值买入 stock_list 中的股票
"""
## 调仓
def rebalance_position(context, buy_stocks, buy_stock_count):
# 现持仓的股票,如果不在“目标池”中,且未涨停,就卖出
if len(context.portfolio.positions)>0:
last_prices = history(1, '1m', 'close', security_list=context.portfolio.positions.keys())
for stock in context.portfolio.positions.keys():
if stock not in buy_stocks :
curr_data = get_current_data()
if last_prices[stock][-1] < curr_data[stock].high_limit:
order_target_value(stock, 0)
# 依次买入“目标池”中的股票
for stock in buy_stocks:
position_count = len(context.portfolio.positions)
curr_data = get_current_data()
last_prices = history(1, '1m', 'close', security_list=buy_stocks)
#if get_growth_rate(g.indexNew,10) >= 0.01:
if buy_stock_count > position_count:
value = context.portfolio.cash / (buy_stock_count - position_count)
if context.portfolio.positions[stock].total_amount == 0 and last_prices[stock][-1] < curr_data[stock].high_limit:
order_target_value(stock, value)
"""
# 策略中获取因子数据的函数
每日返回上一日的因子数据
详见 帮助-单因子分析
"""
def get_factor_values(context,factor_list, universe):
"""
输入: 因子、股票池
返回: 前一日的因子值
"""
# 取因子名称
factor_name = list(factor.name for factor in factor_list)
# 计算因子值
values = calc_factors(universe,
factor_list,
context.previous_date,
context.previous_date)
# 装入 dict
factor_dict = {i:values[i].iloc[0] for i in factor_name}
return factor_dict
沪深300股票池前10%策略收益16.95%,略高于同期基准收益-6.26%。但第二组(前10%~20%)策略收益46.66%,高于第一组。可以看出区别出现在2022年,2022年以前第一组好于第二组,2022年以后第二组逐渐超过第一组。
五 分层回测
5.1 分组回测画图
回测结果第一层和第二层的区分效果一般。我们将10个组合同时进行回测,并将主要特征画图,以便分析总体分层效果。将结果保存在研究环境当中,方便以后调用。
#1 先导入所需要的程序包
import datetime
import numpy as np
import pandas as pd
import time
from jqdata import *
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import copy
import pickle
# 定义类'参数分析'
class parameter_analysis(object):
# 定义函数中不同的变量
def __init__(self, algorithm_id=None):
self.algorithm_id = algorithm_id # 回测id
self.params_df = pd.DataFrame() # 回测中所有调参备选值的内容,列名字为对应修改面两名称,对应回测中的 g.XXXX
self.results = {} # 回测结果的回报率,key 为 params_df 的行序号,value 为
self.evaluations = {} # 回测结果的各项指标,key 为 params_df 的行序号,value 为一个 dataframe
self.backtest_ids = {} # 回测结果的 id
# 新加入的基准的回测结果 id,可以默认为空 '',则使用回测中设定的基准
self.benchmark_id = ''
self.benchmark_returns = [] # 新加入的基准的回测回报率
self.returns = {} # 记录所有回报率
self.excess_returns = {} # 记录超额收益率
self.log_returns = {} # 记录收益率的 log 值
self.log_excess_returns = {} # 记录超额收益的 log 值
self.dates = [] # 回测对应的所有日期
self.excess_max_drawdown = {} # 计算超额收益的最大回撤
self.excess_annual_return = {} # 计算超额收益率的年化指标
self.evaluations_df = pd.DataFrame() # 记录各项回测指标,除日回报率外
# 定义排队运行多参数回测函数
def run_backtest(self, #
algorithm_id=None, # 回测策略id
running_max=10, # 回测中同时巡行最大回测数量
start_date='2006-01-01', # 回测的起始日期
end_date='2016-11-30', # 回测的结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测的初始持仓金额
param_names=[], # 回测中调整参数涉及的变量
param_values=[] # 回测中每个变量的备选参数值
):
# 当此处回测策略的 id 没有给出时,调用类输入的策略 id
if algorithm_id == None: algorithm_id=self.algorithm_id
# 生成所有参数组合并加载到 df 中
# 包含了不同参数具体备选值的排列组合中一组参数的 tuple 的 list
param_combinations = list(itertools.product(*param_values))
# 生成一个 dataframe, 对应的列为每个调参的变量,每个值为调参对应的备选值
to_run_df = pd.DataFrame(param_combinations)
# 修改列名称为调参变量的名字
to_run_df.columns = param_names
# 设定运行起始时间和保存格式
start = time.time()
# 记录结束的运行回测
finished_backtests = {}
# 记录运行中的回测
running_backtests = {}
# 计数器
pointer = 0
# 总运行回测数目,等于排列组合中的元素个数
total_backtest_num = len(param_combinations)
# 记录回测结果的回报率
all_results = {}
# 记录回测结果的各项指标
all_evaluations = {}
# 在运行开始时显示
print('【已完成|运行中|待运行】:'),
# 当运行回测开始后,如果没有全部运行完全的话:
while len(finished_backtests)<total_backtest_num:
# 显示运行、完成和待运行的回测个数
print('[%s|%s|%s].' % (len(finished_backtests),
len(running_backtests),
(total_backtest_num-len(finished_backtests)-len(running_backtests)) )),
# 记录当前运行中的空位数量
to_run = min(running_max-len(running_backtests), total_backtest_num-len(running_backtests)-len(finished_backtests))
# 把可用的空位进行跑回测
for i in range(pointer, pointer+to_run):
# 备选的参数排列组合的 df 中第 i 行变成 dict,每个 key 为列名字,value 为 df 中对应的值
params = to_run_df.ix[i].to_dict()
# 记录策略回测结果的 id,调整参数 extras 使用 params 的内容
backtest = create_backtest(algorithm_id = algorithm_id,
start_date = start_date,
end_date = end_date,
frequency = frequency,
initial_cash = initial_cash,
extras = params,
# 再回测中把改参数的结果起一个名字,包含了所有涉及的变量参数值
name = str(params)
)
# 记录运行中 i 回测的回测 id
running_backtests[i] = backtest
# 计数器计数运行完的数量
pointer = pointer+to_run
# 获取回测结果
failed = []
finished = []
# 对于运行中的回测,key 为 to_run_df 中所有排列组合中的序数
for key in running_backtests.keys():
# 研究调用回测的结果,running_backtests[key] 为运行中保存的结果 id
bt = get_backtest(running_backtests[key])
# 获得运行回测结果的状态,成功和失败都需要运行结束后返回,如果没有返回则运行没有结束
status = bt.get_status()
# 当运行回测失败
if status == 'failed':
# 失败 list 中记录对应的回测结果 id
failed.append(key)
# 当运行回测成功时
elif status == 'done':
# 成功 list 记录对应的回测结果 id,finish 仅记录运行成功的
finished.append(key)
# 回测回报率记录对应回测的回报率 dict, key to_run_df 中所有排列组合中的序数, value 为回报率的 dict
# 每个 value 一个 list 每个对象为一个包含时间、日回报率和基准回报率的 dict
all_results[key] = bt.get_results()
# 回测回报率记录对应回测结果指标 dict, key to_run_df 中所有排列组合中的序数, value 为回测结果指标的 dataframe
all_evaluations[key] = bt.get_risk()
# 记录运行中回测结果 id 的 list 中删除失败的运行
for key in failed:
running_backtests.pop(key)
# 在结束回测结果 dict 中记录运行成功的回测结果 id,同时在运行中的记录中删除该回测
for key in finished:
finished_backtests[key] = running_backtests.pop(key)
# 当一组同时运行的回测结束时报告时间
if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 and to_run !=0:
# 记录当时时间
middle = time.time()
# 计算剩余时间,假设没工作量时间相等的话
remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)
# print 当前运行时间
print('[已用%s时,尚余%s时,请不要关闭浏览器].' % (str(round((middle - start) / 60.0 / 60.0,3)),
str(round(remain_time / 60.0 / 60.0,3)))),
# 5秒钟后再跑一下
time.sleep(5)
# 记录结束时间
end = time.time()
print('')
print('【回测完成】总用时:%s秒(即%s小时)。' % (str(int(end-start)),
str(round((end-start)/60.0/60.0,2)))),
# 对应修改类内部对应
self.params_df = to_run_df
self.results = all_results
self.evaluations = all_evaluations
self.backtest_ids = finished_backtests
#7 最大回撤计算方法
def find_max_drawdown(self, returns):
# 定义最大回撤的变量
result = 0
# 记录最高的回报率点
historical_return = 0
# 遍历所有日期
for i in range(len(returns)):
# 最高回报率记录
historical_return = max(historical_return, returns[i])
# 最大回撤记录
drawdown = 1-(returns[i] + 1) / (historical_return + 1)
# 记录最大回撤
result = max(drawdown, result)
# 返回最大回撤值
return result
# log 收益、新基准下超额收益和相对与新基准的最大回撤
def organize_backtest_results(self, benchmark_id=None):
# 若新基准的回测结果 id 没给出
if benchmark_id==None:
# 使用默认的基准回报率,默认的基准在回测策略中设定
self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]
# 当新基准指标给出后
else:
# 基准使用新加入的基准回测结果
self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]
# 回测日期为结果中记录的第一项对应的日期
self.dates = [x['time'] for x in self.results[0]]
# 对应每个回测在所有备选回测中的顺序 (key),生成新数据
# 由 {key:{u'benchmark_returns': 0.022480100091729405,
# u'returns': 0.03184566700000002,
# u'time': u'2006-02-14'}} 格式转化为:
# {key: []} 格式,其中 list 为对应 date 的一个回报率 list
for key in self.results.keys():
self.returns[key] = [x['returns'] for x in self.results[key]]
# 生成对于基准(或新基准)的超额收益率
for key in self.results.keys():
self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]
# 生成 log 形式的收益率
for key in self.results.keys():
self.log_returns[key] = [log(x+1) for x in self.returns[key]]
# 生成超额收益率的 log 形式
for key in self.results.keys():
self.log_excess_returns[key] = [log(x+1) for x in self.excess_returns[key]]
# 生成超额收益率的最大回撤
for key in self.results.keys():
self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])
# 生成年化超额收益率
for key in self.results.keys():
self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1
# 把调参数据中的参数组合 df 与对应结果的 df 进行合并
self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)
# self.evaluations_df =
# 获取最总分析数据,调用排队回测函数和数据整理的函数
def get_backtest_data(self,
algorithm_id=None, # 回测策略id
benchmark_id=None, # 新基准回测结果id
file_name='results.pkl', # 保存结果的 pickle 文件名字
running_max=10, # 最大同时运行回测数量
start_date='2006-01-01', # 回测开始时间
end_date='2016-11-30', # 回测结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测初始持仓资金
param_names=[], # 回测需要测试的变量
param_values=[] # 对应每个变量的备选参数
):
# 调运排队回测函数,传递对应参数
self.run_backtest(algorithm_id=algorithm_id,
running_max=running_max,
start_date=start_date,
end_date=end_date,
frequency=frequency,
initial_cash=initial_cash,
param_names=param_names,
param_values=param_values
)
# 回测结果指标中加入 log 收益率和超额收益率等指标
self.organize_backtest_results(benchmark_id)
# 生成 dict 保存所有结果。
results = {'returns':self.returns,
'excess_returns':self.excess_returns,
'log_returns':self.log_returns,
'log_excess_returns':self.log_excess_returns,
'dates':self.dates,
'benchmark_returns':self.benchmark_returns,
'evaluations':self.evaluations,
'params_df':self.params_df,
'backtest_ids':self.backtest_ids,
'excess_max_drawdown':self.excess_max_drawdown,
'excess_annual_return':self.excess_annual_return,
'evaluations_df':self.evaluations_df}
# 保存 pickle 文件
pickle_file = open(file_name, 'wb')
pickle.dump(results, pickle_file)
pickle_file.close()
# 读取保存的 pickle 文件,赋予类中的对象名对应的保存内容
def read_backtest_data(self, file_name='results.pkl'):
pickle_file = open(file_name, 'rb')
results = pickle.load(pickle_file)
self.returns = results['returns']
self.excess_returns = results['excess_returns']
self.log_returns = results['log_returns']
self.log_excess_returns = results['log_excess_returns']
self.dates = results['dates']
self.benchmark_returns = results['benchmark_returns']
self.evaluations = results['evaluations']
self.params_df = results['params_df']
self.backtest_ids = results['backtest_ids']
self.excess_max_drawdown = results['excess_max_drawdown']
self.excess_annual_return = results['excess_annual_return']
self.evaluations_df = results['evaluations_df']
# 回报率折线图
def plot_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.returns[key])), self.returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), self.benchmark_returns, label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('returns',fontsize=20)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.returns[0]))
# 多空组合图
def plot_long_short(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
a1 = [i+1 for i in self.returns[0]]
a2 = [i+1 for i in self.returns[4]]
a1.insert(0,1)
a2.insert(0,1)
b = []
for i in range(len(a1)-1):
b.append((a1[i+1]/a1[i]-a2[i+1]/a2[i])/2)
c = []
c.append(1)
for i in range(len(b)):
c.append(c[i]*(1+b[i]))
ax.plot(range(len(c)), c)
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
ax.set_title("Strategy's long_short performances",fontsize=20)
# 设置图片标题样式
plt.xlim(0, len(c))
return c
# 回测的4个主要指标,包括总回报率、最大回撤夏普率和波动
def get_eval4_bar(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
fig = plt.figure(figsize=(20,7))
# 定义位置
ax1 = fig.add_subplot(221)
# 设定横轴为对应分位,纵轴为对应指标
ax1.bar(range(len(indices)),
[self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax1.legend(loc='best',fontsize=15)
# 设置y标签样式
ax1.set_ylabel('Algorithm_return', fontsize=15)
# 设置y标签样式
ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])
# 设置图片标题样式
ax1.set_title("Strategy's of Algorithm_return performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax2 = fig.add_subplot(224)
# 设定横轴为对应分位,纵轴为对应指标
ax2.bar(range(len(indices)),
[self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax2.legend(loc='best',fontsize=15)
# 设置y标签样式
ax2.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])
# 设置图片标题样式
ax2.set_title("Strategy's of Max_drawdown performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax3 = fig.add_subplot(223)
# 设定横轴为对应分位,纵轴为对应指标
ax3.bar(range(len(indices)),
[self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax3.legend(loc='best',fontsize=15)
# 设置y标签样式
ax3.set_ylabel('Sharpe', fontsize=15)
# 设置x标签样式
ax3.set_yticklabels([str(x*100)+'% 'for x in ax3.get_yticks()])
# 设置图片标题样式
ax3.set_title("Strategy's of Sharpe performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax4 = fig.add_subplot(222)
# 设定横轴为对应分位,纵轴为对应指标
ax4.bar(range(len(indices)),
[self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax4.legend(loc='best',fontsize=15)
# 设置y标签样式
ax4.set_ylabel('Algorithm_volatility', fontsize=15)
# 设置x标签样式
ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])
# 设置图片标题样式
ax4.set_title("Strategy's of Algorithm_volatility performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
5.2 运行分组回测
共十个模块,每个模块运行后输出进度。
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'skr-results.pkl',
running_max = 2,
algorithm_id = '18fa361a5899b5f38565a195330c1bbc', # 趋势强度因子回测代码对应的策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2020-01-01'
end_date = '2025-06-25'
num = range(1,11)
group_backtest(start_date,end_date,num)
运行回测输出:
【已完成|运行中|待运行】:
[0|0|10].
[0|2|8].
[0|2|8].
[0|2|8].
[0|2|8].
[0|2|8].
[0|2|8].
5.3 分组回测结果
由图可以看出,总体分层效果还是明显的,较高的分组组合基本跑赢大盘,明显好于较低的分组组合,符合单因子有效性的检验。但具体到第一组和第二组(以及第九组和第十组)区别并不明显。
进一步分析,2022年以前分层效果较为稳定,2022年之后因子的有效性有所减弱,可能和投资者的交易情绪有关。
另外,因子的空头方表现较为稳定,虽然目前A股市场无有效的做空手段,但可以用因子的空头方对策略股票池做负向排除,实现避免踩雷的效果。
上面几张表分析了每个投资组合的评价指标,根据年化收益、年化波动率、夏普比率及最大回撤分析,并非严格单调,但也呈现出明显的分层效果。基本上满足随着组合数的递增,收益能力上升且风险控制能力下降的趋势,由此符合单因子有效性的检验。空头方效果明显,即便如今也可以利用趋势强度因子的空头进行负向排除。
左上角第一张图收益率分析,总的来看越靠后的收益越高(因子负相关,回测时未做处理)符合预期。第二组和第九组分层效果好于第一组和第十组,若将两组合并效果会更好。
六 总结
我们通过对所选取的高阶特征因子的有效性分析以及分层回测检验,初步得到以下结论:在 2022-01到2025-06中,因子值与下月收益率呈现一定的负相关性,我们考察因子的IR值,5日换仓因子稳定性较好,且在单位风险下,5日换仓因子具有最佳的有效性。
继续构建策略进行分层回测,选取五年数据。从分组回测来看,分层效果总体较好。多头组合整体上明显跑赢空头组合,符合单因子有效性的检验。但具体到第一组和第二组(以及第九组和第十组)区别并不明显,若将两组合并将会有更好的效果。