【新功能内测】第四周打卡:专家模式使用——均线指标计算节点
  18521037752 4天前 25 0

实现与量化策略深度赋能,不止于计算,更是策略逻辑的精准基石

🔑1.1 核心升级亮点(Week 3 实战淬炼)

✅ 数据安全三重防护:列名校验 + 类型转换 + 边界预警,杜绝“静默失败”

✅ NaN智能处理策略:保留原始逻辑(默认不填充),避免未来函数陷阱

image.png

✅ 多均线协同支持:单节点扩展至双均线计算(金叉/死叉信号预埋)

✅ 诊断级日志系统:精准提示“前19行因窗口不足为NaN",加速问题定位

image.png

✅ 策略友好命名规范:MA_5_SMA → SIGNAL_CROSS,无缝对接信号生成节点

💻1.2 均线在量化回测中的核心作用

✅ 判断大趋势:价格在均线上方运行,通常被视为多头(看涨)市场;在均线下方运行,则是空头(看跌)市场。
✅ 寻找支撑与阻力:在上涨趋势中,均线往往会像一张“蹦床”一样托住回调的价格(支撑位);在下跌趋势中,均线又会变成压制价格反弹的“天花板”(阻力位)。
✅ 生成买卖信号:这是你在回测节点中最常用的功能。比如经典的“双均线策略”:当短期均线(如5周期)向上穿越长期均线(如20周期)时形成“金叉”,产生买入信号;反之形成“死叉”,产生卖出信号。

import pandas as pd
import numpy as np
from panda_plugins.base import BaseWorkNode, work_node, ui
from pydantic import BaseModel, Field
# 兼容 V1 验证器
from pydantic.v1 import validator as field_validator_v1

# =============== 1. 增强型输入模型 ===============
@ui(
    window_short={
        "input_type": "number_input", 
        "min": 2, 
        "max": 100, 
        "placeholder": "短期周期(如5)"
    },
    window_long={
        "input_type": "number_input", 
        "min": 5, 
        "max": 200, 
        "placeholder": "长期周期(如20)"
    },
    ma_type={
        "input_type": "select",
        "options": [
            {"label": "SMA(趋势跟踪)", "value": "SMA"},
            {"label": "EMA(灵敏反转)", "value": "EMA"},
            {"label": "双均线交叉信号", "value": "DUAL"}
        ],
        "tooltip": "DUAL模式将直接生成cross_signal列: 1=金叉, -1=死叉"
    },
    handle_nan={
        "input_type": "radio",
        "options": [
            {"label": "保留NaN (推荐回测)", "value": "keep"},
            {"label": "前向填充 (仅调试)", "value": "ffill"}
        ],
        "tooltip": "警告: 填充NaN可能导致未来函数! 仅在仿真调试时启用"
    },
    use_atr_adaptation={
        "input_type": "switch",
        "label": "启用ATR自适应周期",
        "tooltip": "根据市场波动率自动调整均线周期,过滤噪音"
    }
)
class EnhancedMACalculatorInput(BaseModel):
    market_data: Any = Field(default=None, title="K线数据 (需含'close'列)")
    window_short: int = Field(default=5, title="短期周期")
    window_long: int = Field(default=20, title="长期周期")
    ma_type: str = Field(default="SMA", title="均线类型")
    handle_nan: str = Field(default="keep", title="NaN处理策略")
    use_atr_adaptation: bool = Field(default=False, title="是否启用ATR自适应")

    # 使用 V1 验证器兼容
    @field_validator_v1('window_long')
    def validate_windows(cls, v, values):
        if 'window_short' in values and v <= values['window_short']:
            raise ValueError(f"长期周期({v})必须大于短期周期({values['window_short']})")
        return v


# =============== 2. 诊断增强型输出模型 ===============
class EnhancedMAOutput(BaseModel):
    processed_data: Any = Field(default=None, title="含均线列的数据")
    message: str = Field(default="", title="执行摘要")
    warnings: list = Field(default_factory=list, title="️ 重要提示")


# =============== 3. 工业级节点逻辑 ===============
@work_node(name="【增强】均线指标计算", group="因子工程")
class EnhancedMACalculator(BaseWorkNode):
    
    @classmethod
    def input_model(cls) -> Optional[Type[BaseModel]]:
        return EnhancedMACalculatorInput
        
    @classmethod
    def output_model(cls) -> Optional[Type[BaseModel]]:
        return EnhancedMAOutput
        
    def run(self, input: EnhancedMACalculatorInput) -> EnhancedMAOutput:
        # === 阶段1:数据安全校验与预处理 ===
        if input.market_data is None or input.market_data.empty:
            return EnhancedMAOutput(
                message=" 未检测到有效市场数据,请检查上游节点连接",
                warnings=["上游数据流为空"]
            )
        
        try:
            df = input.market_data.copy()
            if 'close' not in df.columns:
                return EnhancedMAOutput(
                    message=f" 缺失'close'列! 可用列: {list(df.columns)}",
                    warnings=[f"缺失关键列: close"]
                )
            df['close'] = pd.to_numeric(df['close'], errors='coerce')
            if df['close'].isnull().all():
                return EnhancedMAOutput(message=" 'close'列无法转换为数值", warnings=["数据格式错误"])
        except Exception as e:
            return EnhancedMAOutput(message=f" 数据预处理失败: {str(e)}", warnings=[str(e)])
        
        # === 阶段2:动态自适应周期计算 (修复了括号语法) ===
        short_period = input.window_short
        long_period = input.window_long
        
        if input.use_atr_adaptation:
            try:
                # 计算TR和ATR
                df['tr0'] = abs(df['high'] - df['low'])
                df['tr1'] = abs(df['high'] - df['close'].shift(1))
                df['tr2'] = abs(df['low'] - df['close'].shift(1))
                df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
                df['atr'] = df['tr'].rolling(window=14).mean()
                
                # 获取最新ATR值 (修复了括号)
                atr_series = df['atr'].dropna()
                if not atr_series.empty:
                    current_atr = atr_series.iloc[-1]
                    price_level = df['close'].iloc[-1]
                    volatility_ratio = current_atr / price_level
                    
                    # 修复点:修正了此处的括号匹配错误
                    short_period = max(3, min(50, int(input.window_short * (1 + volatility_ratio * 10))))
                    long_period = max(8, min(100, int(input.window_long * (1 + volatility_ratio * 5))))
                
            except Exception as e:
                # 如果缺少 high/low 列导致报错,则忽略 ATR 自适应
                pass
        
        # === 阶段3:核心均线计算 ===
        ma_type = input.ma_type.upper()
        warnings = []
        calc_cols = []
        
        try:
            if ma_type == "SMA":
                col_name = f"MA_{short_period}_SMA"
                df[col_name] = df['close'].rolling(window=short_period).mean()
                calc_cols.append(col_name)
            
            elif ma_type == "EMA":
                col_name = f"MA_{short_period}_EMA"
                df[col_name] = df['close'].ewm(span=short_period, adjust=False).mean()
                calc_cols.append(col_name)
            
            elif ma_type == "DUAL":
                short_col = f"MA_{short_period}_FAST"
                long_col = f"MA_{long_period}_SLOW"
                df[short_col] = df['close'].rolling(window=short_period).mean()
                df[long_col] = df['close'].rolling(window=long_period).mean()
                
                # 计算交叉信号
                df['cross_signal'] = 0
                valid_mask = ~(df[short_col].isna() | df[long_col].isna())
                golden_cross = (df[short_col] > df[long_col]) & (df[short_col].shift(1) <= df[long_col].shift(1))
                death_cross = (df[short_col] < df[long_col]) & (df[short_col].shift(1) >= df[long_col].shift(1))
                df.loc[valid_mask & golden_cross, 'cross_signal'] = 1
                df.loc[valid_mask & death_cross, 'cross_signal'] = -1
                
                calc_cols = [short_col, long_col, 'cross_signal']
                warnings.append(" 已生成cross_signal: 1=金叉, -1=死叉, 0=无信号")
        
            # === 阶段4:NaN处理策略 ===
            if input.handle_nan == "ffill" and calc_cols:
                original_nan = df[calc_cols].isna().sum().sum()
                df[calc_cols] = df[calc_cols].ffill()
                if original_nan > 0:
                    warnings.append(f"️ 已填充{original_nan}个NaN (仅限调试!)")
            
            # === 阶段5:结果封装 ===
            summary = f"✓ 成功计算 {ma_type} | 数据行数: {len(df)} | 新增列: {', '.join(calc_cols)}"
            if input.use_atr_adaptation:
                summary += f" | ATR自适应周期: 短期{short_period}/长期{long_period}"
                
            return EnhancedMAOutput(
                processed_data=df,
                message=summary,
                warnings=warnings
            )
            
        except Exception as e:
            return EnhancedMAOutput(
                message=f" 计算异常: {type(e).__name__} - {str(e)}",
                warnings=[f"建议: 检查数据长度是否超过设定的周期"]
            )

# =============== 4. 本地测试模块 ===============
if __name__ == "__main__":
    node = EnhancedMACalculator()
    test_input = EnhancedMACalculatorInput(window_short=5, window_long=20, ma_type="SMA")
    print(node.run(test_input))

7a378e7e6cb6433d8d41dcd613d71abc.png

🌟操作步骤:
🔹点击“+”号添加代码框;
🔹鼠标右键,选择“填充节点模版”;
🔹点击运行,测试功能是否正常or报错;
🔹左键点击节点标题(紫色的字),拖动到画布中,即可生成节点。

📌 返回节点库,可以看到“用户自定义节点”
🔹在节点库中,可对自定义节点进行“编辑”和删除;
🔹节点编辑框的右上角有“插件帮助文档”;
🔹如有修改,记得点“保存”!

【特别提示】
!!!在工作流画布中创建或修改了节点后,务必“Ctrl+S”!!!
否则,等再点开工作流,将收获一片荒芜……

💡操作体验:
🔹创建“用户自定义节点”,自由度高,支持实现定制化,更适合有代码功底的用户。
🔹在有节点模板的情况下,可以在基础模版上进行调整,善用“插件帮助文档”。
🔹节点库中包含大部分常用功能,即使不创建自定义节点,也基本够用√。
🔹【强烈建议】!!!在有修改的地方,用户退出界面时进行提示,例如:“您已对XXX进行修改,是否保存”。

image.png

最后一次编辑于 4天前 0

暂无评论

推荐阅读
  tyler   24小时前   16   0   0 新手入门