工作流JSON高级编辑技巧
一、节点ID自动重构工具
在大型工作流中,手动修改节点ID极其容易出错,且需要同步更新 links 数组和所有节点的 inputs/outputs 引用。编写一个自动化重构脚本可以避免99%的人为错误。
核心思路:
- 扫描所有节点,建立旧ID到新ID的映射表
- 遍历 links 数组,更新所有节点ID引用
- 遍历每个节点的 inputs 和 outputs,检查 link 和 links 字段(其中保存的是连接ID,无需修改)
- 保持连接ID不变,只更新节点ID引用
完整重构脚本:
import json
from typing import Dict, List
def rebuild_node_ids(workflow_json, start_id=1):
    """
    Renumber every node so that IDs run consecutively from ``start_id``.

    Nodes are renumbered in list order; each node's ``order`` is set to its
    new ID. The source/target node IDs stored in every link are rewritten to
    match. Link IDs are left untouched, so the ``link`` / ``links`` fields
    inside node inputs/outputs (which hold link IDs) need no change.

    The workflow is modified in place and also returned.

    Args:
        workflow_json: workflow dict containing ``litegraph.nodes`` and
            ``litegraph.links``. Each link is expected to be a list of the
            form [link_id, src_node_id, src_slot, tgt_node_id, tgt_slot, type].
        start_id: first node ID to assign, defaults to 1.

    Returns:
        The same workflow dict, renumbered.

    Raises:
        KeyError: if a link references a node ID that no node carries.
            Raised before anything is modified, so the workflow is never
            left half-renumbered.
    """
    graph = workflow_json['litegraph']
    nodes = graph['nodes']
    links = graph['links']

    # 1. Build the old-ID -> new-ID table without touching the nodes yet.
    old_to_new: Dict[int, int] = {
        node['id']: new_id for new_id, node in enumerate(nodes, start=start_id)
    }

    # 2. Validate every link endpoint up front so a dangling link cannot
    #    abort the rewrite halfway through.
    for link in links:
        for endpoint in (1, 3):
            if link[endpoint] not in old_to_new:
                raise KeyError(
                    f"link {link[0]} references unknown node id {link[endpoint]}"
                )

    # 3. Apply the new IDs to the nodes (order mirrors the ID).
    for new_id, node in enumerate(nodes, start=start_id):
        node['id'] = new_id
        node['order'] = new_id

    # 4. Rewrite the node references inside each link.
    for link in links:
        link[1] = old_to_new[link[1]]
        link[3] = old_to_new[link[3]]

    return workflow_json
def validate_and_fix_workflow(workflow_json):
    """
    Check that links and node slots agree, repairing slot references in place.

    For every link the function checks that both endpoint nodes exist and
    that the referenced output/input slots exist. When a slot exists but does
    not reference the link, the slot is repaired: the link ID is appended to
    the output slot's ``links`` list, and written to the input slot's ``link``
    field. Missing nodes and missing slots are only reported.

    A slot list or a ``links`` field that is absent or ``None`` is treated as
    empty, and negative slot indices are reported as non-existent rather than
    being indexed from the end of the list.

    Args:
        workflow_json: workflow dict containing ``litegraph.nodes`` and
            ``litegraph.links``; each link starts with
            [link_id, src_node_id, src_slot, tgt_node_id, tgt_slot].

    Returns:
        (is_valid, fixed_json, error_messages). ``fixed_json`` is the same
        dict that was passed in. ``is_valid`` is True only when no message
        was produced, so a run that performed repairs reports False.
    """
    errors = []
    graph = workflow_json['litegraph']
    nodes = {node['id']: node for node in graph['nodes']}

    for link in graph['links']:
        # Only the first five fields are needed; the data type (and any
        # trailing extras) are ignored.
        link_id, src_id, src_slot, tgt_id, tgt_slot = link[:5]

        # Both endpoints must exist before the slots can be inspected.
        if src_id not in nodes:
            errors.append(f"连接{link_id}: 源节点{src_id}不存在")
            continue
        if tgt_id not in nodes:
            errors.append(f"连接{link_id}: 目标节点{tgt_id}不存在")
            continue

        # Output side: the slot's `links` list must contain this link.
        outputs = nodes[src_id].get('outputs') or []
        if not 0 <= src_slot < len(outputs):
            errors.append(f"连接{link_id}: 源节点{src_id}的输出槽{src_slot}不存在")
        else:
            output = outputs[src_slot]
            registered = output.get('links')
            if registered is None:
                # Absent or null means "no links yet": start a fresh list.
                registered = output['links'] = []
            if link_id not in registered:
                registered.append(link_id)
                errors.append(f"已修复: 连接{link_id}添加到源节点{src_id}的输出槽{src_slot}")

        # Input side: the slot's `link` field must equal this link.
        inputs = nodes[tgt_id].get('inputs') or []
        if not 0 <= tgt_slot < len(inputs):
            errors.append(f"连接{link_id}: 目标节点{tgt_id}的输入槽{tgt_slot}不存在")
        elif inputs[tgt_slot].get('link') != link_id:
            inputs[tgt_slot]['link'] = link_id
            errors.append(f"已修复: 连接{link_id}设置到目标节点{tgt_id}的输入槽{tgt_slot}")

    return len(errors) == 0, workflow_json, errors
# Usage example
if __name__ == "__main__":
    # Load the workflow that needs its node IDs tidied up.
    with open('工作流.json', 'r', encoding='utf-8') as src:
        workflow = json.load(src)

    # Renumber the nodes starting from 1.
    workflow = rebuild_node_ids(workflow, start_id=1)

    # Check the link bookkeeping and repair what can be repaired.
    is_valid, workflow, errors = validate_and_fix_workflow(workflow)

    # Report every problem or repair, one per line.
    if errors:
        print("发现的问题:")
        print("\n".join(f" - {error}" for error in errors))

    # Write the repaired workflow next to the original.
    with open('工作流_修复版.json', 'w', encoding='utf-8') as dst:
        json.dump(workflow, dst, indent=2, ensure_ascii=False)

    print("工作流重构完成!")
使用场景:
- 合并多个工作流时,需要统一节点ID
- 删除节点后,需要重新整理ID序列
- 从其他工作流复制节点时,避免ID冲突
二、工作流模块化与模板复用
将常用的因子组合封装成可复用的模块,可以大幅提高开发效率。通过Python脚本生成标准化的因子链,然后组合成完整工作流。
模块化设计思路:
- 因子模块:单个因子的完整链路(CodeControl → FactorBuildProControl → FactorWeightAdjustControl)
- 分析模块:因子分析链路(MultiFactorMergeControl → FactorAnalysisControl → FactorAnalysisChartControl)
- 策略模块:策略回测链路(CodeControl → 回测节点)
模块化生成器:
import json
import uuid
from typing import List, Dict, Optional
class WorkflowModule:
    """Base class for a workflow module: a self-contained set of nodes and links.

    Node and link IDs are module-local and are handed out from
    ``node_id_counter`` / ``link_id_counter``, both starting at 1.
    """

    def __init__(self):
        self.nodes = []
        self.links = []
        self.node_id_counter = 1
        self.link_id_counter = 1

    def _add_node(self, type_name, title, pos, size, **kwargs):
        """Append a node and return its ID.

        Args:
            type_name: value stored under the node's ``type`` key.
            title: value stored under the node's ``title`` key.
            pos: value stored under ``pos``.
            size: value stored under ``size``.
            **kwargs: extra node keys (e.g. ``properties``, ``inputs``,
                ``outputs``) merged in after the standard keys.

        Returns:
            The ID assigned to the new node.
        """
        nid = self.node_id_counter
        self.node_id_counter += 1
        node = {
            "id": nid,
            "type": type_name,
            "title": title,
            "pos": pos,
            "size": size,
            "flags": {"uuid": str(uuid.uuid4())},
            "order": nid,
            "mode": 0,
            **kwargs
        }
        self.nodes.append(node)
        return nid

    def _find_node(self, node_id):
        """Return the node whose ``id`` equals ``node_id``.

        Looks the node up by ID rather than by list position, so it keeps
        working when IDs are not exactly 1..n in list order.

        Raises:
            IndexError: if no node in this module has that ID.
        """
        for node in self.nodes:
            if node['id'] == node_id:
                return node
        raise IndexError(f"node id {node_id} not found in module")

    def _add_link(self, src_id, src_slot, tgt_id, tgt_slot, data_type):
        """Connect an output slot to an input slot and return the link ID.

        The link is appended to ``self.links`` as
        [link_id, src_id, src_slot, tgt_id, tgt_slot, data_type]; the link ID
        is appended to the source output slot's ``links`` list and stored in
        the target input slot's ``link`` field.

        Raises:
            IndexError: if either node ID is unknown or a slot index is out
                of range.
            KeyError: if the source node has no ``outputs`` or the target
                node has no ``inputs``.
        """
        # Resolve both slots first so that a bad reference fails before any
        # state (counter, links list, slot fields) has been modified.
        src_output = self._find_node(src_id)['outputs'][src_slot]
        tgt_input = self._find_node(tgt_id)['inputs'][tgt_slot]

        lid = self.link_id_counter
        self.link_id_counter += 1
        self.links.append([lid, src_id, src_slot, tgt_id, tgt_slot, data_type])

        # An output slot may have no `links` key, or carry an explicit None.
        if src_output.get('links') is None:
            src_output['links'] = []
        src_output['links'].append(lid)
        tgt_input['link'] = lid
        return lid
class FactorModule(WorkflowModule):
    """Factor module: builds the full three-node chain for a single factor.

    The chain is CodeControl -> FactorBuildProControl ->
    FactorWeightAdjustControl. After construction, ``output_node_id`` and
    ``output_slot`` identify the end of the chain for external wiring.
    """

    def __init__(self, factor_name: str, factor_code: str, weight: int,
                 start_date: str, end_date: str, pos: List[int]):
        super().__init__()
        self.factor_name = factor_name
        self.pos = pos
        self._build(factor_code, weight, start_date, end_date)

    def _build(self, code: str, weight: int, start_date: str, end_date: str):
        """Create the three nodes of the chain and link them together."""
        left, top = self.pos
        label = self.factor_name

        # 1. Node holding the factor source code.
        code_node = self._add_node(
            "CodeControl",
            f"{label}因子代码",
            [left, top],
            [210, 63],
            properties={"策略代码": code},
            widgets_values=[code],
            inputs=[{"name": "策略代码", "type": "string", "fieldName": "code"}],
            outputs=[{"name": "策略代码", "type": "string", "fieldName": "code"}],
        )

        # 2. Factor build node, placed to the right of the code node.
        build_properties = {
            "start_date": start_date,
            "end_date": end_date,
            "type": "Python",
            "code": "",
            "market": "股票",
            "direction": "负向",
        }
        build_inputs = [
            {"name": "开始时间", "type": "string", "fieldName": "start_date"},
            {"name": "结束时间", "type": "string", "fieldName": "end_date"},
            {"name": "因子代码", "type": "string", "fieldName": "code"},
        ]
        build_outputs = [{"name": "因子值", "type": "dataframe", "fieldName": "factor"}]
        build_node = self._add_node(
            "FactorBuildProControl",
            f"{label}因子构建",
            [left + 250, top],
            [210, 180],
            properties=build_properties,
            inputs=build_inputs,
            outputs=build_outputs,
        )
        # The code output feeds the build node's third input (index 2).
        self._add_link(code_node, 0, build_node, 2, "string")

        # 3. Weight adjustment node at the end of the chain.
        weight_node = self._add_node(
            "FactorWeightAdjustControl",
            f"{label}权重({weight})",
            [left + 500, top + 50],
            [210, 63],
            properties={"weight": weight},
            inputs=[{"name": "因子值", "type": 0, "fieldName": "df_factor"}],
            outputs=[{"name": "因子值", "type": 0, "fieldName": "df_factor"}],
        )
        self._add_link(build_node, 0, weight_node, 0, "dataframe")

        # Remember where the chain ends so callers can connect to it.
        self.output_node_id = weight_node
        self.output_slot = 0
class WorkflowBuilder:
    """Workflow builder: combines several modules into one workflow dict."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.modules: List[WorkflowModule] = []
        self.base_node_id = 1
        self.base_link_id = 1

    def add_factor_module(self, factor_name: str, factor_code: str, weight: int,
                          start_date: str, end_date: str, pos: List[int]):
        """Create a FactorModule, register it with the builder and return it."""
        module = FactorModule(factor_name, factor_code, weight,
                              start_date, end_date, pos)
        self.modules.append(module)
        return module

    def build(self) -> Dict:
        """Merge all registered modules into a single workflow dict.

        Each module's node IDs and link IDs are shifted so they do not clash
        with those of the modules before it. The shift is applied
        consistently to the node ``id``/``order``, to the link entries, and
        to the link IDs stored in node ``inputs[].link`` and
        ``outputs[].links``.

        The modules themselves are not modified: the merge works on deep
        copies, so ``build()`` can be called repeatedly with the same result
        (apart from the freshly generated graph ``id``).

        Returns:
            The workflow dict with ``format_version``, ``name``,
            ``description`` and the ``litegraph`` section.
        """
        # Local import: only this method needs it.
        import copy

        all_nodes = []
        all_links = []
        node_id_offset = 0
        link_id_offset = 0

        for module in self.modules:
            # Work on copies so module-local IDs stay intact.
            nodes = copy.deepcopy(module.nodes)
            links = copy.deepcopy(module.links)

            for node in nodes:
                node['id'] += node_id_offset
                node['order'] = node['id']
                # Slots store link IDs, which move together with the links.
                for input_slot in node.get('inputs') or []:
                    if input_slot.get('link') is not None:
                        input_slot['link'] += link_id_offset
                for output in node.get('outputs') or []:
                    if output.get('links'):
                        output['links'] = [
                            lid + link_id_offset for lid in output['links']
                        ]

            for link in links:
                link[0] += link_id_offset  # link ID
                link[1] += node_id_offset  # source node ID
                link[3] += node_id_offset  # target node ID

            all_nodes.extend(nodes)
            all_links.extend(links)
            # Offsets advance by the element count, i.e. module-local IDs
            # are expected to be 1..n.
            node_id_offset += len(nodes)
            link_id_offset += len(links)

        return {
            "format_version": "V1.0",
            "name": self.name,
            "description": self.description,
            "litegraph": {
                "id": str(uuid.uuid4()),
                "version": 0.4,
                "nodes": all_nodes,
                "links": all_links,
                "groups": [],
                "config": {},
                "extra": {}
            }
        }
# Usage example
if __name__ == "__main__":
    # Factor source code. These strings are stored verbatim in the generated
    # CodeControl nodes, so they must be valid, properly indented Python.
    atr_code = """class ATRFactor(Factor):
    def calculate(self, factors):
        close = factors['close']
        high = factors['high']
        low = factors['low']
        close_prev = DELAY(close, 1)
        tr1 = high - low
        tr2 = (high - close_prev).abs()
        tr3 = (low - close_prev).abs()
        tr = tr1.where(tr1 > tr2, tr2)
        tr = tr.where(tr > tr3, tr3)
        atr = TS_MEAN(tr, 21)
        ema_close = TS_MEAN(close, 10)
        result = (close - ema_close) / (atr + 1e-10)
        return result"""

    kama_code = """class KAMAFactor(Factor):
    def calculate(self, factors):
        close = factors['close']
        period = 20
        close_delayed = DELAY(close, period)
        change = (close - close_delayed).abs()
        volatility = TS_MEAN(close.diff().abs(), period) * period
        er = change / (volatility + 1e-10)
        return er"""

    # Build the workflow.
    builder = WorkflowBuilder("多因子工作流", "ATR+KAMA双因子分析")

    # Add the ATR factor module.
    builder.add_factor_module("ATR", atr_code, weight=2,
                              start_date="20240101", end_date="20241201",
                              pos=[100, 100])

    # Add the KAMA factor module, placed below the ATR chain.
    builder.add_factor_module("KAMA", kama_code, weight=1,
                              start_date="20240101", end_date="20241201",
                              pos=[100, 400])

    # Generate the workflow JSON.
    workflow = builder.build()

    # Save it.
    with open('多因子工作流_模块化生成.json', 'w', encoding='utf-8') as f:
        json.dump(workflow, f, indent=2, ensure_ascii=False)

    print("模块化工作流生成完成!")
优势:
- 代码复用:常用因子只需定义一次,可快速组合
- 维护简单:修改因子代码只需更新一处
- 结构清晰:模块化设计便于理解和调试
- 扩展性强:新增因子只需调用 add_factor_module
进阶应用:
- 将常用因子模块保存为JSON模板文件
- 通过配置文件(YAML/JSON)批量生成工作流
- 结合CI/CD实现工作流的自动化测试和部署