实现与量化策略深度赋能,不止于计算,更是策略逻辑的精准基石
🔑1.1 核心升级亮点(Week 3 实战淬炼)
✅ 数据安全三重防护:列名校验 + 类型转换 + 边界预警,杜绝“静默失败”
✅ NaN智能处理策略:保留原始逻辑(默认不填充),避免未来函数陷阱
✅ 多均线协同支持:单节点扩展至双均线计算(金叉/死叉信号预埋)
✅ 诊断级日志系统:精准提示“前19行因窗口不足为NaN",加速问题定位
✅ 策略友好命名规范:MA_5_SMA → SIGNAL_CROSS,无缝对接信号生成节点
💻1.2 均线在量化回测中的核心作用
✅ 判断大趋势:价格在均线上方运行,通常被视为多头(看涨)市场;在均线下方运行,则是空头(看跌)市场。
✅ 寻找支撑与阻力:在上涨趋势中,均线往往会像一张“蹦床”一样托住回调的价格(支撑位);在下跌趋势中,均线又会变成压制价格反弹的“天花板”(阻力位)。
✅ 生成买卖信号:这是你在回测节点中最常用的功能。比如经典的“双均线策略”:当短期均线(如5周期)向上穿越长期均线(如20周期)时形成“金叉”,产生买入信号;反之形成“死叉”,产生卖出信号。
import pandas as pd
import numpy as np
from panda_plugins.base import BaseWorkNode, work_node, ui
from pydantic import BaseModel, Field
# 兼容 V1 验证器
from pydantic.v1 import validator as field_validator_v1
# =============== 1. 增强型输入模型 ===============
@ui(
window_short={
"input_type": "number_input",
"min": 2,
"max": 100,
"placeholder": "短期周期(如5)"
},
window_long={
"input_type": "number_input",
"min": 5,
"max": 200,
"placeholder": "长期周期(如20)"
},
ma_type={
"input_type": "select",
"options": [
{"label": "SMA(趋势跟踪)", "value": "SMA"},
{"label": "EMA(灵敏反转)", "value": "EMA"},
{"label": "双均线交叉信号", "value": "DUAL"}
],
"tooltip": "DUAL模式将直接生成cross_signal列: 1=金叉, -1=死叉"
},
handle_nan={
"input_type": "radio",
"options": [
{"label": "保留NaN (推荐回测)", "value": "keep"},
{"label": "前向填充 (仅调试)", "value": "ffill"}
],
"tooltip": "警告: 填充NaN可能导致未来函数! 仅在仿真调试时启用"
},
use_atr_adaptation={
"input_type": "switch",
"label": "启用ATR自适应周期",
"tooltip": "根据市场波动率自动调整均线周期,过滤噪音"
}
)
class EnhancedMACalculatorInput(BaseModel):
market_data: Any = Field(default=None, title="K线数据 (需含'close'列)")
window_short: int = Field(default=5, title="短期周期")
window_long: int = Field(default=20, title="长期周期")
ma_type: str = Field(default="SMA", title="均线类型")
handle_nan: str = Field(default="keep", title="NaN处理策略")
use_atr_adaptation: bool = Field(default=False, title="是否启用ATR自适应")
# 使用 V1 验证器兼容
@field_validator_v1('window_long')
def validate_windows(cls, v, values):
if 'window_short' in values and v <= values['window_short']:
raise ValueError(f"长期周期({v})必须大于短期周期({values['window_short']})")
return v
# =============== 2. 诊断增强型输出模型 ===============
class EnhancedMAOutput(BaseModel):
processed_data: Any = Field(default=None, title="含均线列的数据")
message: str = Field(default="", title="执行摘要")
warnings: list = Field(default_factory=list, title="️ 重要提示")
# =============== 3. 工业级节点逻辑 ===============
@work_node(name="【增强】均线指标计算", group="因子工程")
class EnhancedMACalculator(BaseWorkNode):
@classmethod
def input_model(cls) -> Optional[Type[BaseModel]]:
return EnhancedMACalculatorInput
@classmethod
def output_model(cls) -> Optional[Type[BaseModel]]:
return EnhancedMAOutput
def run(self, input: EnhancedMACalculatorInput) -> EnhancedMAOutput:
# === 阶段1:数据安全校验与预处理 ===
if input.market_data is None or input.market_data.empty:
return EnhancedMAOutput(
message=" 未检测到有效市场数据,请检查上游节点连接",
warnings=["上游数据流为空"]
)
try:
df = input.market_data.copy()
if 'close' not in df.columns:
return EnhancedMAOutput(
message=f" 缺失'close'列! 可用列: {list(df.columns)}",
warnings=[f"缺失关键列: close"]
)
df['close'] = pd.to_numeric(df['close'], errors='coerce')
if df['close'].isnull().all():
return EnhancedMAOutput(message=" 'close'列无法转换为数值", warnings=["数据格式错误"])
except Exception as e:
return EnhancedMAOutput(message=f" 数据预处理失败: {str(e)}", warnings=[str(e)])
# === 阶段2:动态自适应周期计算 (修复了括号语法) ===
short_period = input.window_short
long_period = input.window_long
if input.use_atr_adaptation:
try:
# 计算TR和ATR
df['tr0'] = abs(df['high'] - df['low'])
df['tr1'] = abs(df['high'] - df['close'].shift(1))
df['tr2'] = abs(df['low'] - df['close'].shift(1))
df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
df['atr'] = df['tr'].rolling(window=14).mean()
# 获取最新ATR值 (修复了括号)
atr_series = df['atr'].dropna()
if not atr_series.empty:
current_atr = atr_series.iloc[-1]
price_level = df['close'].iloc[-1]
volatility_ratio = current_atr / price_level
# 修复点:修正了此处的括号匹配错误
short_period = max(3, min(50, int(input.window_short * (1 + volatility_ratio * 10))))
long_period = max(8, min(100, int(input.window_long * (1 + volatility_ratio * 5))))
except Exception as e:
# 如果缺少 high/low 列导致报错,则忽略 ATR 自适应
pass
# === 阶段3:核心均线计算 ===
ma_type = input.ma_type.upper()
warnings = []
calc_cols = []
try:
if ma_type == "SMA":
col_name = f"MA_{short_period}_SMA"
df[col_name] = df['close'].rolling(window=short_period).mean()
calc_cols.append(col_name)
elif ma_type == "EMA":
col_name = f"MA_{short_period}_EMA"
df[col_name] = df['close'].ewm(span=short_period, adjust=False).mean()
calc_cols.append(col_name)
elif ma_type == "DUAL":
short_col = f"MA_{short_period}_FAST"
long_col = f"MA_{long_period}_SLOW"
df[short_col] = df['close'].rolling(window=short_period).mean()
df[long_col] = df['close'].rolling(window=long_period).mean()
# 计算交叉信号
df['cross_signal'] = 0
valid_mask = ~(df[short_col].isna() | df[long_col].isna())
golden_cross = (df[short_col] > df[long_col]) & (df[short_col].shift(1) <= df[long_col].shift(1))
death_cross = (df[short_col] < df[long_col]) & (df[short_col].shift(1) >= df[long_col].shift(1))
df.loc[valid_mask & golden_cross, 'cross_signal'] = 1
df.loc[valid_mask & death_cross, 'cross_signal'] = -1
calc_cols = [short_col, long_col, 'cross_signal']
warnings.append(" 已生成cross_signal: 1=金叉, -1=死叉, 0=无信号")
# === 阶段4:NaN处理策略 ===
if input.handle_nan == "ffill" and calc_cols:
original_nan = df[calc_cols].isna().sum().sum()
df[calc_cols] = df[calc_cols].ffill()
if original_nan > 0:
warnings.append(f"️ 已填充{original_nan}个NaN (仅限调试!)")
# === 阶段5:结果封装 ===
summary = f"✓ 成功计算 {ma_type} | 数据行数: {len(df)} | 新增列: {', '.join(calc_cols)}"
if input.use_atr_adaptation:
summary += f" | ATR自适应周期: 短期{short_period}/长期{long_period}"
return EnhancedMAOutput(
processed_data=df,
message=summary,
warnings=warnings
)
except Exception as e:
return EnhancedMAOutput(
message=f" 计算异常: {type(e).__name__} - {str(e)}",
warnings=[f"建议: 检查数据长度是否超过设定的周期"]
)
# =============== 4. 本地测试模块 ===============
if __name__ == "__main__":
node = EnhancedMACalculator()
test_input = EnhancedMACalculatorInput(window_short=5, window_long=20, ma_type="SMA")
print(node.run(test_input))
🌟操作步骤:
🔹点击“+”号添加代码框;
🔹鼠标右键,选择“填充节点模版”;
🔹点击运行,测试功能是否正常or报错;
🔹左键点击节点标题(紫色的字),拖动到画布中,即可生成节点。
📌 返回节点库,可以看到“用户自定义节点”
🔹在节点库中,可对自定义节点进行“编辑”和删除;
🔹节点编辑框的右上角有“插件帮助文档”;
🔹如有修改,记得点“保存”!
【特别提示】
!!!在工作流画布中创建或修改了节点后,务必“Ctrl+S”!!!
否则,等再点开工作流,将收获一片荒芜……
💡操作体验:
🔹创建“用户自定义节点”,自由度高,支持实现定制化,更适合有代码功底的用户。
🔹在有节点模板的情况下,可以在基础模版上进行调整,善用“插件帮助文档”。
🔹节点库中包含大部分常用功能,即使不创建自定义节点,也基本够用√。
🔹【强烈建议】!!!在有修改的地方,用户退出界面时进行提示,例如:“您已对XXX进行修改,是否保存”。