专家模式自定义节点功能初体验

1.自定义节点初步了解

image.png

image.png

  • 打开专家模式然后,在notebook上点击可以填充节点模版代码,里面有测试节点的示例;
  • 图片看不清的话,我把源码粘贴到了下面,可以看看自定义节点的代码结构是怎样,也可以丢给AI去学习,本人代码能力基本为零也是通过AI再一点点摸索,
    在就是你要知道你想创造的节点是要起什么作用,你的自定义节点要和那些节点连接的,可以在专家模式里面在看看其他节点的代码结构是怎样的,要不然
    你也没办法让AI明白你要做什么。
from typing import Optional, Type
from panda_plugins.base import BaseWorkNode, work_node
from pydantic import BaseModel

class InputModel(BaseModel):
   """
   Define the input model for the node.
   Use pydantic to define, which is a library for data validation and parsing.
   Reference: https://pydantic-docs.helpmanual.io
   
   
   为工作节点定义输入模型.
   使用 Pydantic 定义, Pydantic 是一个用于数据验证和解析的库.
   参考文档: https://pydantic-docs.helpmanual.io
   """
   number1: int
   number2: int

class OutputModel(BaseModel):
   """
   Define the output model for the node.
   Use pydantic to define, which is a library for data validation and parsing.
   Reference: https://pydantic-docs.helpmanual.io
   
   为工作节点定义输出模型.
   使用 Pydantic 定义, Pydantic 是一个用于数据验证和解析的库.
   参考文档: https://pydantic-docs.helpmanual.io
   """
   result: int

@work_node(name="示例-两数求和", group="测试节点")
class ExamplePluginAddition(BaseWorkNode):
   """
   Implement a example node, which can add two numbers and return the result.
   实现一个示例节点, 完成一个简单的加法运算, 输入 2 个数值, 输出 2 个数值的和.
   """

# Return the input model
# 返回输入模型
   @classmethod
   def input_model(cls) -> Optional[Type[BaseModel]]:
      return InputModel

# Return the output model
# 返回输出模型
   @classmethod
   def output_model(cls) -> Optional[Type[BaseModel]]:
      return OutputModel

# Node running logic
# 节点运行逻辑
   def run(self, input: BaseModel) -> BaseModel:
      result = input.number1 + input.number2
      return OutputModel(result=result)

if __name__ == "__main__":
   node = ExamplePluginAddition()
   input = InputModel(number1=1, number2=2)
   print(node.run(input));

2. 创造属于自己的自定义功能

image.png

  • 因为最近刚好在学习多因子组合,所以我想能不能自己做一个因子权重优化的节点。说实话过程有点煎熬,基本算是代码小白,反反复复的测试了很久,程序没问题了又来调整UI,中间省略N个小时。只能说功夫不负有心人,还算是弄出来了;

5e665e4b99034575b01c566848f43bb9.png

image.png

  • 第一张图没截好…这是等权下的表现,第二张图是我加入了权重优化节点,虽然IC的提升不大,不过收益率和5组的分层表现明显有提升,还是挺欣慰的有点小作用;
  • 然后说下我自己的感受,这个自定义功能确实很强,可以实现很多的想法,而且节点也可以保存反复的利用,就目前对我而言我能想到的就很多,可以做一些节点模块来筛选因子组合,特征组合,还有做策略的优化,在加上机器学习我觉得能做更多有意思的尝试,我把我权重优化节点的源码放到最后,也是算抛砖引玉了,希望大家做出更多有意思的自定义节点来。
import sys
import os
import logging
from typing import Optional, Type, List
import pandas as pd
import numpy as np
from scipy.optimize import minimize

logger = logging.getLogger(__name__)

# 尝試導入可能存在的插件系統,如果不存在則使用簡化版本
try:
    from panda_plugins.base import BaseWorkNode, work_node
    from panda_plugins.base.ui_control import ui
    from pydantic import BaseModel, Field
    try:
        from pydantic import field_validator
    except ImportError:
        field_validator = None  # For older Pydantic versions
    HAS_PANDA_PLUGINS = True
except ImportError:
    HAS_PANDA_PLUGINS = False

    # 定義簡化版本的類和裝飾器
    class BaseWorkNode:
        def log_info(self, msg):
            print(f"INFO: {msg}")

    def work_node(name="", group="", type="", box_color="", order=0):
        def decorator(cls):
            return cls
        return decorator

    def ui(**kwargs):
        def decorator(cls):
            return cls
        return decorator

    class BaseModel:
        pass

    def Field(title="", description="", default=None):
        return default

    field_validator = None  # 简化版本不支持验证器


# 定義輸入輸出模型
@ui(
    factor_group={"input_type": "None"},
)
class FactorWeightOptimizerInputModel(BaseModel):
    """
    因子權重優化輸入模型
    """
    factor_group: object = Field(title="因子組", description="輸入的因子DataFrame,包含多個因子列")


class FactorWeightOptimizerOutputModel(BaseModel):
    """
    因子權重優化輸出模型
    """
    weighted_factor: object = Field(title="加權後因子", description="經過權重優化後的因子值")

    if HAS_PANDA_PLUGINS:
        model_config = {"arbitrary_types_allowed": True}
    else:
        # For compatibility when pydantic is not available
        pass

    if field_validator is not None:
        @field_validator('weighted_factor')
        @classmethod
        def validate_weighted_factor(cls, v):
            if not isinstance(v, pd.DataFrame):
                raise ValueError('weighted_factor must be a pandas DataFrame')
            return v


@work_node(name="因子權重優化", group="04-因子相關", type="general", box_color="purple", order=9)
class FactorWeightOptimizerNode(BaseWorkNode):
    """
    因子權重優化節點,對輸入的因子組進行權重優化。
    通過優化算法計算各因子的最優權重,然後將因子加權合併為單一因子值。
    """

    def __init__(self):
        super().__init__()

    @classmethod
    def input_model(cls) -> Optional[Type[BaseModel]]:
        return FactorWeightOptimizerInputModel

    @classmethod
    def output_model(cls) -> Optional[Type[BaseModel]]:
        return FactorWeightOptimizerOutputModel

    def run(self, input: BaseModel) -> BaseModel:
        """
        執行因子權重優化

        Args:
            input: 包含因子組的輸入模型

        Returns:
            包含優化結果的輸出模型
        """

        try:
            print(f"開始因子權重優化")

            # 確保輸入的是DataFrame格式
            if not isinstance(input.factor_group, pd.DataFrame):
                raise ValueError("輸入的factor_group必須是pandas DataFrame格式")

            # 獲取因子列名(排除date、symbol)
            factor_columns = [col for col in input.factor_group.columns
                             if col not in ['date', 'symbol']]

            if len(factor_columns) == 0:
                raise ValueError("輸入的因子組中沒有找到因子列(除了date、symbol)")

            print(f"發現 {len(factor_columns)} 個因子: {factor_columns}")

            # 使用簡化的優化方法計算權重
            optimized_weights, optimization_score = self._optimize_weights_simple(
                input.factor_group[factor_columns]
            )

            # 應用權重計算加權因子
            weighted_factor = self._compute_weighted_factor(
                input.factor_group[factor_columns],
                optimized_weights
            )

            # 合併到原始數據框,添加加權因子列
            result_df = input.factor_group.copy()
            result_df['weighted_factor'] = weighted_factor

            # 將權重轉換為字典格式
            weight_dict = {factor: weight for factor, weight in zip(factor_columns, optimized_weights)}

            print(f"優化完成! 權重: {weight_dict}")
            print(f"優化得分: {optimization_score:.4f}")

            # 返回結果 - 使用兼容的方式
            if HAS_PANDA_PLUGINS:
                return FactorWeightOptimizerOutputModel(
                    weighted_factor=result_df
                )
            else:
                # 如果沒有pydantic支持,創建一個兼容的對象
                class OutputModel:
                    def __init__(self):
                        self.weighted_factor = result_df

                return OutputModel()

        except Exception as e:
            print(f"因子權重優化過程中出現錯誤: {str(e)}")
            # 返回錯誤情況下的默認輸出 - 使用兼容的方式
            if HAS_PANDA_PLUGINS:
                return FactorWeightOptimizerOutputModel(
                    weighted_factor=input.factor_group
                )
            else:
                # 如果沒有pydantic支持,創建一個兼容的對象
                class OutputModel:
                    def __init__(self):
                        self.weighted_factor = input.factor_group

                return OutputModel()

    def _optimize_weights(self, factor_data: pd.DataFrame, target_data: pd.Series = None,
                         weight_constraint: str = "sum_to_one") -> tuple[np.ndarray, float]:
        """
        優化因子權重

        Args:
            factor_data: 因子數據
            target_data: 目標數據(可選)
            weight_constraint: 權重約束

        Returns:
            最優權重數組和優化得分
        """
        n_factors = factor_data.shape[1]

        # 處理缺失值
        factor_data_clean = factor_data.ffill().fillna(0).values

        # 目標函數:最大化與目標變量的相關性或最小化某種風險指標
        def objective(weights):
            # 計算加權因子
            weighted_values = factor_data_clean.dot(weights)

            if target_data is not None and len(target_data) > 0:
                # 如果有目標數據,最大化與目標的相關性
                target_clean = target_data.fillna(target_data.mean()).values
                # 確保長度一致
                min_len = min(len(weighted_values), len(target_clean))
                correlation = np.corrcoef(weighted_values[:min_len], target_clean[:min_len])[0, 1]

                # 如果相關係數是NaN,設為0
                if np.isnan(correlation):
                    correlation = 0

                # 返回負相關係數(因為minimize是最小化函數)
                return -correlation
            else:
                # 如果沒有目標,使用因子自身的統計特性進行優化
                # 例如最大化加權因子的標準差(信息含量)
                std_score = np.std(weighted_values)
                # 添加正則化項防止權重過大
                reg_term = 0.01 * np.sum(weights ** 2)
                return -std_score + reg_term

        # 決定約束條件
        constraints = []
        if weight_constraint == "sum_to_one":
            # 權重之和為1的約束
            constraints.append({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})

        # 邊界:權重大於等於0(可根據需要調整)
        bounds = [(0, 1) for _ in range(n_factors)]  # 限制權重在0到1之間

        # 初始猜測(等權重)
        initial_weights = np.ones(n_factors) / n_factors

        # 執行優化
        try:
            result = minimize(
                objective,
                initial_weights,
                method='SLSQP',
                bounds=bounds,
                constraints=constraints,
                options={'disp': False, 'maxiter': 1000}
            )

            if result.success:
                optimized_weights = result.x
                optimization_score = -result.fun  # 轉回正數
            else:
                print("優化失敗,使用等權重")
                optimized_weights = initial_weights
                optimization_score = -objective(initial_weights)
        except Exception as e:
            print(f"優化過程出錯: {str(e)},使用等權重")
            optimized_weights = initial_weights
            optimization_score = -objective(initial_weights)

        # 標準化權重使其和為1(以防計算誤差)
        if weight_constraint == "sum_to_one":
            optimized_weights = optimized_weights / np.sum(optimized_weights)

        return optimized_weights, optimization_score

    def _optimize_weights_simple(self, factor_data: pd.DataFrame) -> tuple[np.ndarray, float]:
        """
        简单的优化权重方法 - 基于波动率调整权重

        Args:
            factor_data: 因子數據

        Returns:
            最優權重數組和優化得分
        """
        n_factors = factor_data.shape[1]

        # 處理缺失值
        factor_data_clean = factor_data.ffill().fillna(0).values

        # 计算每个因子的标准差(波动率)
        std_devs = np.array([np.std(factor_data_clean[:, i]) for i in range(n_factors)])

        # 防止除零
        std_devs = np.where(std_devs == 0, 1, std_devs)

        # 波动率越小,权重越大(假设低波动率因子更稳定)
        weights = 1 / std_devs
        # 归一化
        weights = weights / weights.sum()

        # 简单评分(基于波动率的倒数)
        score = np.mean(1 / (std_devs + 0.01))  # 加小常数避免除零

        return weights, score

    def _compute_weighted_factor(self, factor_data: pd.DataFrame, weights: np.ndarray) -> pd.Series:
        """
        計算加權因子

        Args:
            factor_data: 因子數據
            weights: 權重數組

        Returns:
            加權後的因子值
        """
        # 確保數據完整
        factor_values = factor_data.ffill().fillna(0).values

        # 計算加權和
        weighted_values = factor_values.dot(weights)

        # 返回pandas Series,保持原有的索引
        return pd.Series(weighted_values, index=factor_data.index)


# 設置節點描述(僅在有插件系統時)
if HAS_PANDA_PLUGINS:
    FactorWeightOptimizerNode.set_short_description('''
<p>該節點用於對<span style="color: #ff0000;">因子組</span>進行權重優化,計算最優權重後合併為單一因子值。</p>
<p>使用簡單的波動率調整方法計算權重,輸出最優加權合併的因子值。</p>
    ''')

    FactorWeightOptimizerNode.set_long_description('''
<p>該節點用於對<span style="color: #ff0000;">因子組</span>進行權重優化,通過波動率調整算法計算各因子的最優權重。</p>
<p>基於因子穩定性(波動率)來確定權重,波動率較低的因子獲得較高的權重。</p>

<p><strong>功能亮點:</strong></p>

<p>-自動計算<span style="color: #ff6600;">因子權重</span>
<p>-基於<span style="color: #ff6600;">波動率調整</span>的權重分配
<p>-輸出<span style="color: #ff6600;">加權合併因子</span>

<p><strong>參數說明:</strong></p>

<p><span style="color: #008000;">因子組:</span> 包含多個因子列的DataFrame</p>

<p><strong>輸出格式:</strong></p>
<p>-添加<span style="color: #ff6600;">weighted_factor</span>列的原始DataFrame</p>

<p><strong><span style="color: #808080;">使用建議:</span></strong></p>

<p><span style="color: #808080;">-低波動率因子被認為更穩定,會獲得更高權重</span></p>
<p><span style="color: #808080;">-適用於無明確目標變量的因子合併場景</span></p>

    ''')


def create_input_model(factor_group):
    """創建輸入模型的兼容函數"""
    if HAS_PANDA_PLUGINS:
        # 如果有pydantic支持,使用正常的構造方式
        return FactorWeightOptimizerInputModel(
            factor_group=factor_group
        )
    else:
        # 如果沒有pydantic支持,創建一個兼容的對象
        class InputModel:
            def __init__(self):
                self.factor_group = factor_group

        model = InputModel()
        return model


def create_output_model(weighted_factor):
    """創建輸出模型的兼容函數"""
    if HAS_PANDA_PLUGINS:
        # 如果有pydantic支持,使用正常的構造方式
        return FactorWeightOptimizerOutputModel(
            weighted_factor=weighted_factor
        )
    else:
        # 如果沒有pydantic支持,創建一個兼容的對象
        class OutputModel:
            def __init__(self):
                self.weighted_factor = weighted_factor

        model = OutputModel()
        return model


if __name__ == "__main__":
    # 創建測試數據
    print("測試因子權重優化節點...")

    node = FactorWeightOptimizerNode()

    # 創建測試數據 - 模擬包含多個因子和目標變量的DataFrame
    dates = pd.date_range(start="2024-01-01", periods=100, freq='D')
    symbols = [f'STOCK_{i:03d}' for i in range(10)]

    # 創建所有可能的日期-股票組合
    date_symbol_pairs = [(date.strftime('%Y-%m-%d'), symbol) for date in dates for symbol in symbols]

    # 創建DataFrame
    df_data = []
    for date_str, symbol in date_symbol_pairs:
        row = {
            'date': date_str,
            'symbol': symbol,
            'factor1': np.random.normal(0, 0.1),  # 第一個因子
            'factor2': np.random.normal(0, 0.08),  # 第二個因子
            'factor3': np.random.normal(0, 0.12),  # 第三個因子
            'factor4': np.random.normal(0, 0.09),  # 第四個因子
            'target': np.random.normal(0, 0.1)     # 目標變量
        }
        df_data.append(row)

    test_df = pd.DataFrame(df_data)
    print(f"測試數據形狀: {test_df.shape}")
    print(f"因子列: {[col for col in test_df.columns if col.startswith('factor')]}")

    # 測試權重優化
    print("\n--- 測試因子權重優化 ---")
    input_model = create_input_model(
        factor_group=test_df
    )

    result = node.run(input_model)

    print(f"\n優化完成!")
    print(f"輸出數據形狀: {result.weighted_factor.shape}")
    print(f"是否包含加權因子列: {'weighted_factor' in result.weighted_factor.columns}")

    # 顯示部分結果
    print(f"前5行加權因子值:")
    print(result.weighted_factor[['date', 'symbol', 'weighted_factor']].head())
最后一次编辑于 3天前 0

暂无评论

推荐阅读
  tyler   23小时前   15   0   0 新手入门