1.自定义节点初步了解
- 打开专家模式然后,在notebook上点击可以填充节点模版代码,里面有测试节点的示例;
- 图片看不清的话,我把源码粘贴到了下面,可以看看自定义节点的代码结构是怎样,也可以丢给AI去学习,本人代码能力基本为零也是通过AI再一点点摸索,
在就是你要知道你想创造的节点是要起什么作用,你的自定义节点要和那些节点连接的,可以在专家模式里面在看看其他节点的代码结构是怎样的,要不然
你也没办法让AI明白你要做什么。
from typing import Optional, Type
from panda_plugins.base import BaseWorkNode, work_node
from pydantic import BaseModel
class InputModel(BaseModel):
"""
Define the input model for the node.
Use pydantic to define, which is a library for data validation and parsing.
Reference: https://pydantic-docs.helpmanual.io
为工作节点定义输入模型.
使用 Pydantic 定义, Pydantic 是一个用于数据验证和解析的库.
参考文档: https://pydantic-docs.helpmanual.io
"""
number1: int
number2: int
class OutputModel(BaseModel):
"""
Define the output model for the node.
Use pydantic to define, which is a library for data validation and parsing.
Reference: https://pydantic-docs.helpmanual.io
为工作节点定义输出模型.
使用 Pydantic 定义, Pydantic 是一个用于数据验证和解析的库.
参考文档: https://pydantic-docs.helpmanual.io
"""
result: int
@work_node(name="示例-两数求和", group="测试节点")
class ExamplePluginAddition(BaseWorkNode):
"""
Implement a example node, which can add two numbers and return the result.
实现一个示例节点, 完成一个简单的加法运算, 输入 2 个数值, 输出 2 个数值的和.
"""
# Return the input model
# 返回输入模型
@classmethod
def input_model(cls) -> Optional[Type[BaseModel]]:
return InputModel
# Return the output model
# 返回输出模型
@classmethod
def output_model(cls) -> Optional[Type[BaseModel]]:
return OutputModel
# Node running logic
# 节点运行逻辑
def run(self, input: BaseModel) -> BaseModel:
result = input.number1 + input.number2
return OutputModel(result=result)
if __name__ == "__main__":
node = ExamplePluginAddition()
input = InputModel(number1=1, number2=2)
print(node.run(input));
2. 创造属于自己的自定义功能
- 因为最近刚好在学习多因子组合,所以我想能不能自己做一个因子权重优化的节点。说实话过程有点煎熬,基本算是代码小白,反反复复的测试了很久,程序没问题了又来调整UI,中间省略N个小时。只能说功夫不负有心人,还算是弄出来了;
- 第一张图没截好…这是等权下的表现,第二张图是我加入了权重优化节点,虽然IC的提升不大,不过收益率和5组的分层表现明显有提升,还是挺欣慰的有点小作用;
- 然后说下我自己的感受,这个自定义功能确实很强,可以实现很多的想法,而且节点也可以保存反复的利用,就目前对我而言我能想到的就很多,可以做一些节点模块来筛选因子组合,特征组合,还有做策略的优化,在加上机器学习我觉得能做更多有意思的尝试,我把我权重优化节点的源码放到最后,也是算抛砖引玉了,希望大家做出更多有意思的自定义节点来。
import sys
import os
import logging
from typing import Optional, Type, List
import pandas as pd
import numpy as np
from scipy.optimize import minimize
logger = logging.getLogger(__name__)
# 尝試導入可能存在的插件系統,如果不存在則使用簡化版本
try:
from panda_plugins.base import BaseWorkNode, work_node
from panda_plugins.base.ui_control import ui
from pydantic import BaseModel, Field
try:
from pydantic import field_validator
except ImportError:
field_validator = None # For older Pydantic versions
HAS_PANDA_PLUGINS = True
except ImportError:
HAS_PANDA_PLUGINS = False
# 定義簡化版本的類和裝飾器
class BaseWorkNode:
def log_info(self, msg):
print(f"INFO: {msg}")
def work_node(name="", group="", type="", box_color="", order=0):
def decorator(cls):
return cls
return decorator
def ui(**kwargs):
def decorator(cls):
return cls
return decorator
class BaseModel:
pass
def Field(title="", description="", default=None):
return default
field_validator = None # 简化版本不支持验证器
# 定義輸入輸出模型
@ui(
factor_group={"input_type": "None"},
)
class FactorWeightOptimizerInputModel(BaseModel):
"""
因子權重優化輸入模型
"""
factor_group: object = Field(title="因子組", description="輸入的因子DataFrame,包含多個因子列")
class FactorWeightOptimizerOutputModel(BaseModel):
"""
因子權重優化輸出模型
"""
weighted_factor: object = Field(title="加權後因子", description="經過權重優化後的因子值")
if HAS_PANDA_PLUGINS:
model_config = {"arbitrary_types_allowed": True}
else:
# For compatibility when pydantic is not available
pass
if field_validator is not None:
@field_validator('weighted_factor')
@classmethod
def validate_weighted_factor(cls, v):
if not isinstance(v, pd.DataFrame):
raise ValueError('weighted_factor must be a pandas DataFrame')
return v
@work_node(name="因子權重優化", group="04-因子相關", type="general", box_color="purple", order=9)
class FactorWeightOptimizerNode(BaseWorkNode):
"""
因子權重優化節點,對輸入的因子組進行權重優化。
通過優化算法計算各因子的最優權重,然後將因子加權合併為單一因子值。
"""
def __init__(self):
super().__init__()
@classmethod
def input_model(cls) -> Optional[Type[BaseModel]]:
return FactorWeightOptimizerInputModel
@classmethod
def output_model(cls) -> Optional[Type[BaseModel]]:
return FactorWeightOptimizerOutputModel
def run(self, input: BaseModel) -> BaseModel:
"""
執行因子權重優化
Args:
input: 包含因子組的輸入模型
Returns:
包含優化結果的輸出模型
"""
try:
print(f"開始因子權重優化")
# 確保輸入的是DataFrame格式
if not isinstance(input.factor_group, pd.DataFrame):
raise ValueError("輸入的factor_group必須是pandas DataFrame格式")
# 獲取因子列名(排除date、symbol)
factor_columns = [col for col in input.factor_group.columns
if col not in ['date', 'symbol']]
if len(factor_columns) == 0:
raise ValueError("輸入的因子組中沒有找到因子列(除了date、symbol)")
print(f"發現 {len(factor_columns)} 個因子: {factor_columns}")
# 使用簡化的優化方法計算權重
optimized_weights, optimization_score = self._optimize_weights_simple(
input.factor_group[factor_columns]
)
# 應用權重計算加權因子
weighted_factor = self._compute_weighted_factor(
input.factor_group[factor_columns],
optimized_weights
)
# 合併到原始數據框,添加加權因子列
result_df = input.factor_group.copy()
result_df['weighted_factor'] = weighted_factor
# 將權重轉換為字典格式
weight_dict = {factor: weight for factor, weight in zip(factor_columns, optimized_weights)}
print(f"優化完成! 權重: {weight_dict}")
print(f"優化得分: {optimization_score:.4f}")
# 返回結果 - 使用兼容的方式
if HAS_PANDA_PLUGINS:
return FactorWeightOptimizerOutputModel(
weighted_factor=result_df
)
else:
# 如果沒有pydantic支持,創建一個兼容的對象
class OutputModel:
def __init__(self):
self.weighted_factor = result_df
return OutputModel()
except Exception as e:
print(f"因子權重優化過程中出現錯誤: {str(e)}")
# 返回錯誤情況下的默認輸出 - 使用兼容的方式
if HAS_PANDA_PLUGINS:
return FactorWeightOptimizerOutputModel(
weighted_factor=input.factor_group
)
else:
# 如果沒有pydantic支持,創建一個兼容的對象
class OutputModel:
def __init__(self):
self.weighted_factor = input.factor_group
return OutputModel()
def _optimize_weights(self, factor_data: pd.DataFrame, target_data: pd.Series = None,
weight_constraint: str = "sum_to_one") -> tuple[np.ndarray, float]:
"""
優化因子權重
Args:
factor_data: 因子數據
target_data: 目標數據(可選)
weight_constraint: 權重約束
Returns:
最優權重數組和優化得分
"""
n_factors = factor_data.shape[1]
# 處理缺失值
factor_data_clean = factor_data.ffill().fillna(0).values
# 目標函數:最大化與目標變量的相關性或最小化某種風險指標
def objective(weights):
# 計算加權因子
weighted_values = factor_data_clean.dot(weights)
if target_data is not None and len(target_data) > 0:
# 如果有目標數據,最大化與目標的相關性
target_clean = target_data.fillna(target_data.mean()).values
# 確保長度一致
min_len = min(len(weighted_values), len(target_clean))
correlation = np.corrcoef(weighted_values[:min_len], target_clean[:min_len])[0, 1]
# 如果相關係數是NaN,設為0
if np.isnan(correlation):
correlation = 0
# 返回負相關係數(因為minimize是最小化函數)
return -correlation
else:
# 如果沒有目標,使用因子自身的統計特性進行優化
# 例如最大化加權因子的標準差(信息含量)
std_score = np.std(weighted_values)
# 添加正則化項防止權重過大
reg_term = 0.01 * np.sum(weights ** 2)
return -std_score + reg_term
# 決定約束條件
constraints = []
if weight_constraint == "sum_to_one":
# 權重之和為1的約束
constraints.append({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
# 邊界:權重大於等於0(可根據需要調整)
bounds = [(0, 1) for _ in range(n_factors)] # 限制權重在0到1之間
# 初始猜測(等權重)
initial_weights = np.ones(n_factors) / n_factors
# 執行優化
try:
result = minimize(
objective,
initial_weights,
method='SLSQP',
bounds=bounds,
constraints=constraints,
options={'disp': False, 'maxiter': 1000}
)
if result.success:
optimized_weights = result.x
optimization_score = -result.fun # 轉回正數
else:
print("優化失敗,使用等權重")
optimized_weights = initial_weights
optimization_score = -objective(initial_weights)
except Exception as e:
print(f"優化過程出錯: {str(e)},使用等權重")
optimized_weights = initial_weights
optimization_score = -objective(initial_weights)
# 標準化權重使其和為1(以防計算誤差)
if weight_constraint == "sum_to_one":
optimized_weights = optimized_weights / np.sum(optimized_weights)
return optimized_weights, optimization_score
def _optimize_weights_simple(self, factor_data: pd.DataFrame) -> tuple[np.ndarray, float]:
"""
简单的优化权重方法 - 基于波动率调整权重
Args:
factor_data: 因子數據
Returns:
最優權重數組和優化得分
"""
n_factors = factor_data.shape[1]
# 處理缺失值
factor_data_clean = factor_data.ffill().fillna(0).values
# 计算每个因子的标准差(波动率)
std_devs = np.array([np.std(factor_data_clean[:, i]) for i in range(n_factors)])
# 防止除零
std_devs = np.where(std_devs == 0, 1, std_devs)
# 波动率越小,权重越大(假设低波动率因子更稳定)
weights = 1 / std_devs
# 归一化
weights = weights / weights.sum()
# 简单评分(基于波动率的倒数)
score = np.mean(1 / (std_devs + 0.01)) # 加小常数避免除零
return weights, score
def _compute_weighted_factor(self, factor_data: pd.DataFrame, weights: np.ndarray) -> pd.Series:
"""
計算加權因子
Args:
factor_data: 因子數據
weights: 權重數組
Returns:
加權後的因子值
"""
# 確保數據完整
factor_values = factor_data.ffill().fillna(0).values
# 計算加權和
weighted_values = factor_values.dot(weights)
# 返回pandas Series,保持原有的索引
return pd.Series(weighted_values, index=factor_data.index)
# 設置節點描述(僅在有插件系統時)
if HAS_PANDA_PLUGINS:
FactorWeightOptimizerNode.set_short_description('''
<p>該節點用於對<span style="color: #ff0000;">因子組</span>進行權重優化,計算最優權重後合併為單一因子值。</p>
<p>使用簡單的波動率調整方法計算權重,輸出最優加權合併的因子值。</p>
''')
FactorWeightOptimizerNode.set_long_description('''
<p>該節點用於對<span style="color: #ff0000;">因子組</span>進行權重優化,通過波動率調整算法計算各因子的最優權重。</p>
<p>基於因子穩定性(波動率)來確定權重,波動率較低的因子獲得較高的權重。</p>
<p><strong>功能亮點:</strong></p>
<p>-自動計算<span style="color: #ff6600;">因子權重</span>
<p>-基於<span style="color: #ff6600;">波動率調整</span>的權重分配
<p>-輸出<span style="color: #ff6600;">加權合併因子</span>
<p><strong>參數說明:</strong></p>
<p><span style="color: #008000;">因子組:</span> 包含多個因子列的DataFrame</p>
<p><strong>輸出格式:</strong></p>
<p>-添加<span style="color: #ff6600;">weighted_factor</span>列的原始DataFrame</p>
<p><strong><span style="color: #808080;">使用建議:</span></strong></p>
<p><span style="color: #808080;">-低波動率因子被認為更穩定,會獲得更高權重</span></p>
<p><span style="color: #808080;">-適用於無明確目標變量的因子合併場景</span></p>
''')
def create_input_model(factor_group):
"""創建輸入模型的兼容函數"""
if HAS_PANDA_PLUGINS:
# 如果有pydantic支持,使用正常的構造方式
return FactorWeightOptimizerInputModel(
factor_group=factor_group
)
else:
# 如果沒有pydantic支持,創建一個兼容的對象
class InputModel:
def __init__(self):
self.factor_group = factor_group
model = InputModel()
return model
def create_output_model(weighted_factor):
"""創建輸出模型的兼容函數"""
if HAS_PANDA_PLUGINS:
# 如果有pydantic支持,使用正常的構造方式
return FactorWeightOptimizerOutputModel(
weighted_factor=weighted_factor
)
else:
# 如果沒有pydantic支持,創建一個兼容的對象
class OutputModel:
def __init__(self):
self.weighted_factor = weighted_factor
model = OutputModel()
return model
if __name__ == "__main__":
# 創建測試數據
print("測試因子權重優化節點...")
node = FactorWeightOptimizerNode()
# 創建測試數據 - 模擬包含多個因子和目標變量的DataFrame
dates = pd.date_range(start="2024-01-01", periods=100, freq='D')
symbols = [f'STOCK_{i:03d}' for i in range(10)]
# 創建所有可能的日期-股票組合
date_symbol_pairs = [(date.strftime('%Y-%m-%d'), symbol) for date in dates for symbol in symbols]
# 創建DataFrame
df_data = []
for date_str, symbol in date_symbol_pairs:
row = {
'date': date_str,
'symbol': symbol,
'factor1': np.random.normal(0, 0.1), # 第一個因子
'factor2': np.random.normal(0, 0.08), # 第二個因子
'factor3': np.random.normal(0, 0.12), # 第三個因子
'factor4': np.random.normal(0, 0.09), # 第四個因子
'target': np.random.normal(0, 0.1) # 目標變量
}
df_data.append(row)
test_df = pd.DataFrame(df_data)
print(f"測試數據形狀: {test_df.shape}")
print(f"因子列: {[col for col in test_df.columns if col.startswith('factor')]}")
# 測試權重優化
print("\n--- 測試因子權重優化 ---")
input_model = create_input_model(
factor_group=test_df
)
result = node.run(input_model)
print(f"\n優化完成!")
print(f"輸出數據形狀: {result.weighted_factor.shape}")
print(f"是否包含加權因子列: {'weighted_factor' in result.weighted_factor.columns}")
# 顯示部分結果
print(f"前5行加權因子值:")
print(result.weighted_factor[['date', 'symbol', 'weighted_factor']].head())