备注
节点:文中所用节点用反引号包裹。e.g.Boll & RSI因子构建.py。- 个人结合自己的学习和AI辅助将备注全部写入代码当中,方便于读者阅读。
- 具体函数请参阅:https://www.pandaai.online/community/article/72
一.构建期货因子工作流
- prompt:基于布林带和RSI生成期货的因子分析工作流
阅读并注释Boll & RSI因子构建.py节点代码
# 布林带 + RSI + 价量确认”的复合因子
class BollRsiCompositeFactor(Factor):
def calculate(self, factors):
close = factors['close']
high = factors['high']
low = factors['low']
volume = factors['volume']
# --- 1. 布林带相关信号(趋势+极端) ---
n_boll = 20 # 布林带取值时间窗口
p_boll = 2 # 布林带标准差倍数
mid = BOLL_MID(close, n_boll, p_boll) # 20日收盘价均线作为布林带中轨
upper = BOLL_UPPER(close, n_boll, p_boll) # 20日均线mid + 2倍标准差作为布林带上轨
lower = BOLL_LOWER(close, n_boll, p_boll) # 20日均线mid - 2倍标准差作为布林带下轨
# 价格在布林带中的相对位置(0=下轨,1=上轨,>1或<0为突破布林带属于极端区域)
price_pos = (close - lower) / (upper - lower + 1e-8)
price_pos = SCALE(price_pos) # 做截面标准化使得取值映射到 [-1,1]
# 布林带宽度衡量波动环境,做时序zscore,再截面归一
width = BOLL_WIDTH(close, n_boll) # 布林带宽度,上下轨之差,等于4个标准差,衡量波动率环境(越宽波动越大)。
width_z = TS_ZSCORE(width, 60) # 布林带宽度在60日的滚动Z-Score 值
width_z_cs = SCALE(width_z) # 做截面标准化使得取值映射到 [-1,1]
# --- 2. RSI动量信号 ---
n_rsi = 14
rsi = RSI(close, n_rsi) # 使用 14 日 RSI # 30以下认为超卖(负),70以上超买(正)
rsi_signal = (rsi - 50.0) / 50.0 # 把 50 作为中性点
rsi_signal = SCALE(rsi_signal) # 标准化到[-1,1]
# --- 3. 价量确认(期货通常量价共振更可靠) ---
vol_ma = MA(volume, 20) # 20 日平均成交量
vol_ratio = volume / (vol_ma + 1e-8) # 当前成交量相对自身 20 日均量的倍数
vol_signal = TS_ZSCORE(vol_ratio, 60) # 相对于过去 60 日的放量程度做 z-score(时序标准化)
vol_signal = SCALE(vol_signal) # 截面标准化映射[-1,1],使得同日不同标的可比
# --- 4. 信号方向逻辑 ---
# 趋势跟随 + 超买超卖逆向:
# 1)趋势:价格在中轨上方倾向多,在下方倾向空
trend_signal = SCALE(close - mid) # 取值范围[-1,1]
# 2)极端位置:接近上轨+RSI高 -> 做空;接近下轨+RSI低 -> 做多
# price_pos_cs >0 表示偏上方,<0 偏下方(已SCALE)
price_pos_cs = price_pos
# 极端逆向信号(与趋势信号相反)
extreme_contra = -price_pos_cs * rsi_signal # e.g.布林带下轨&rsi超卖,两者相乘负负为正,加负号为负值。布林带上轨&rsi超买,两者相乘正正为正,加负号为负值。
extreme_contra = SCALE(extreme_contra)
# --- 5. 组合权重 ---
# 在波动较大时,更多信任趋势;波动较小时,更重视RSI反转
# width_z_cs >0 表示高波动
trend_weight = (width_z_cs + 1.0) / 2.0 # 转到[0,1]
trend_weight = MIN(MAX(trend_weight, 0), 1) # 截断0~1,此步骤是否多余?接近0为低波动,接近1为高波动
contra_weight = 1.0 - trend_weight # 取值范围[0,1],接近0为高波动,接近1为低波动
# 基础方向信号:趋势 + 极端逆向
# 高波动环境(宽布林)通常趋势性更强 → 更信任趋势跟随信号。
# 低波动/震荡环境 → 更信任极端+RSI 的反转信号。(但此RSI反转信号,强上涨横盘触下轨,强下跌触上轨都是正值,然而后续上涨下跌并不好判断需结合别的因子此处从逻辑上对因子收益应该无贡献,是混乱的。)
base_signal = trend_weight * trend_signal + contra_weight * extreme_contra
# 价量共振加权:放大在放量时的信号
vol_amp = (vol_signal + 1.5) / 2.5 # 大致缩到[0,1]附近
vol_amp = MIN(MAX(vol_amp, 0), 2) # 控制放大倍数2,此处是否为多余?
composite_raw = base_signal * vol_amp
# --- 6. 最终归一化,防止极端值 ---
result = SCALE(TS_ZSCORE(composite_raw, 60))
# 替换残缺值
result = IF(result != result, 0, result) # 简单去NaN
return result
- 运行工作流
二.构建期货回测工作流
- prompt:按照“线性因子构建”模块的输出去调仓,新建并连接期货回测模块,生成2024-01-01到2025-01-01日期的回测工作流。
AI写出一个将线性因子传入期货回测节点的期货交易.py
from panda_backtest.api.api import *
from panda_backtest.api.stock_api import *
import panda_data
import pandas as pd
import numpy as np
import re
def _extract_product(symbol: str) -> str:
"""从合约代码中提取品种代码,例如:
A2509.DCE -> A
AG2508.SHF -> AG
"""
if not isinstance(symbol, str):
return ""
m = re.match(r"^([A-Za-z]+)\d", symbol)
return m.group(1).upper() if m else symbol.upper()
def initialize(context):
"""策略初始化
约定:工作流中线性因子构建节点输出通过 df_factor 传入,格式至少包含:
date, symbol, factor_value
或多因子:date, symbol, factor1, factor2, ...
本策略将对当期所有因子列取均值作为打分。
"""
# 期货账户(回测环境固定5588)
context.account = "5588"
# 选品与仓位参数
context.top_n = 10 # 每周买入因子得分排名前10个品种
context.risk_per_symbol = None # 这里采用“满仓等权”,通过手数归一来实现
# 调仓控制:每周一调仓
context.last_rebalance_date = None
# 预处理因子数据
if hasattr(context, "df_factor") and context.df_factor is not None and not context.df_factor.empty:
df = context.df_factor.copy()
# 统一日期格式为 YYYYMMDD 字符串
df["date"] = df["date"].astype(str).str.replace("-", "", regex=False)
# 因子列:除 date、symbol 以外的所有数值列
base_cols = {"date", "symbol"}
factor_cols = [c for c in df.columns if c not in base_cols]
if not factor_cols:
# 如果没有显式因子列,尝试使用 factor_value 这一常用命名
if "factor_value" in df.columns:
factor_cols = ["factor_value"]
else:
# 没有因子列,直接报错提示
print("df_factor 中未找到因子列,需至少存在一个因子列")
# 将因子列转为数值并填充缺失
if factor_cols:
df[factor_cols] = df[factor_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
# 计算综合得分
if len(factor_cols) == 1:
df["score"] = df[factor_cols[0]]
else:
df["score"] = df[factor_cols].mean(axis=1)
else:
df["score"] = 0.0
# 去掉不能交易的 DOMINANT 虚拟标的(如果线性因子是直接作用在具体合约,可保留)
if "symbol" in df.columns:
df = df[~df["symbol"].astype(str).str.contains("DOMINANT", case=False, na=False)]
# 添加品种字段
df["product"] = df["symbol"].apply(_extract_product)
context.df_factor = df
# 预加载主力合约映射:(品种, 日期) -> 主力合约代码
products = sorted(df["product"].dropna().unique().tolist())
if products:
start_date = df["date"].min()
end_date = df["date"].max()
try:
dom_df = panda_data.get_future_dominant(
underlying_symbol=products,
start_date=start_date,
end_date=end_date,
)
context.dominant_map = {}
if dom_df is not None and not dom_df.empty:
# 保证 date 是 YYYYMMDD 字符串
dom_df["date"] = dom_df["date"].astype(str).str.replace("-", "", regex=False)
for _, row in dom_df.iterrows():
key = (str(row["underlying_symbol"]).upper(), str(row["date"]))
context.dominant_map[key] = str(row["symbol"])
else:
context.dominant_map = {}
except Exception as e:
print(f"加载主力合约映射失败: {e}")
context.dominant_map = {}
else:
context.dominant_map = {}
print("因子数据预处理完成:")
print(f" 行数: {len(context.df_factor)}")
print(f" 日期范围: {context.df_factor['date'].min()} ~ {context.df_factor['date'].max()}")
print(f" 品种数: {len(products)}")
else:
context.df_factor = pd.DataFrame()
context.dominant_map = {}
print("未检测到因子数据 df_factor,策略将不进行调仓")
def _is_monday(trade_date: str) -> bool:
"""判断当前日期是否为周一(基于交易日历)"""
try:
cal = panda_data.get_trading_calendar(
start_date=trade_date,
end_date=trade_date,
exchange="SH",
is_trading_day=None,
fields=["nature_date", "is_trade"],
)
if cal is None or cal.empty:
return False
# nature_date 为 int 或 str 的 YYYYMMDD
dt_str = str(cal.iloc[0]["nature_date"])
dt_str = dt_str.replace("-", "")
# 转为 datetime 计算周几
dt = pd.to_datetime(dt_str, format="%Y%m%d", errors="coerce")
if pd.isna(dt):
return False
# Monday = 0
return dt.weekday() == 0 and int(cal.iloc[0]["is_trade"]) == 1
except Exception as e:
print(f"判断是否周一失败: {e}")
return False
def _select_products_today(context, trade_date: str):
"""根据今日因子得分选出前 top_n 个品种,返回 [(product, score), ...]"""
if context.df_factor is None or context.df_factor.empty:
return []
df = context.df_factor
df_today = df[df["date"] == trade_date]
if df_today.empty:
return []
# 按品种聚合:每个品种取得分最高的合约
df_sorted = df_today.sort_values("score", ascending=False)
df_dedup = df_sorted.drop_duplicates(subset="product", keep="first")
# 取前 top_n 品种
df_top = df_dedup.head(context.top_n)
result = [(row["product"], float(row["score"])) for _, row in df_top.iterrows()]
return result
def _get_dominant_contract(context, product: str, trade_date: str):
"""从预加载映射中获取指定品种在某日的主力合约代码"""
if not hasattr(context, "dominant_map"):
return None
key = (str(product).upper(), trade_date)
return context.dominant_map.get(key)
def _calc_equal_weight_hands(futures_account, price_map: dict, symbols: list):
"""根据“满仓等权”思想计算每个标的的目标手数
简化处理:
- 假设合约乘数为 1 或通过合约乘数统一考虑
- 将全部资金按标的数量等分
- 每个标的手数 = floor( (总权益 / N) / (价格 * 合约乘数) )
- 至少为 1 手
"""
if futures_account is None:
return {}
total_value = futures_account.total_value
n = len(symbols)
if total_value is None or total_value <= 0 or n == 0:
return {}
# 获取合约乘数
try:
mul_df = panda_data.get_future_list(symbol=symbols, fields=["symbol", "contract_multiplier"], is_trading=None)
mul_map = {}
if mul_df is not None and not mul_df.empty:
for _, row in mul_df.iterrows():
mul_map[str(row["symbol"])] = float(row["contract_multiplier"] or 1.0)
else:
mul_map = {s: 1.0 for s in symbols}
except Exception as e:
print(f"获取合约乘数失败: {e}")
mul_map = {s: 1.0 for s in symbols}
alloc_per_symbol = total_value / n
target = {}
for sym in symbols:
price = price_map.get(sym, 0.0)
mul = mul_map.get(sym, 1.0)
if price is None or price <= 0 or mul <= 0:
continue
hand = int(np.floor(alloc_per_symbol / (price * mul)))
if hand <= 0:
hand = 1
target[sym] = hand
return target
def handle_data(context, data):
"""每个bar触发(假设为日线或分钟线),周一开盘调仓一次,其余时间不交易"""
trade_date = str(context.now) # YYYYMMDD
# 仅在周一第一次触发时调仓
if not _is_monday(trade_date):
return
if context.last_rebalance_date == trade_date:
# 当天已调过仓
return
context.last_rebalance_date = trade_date
# 获取期货账户
futures_account = context.future_account_dict.get(context.account)
if futures_account is None:
print(f"未找到期货账户 {context.account}")
return
# 1. 根据今日因子得分选出品种(品种级别)
selected_products = _select_products_today(context, trade_date)
if not selected_products:
print(f"{trade_date} 无可用因子数据或未选出标的,跳过本次调仓")
return
# 2. 映射到当日主力合约
target_symbols = []
for product, score in selected_products:
dom = _get_dominant_contract(context, product, trade_date)
if dom:
target_symbols.append(dom)
else:
print(f"{trade_date} 品种 {product} 未找到主力合约,跳过")
# 去重
target_symbols = list(dict.fromkeys(target_symbols))
if not target_symbols:
print(f"{trade_date} 无有效主力合约,跳过本次调仓")
return
# 3. 获取主力合约当日价格(收盘价或当前bar收盘价)
price_map = {}
for sym in target_symbols:
try:
bar = data[sym]
if bar and bar.close and bar.close > 0:
price_map[sym] = float(bar.close)
except Exception:
# 如果当前bar没有,尝试用日线历史价格
try:
md = panda_data.get_market_data(
symbol=[sym],
start_date=trade_date,
end_date=trade_date,
type="future",
fields=["close"],
indicator="",
st=None,
)
if md is not None and not md.empty:
price_map[sym] = float(md.iloc[0]["close"])
except Exception:
continue
# 过滤没有价格的合约
target_symbols = [s for s in target_symbols if s in price_map]
if not target_symbols:
print(f"{trade_date} 无可用价格的主力合约,跳过本次调仓")
return
# 4. 等权满仓计算每个合约的目标手数
long_target = _calc_equal_weight_hands(futures_account, price_map, target_symbols)
if not long_target:
print(f"{trade_date} 计算目标仓位失败或手数为0,跳过本次调仓")
return
# 5. 执行目标持仓:多头等权,空头全部平掉
try:
# 空头目标设为空字典,表示不保留任何空头仓位
target_future_group_order(context.account, long_target, {})
print(f"{trade_date} 调仓完成,多头目标: {long_target}")
except Exception as e:
print(f"{trade_date} 执行目标持仓下单失败: {e}")
# 降级:逐合约设置多头目标
for sym, hands in long_target.items():
try:
long_future_target(context.account, sym, hands, style=MarketOrderStyle)
except Exception as e2:
print(f"{trade_date} 单合约下单失败 {sym}: {e2}")
def before_trading(context):
# 可选:打印账户概览
trade_date = str(context.now)
futures_account = context.future_account_dict.get(getattr(context, "account", "5588"))
if futures_account:
print(f"{trade_date} 开盘前,总权益: {futures_account.total_value}, 可用资金: {futures_account.cash}")
def after_trading(context):
# 可选:收盘后打印持仓信息
trade_date = str(context.now)
futures_account = context.future_account_dict.get(getattr(context, "account", "5588"))
if futures_account:
positions = futures_account.positions
print(f"{trade_date} 收盘后持仓:")
for sym, pos in positions.items():
print(f" {sym}: 多头 {pos.buy_quantity}, 空头 {pos.sell_quantity}, pnl={pos.pnl}")
2.运行工作流
三.仿真实盘搭建
参考:https://ncn9g4d5xvof.feishu.cn/minutes/obcn5wb26df6mc98op9k9kkk?from=ai_minutes
总结
- PandaAI的Agent经过训练,明显比通用的Agent更加精细化,答案也是很专业。同时是一个很好的学习辅助帮手。
- 代码细节还是需要自己学习,学习函数背后的计算式,这样才能和AI交互做好策略。
- 策略是一个不断迭代的过程,先跑通再快速迭代就能不断进步。