工作流JSON高级编辑技巧
  13608564384 24天前 59 0

工作流JSON高级编辑技巧

一、节点ID自动重构工具

在大型工作流中,手动修改节点ID极其容易出错,且需要同步更新 links 数组和所有节点的 inputs/outputs 引用。编写一个自动化重构脚本可以避免99%的人为错误。

核心思路

  1. 扫描所有节点,建立旧ID到新ID的映射表
  2. 遍历 links 数组,更新所有节点ID引用
  3. 遍历每个节点的 inputs/outputs,检查 link/links 字段(其中保存的是连接ID,无需改动)
  4. 保持连接ID不变,只更新节点ID引用

完整重构脚本

import json
from typing import Dict, List


def rebuild_node_ids(workflow_json, start_id=1):
    """Renumber all nodes so their IDs run consecutively from ``start_id``.

    Nodes are renumbered in list order. Each node's ``order`` field is set to
    its new ID, and the source/target node IDs stored in every link record
    are rewritten to match. Link IDs are left unchanged, so the ``link`` /
    ``links`` fields on node slots (which hold link IDs) need no update.

    The workflow is modified in place and also returned.

    Args:
        workflow_json: Workflow dict with ``litegraph.nodes`` and
            ``litegraph.links``. Each link is a list of the form
            ``[link_id, src_node_id, src_slot, tgt_node_id, tgt_slot, type]``.
        start_id: First node ID to assign. Defaults to 1.

    Returns:
        The same ``workflow_json`` object, renumbered.

    Raises:
        KeyError: If a link references a node ID that no node has. The check
            runs before anything is modified, so the workflow is left
            untouched in that case.

    Note:
        If several nodes share the same old ID, each still gets a distinct
        new ID, but links that used the shared ID all resolve to the last
        such node in the list.
    """
    graph = workflow_json['litegraph']
    nodes = graph['nodes']
    links = graph['links']

    # 1. Build the old-ID -> new-ID table without touching any node yet.
    old_to_new: Dict[int, int] = {
        node['id']: new_id
        for new_id, node in enumerate(nodes, start=start_id)
    }

    # 2. Reject dangling links up front so a failure cannot leave the
    #    workflow half-renumbered.
    for link in links:
        for node_id in (link[1], link[3]):
            if node_id not in old_to_new:
                raise KeyError(f"连接{link[0]}引用了不存在的节点{node_id}")

    # 3. Apply the new IDs to the nodes.
    for new_id, node in enumerate(nodes, start=start_id):
        node['id'] = new_id
        node['order'] = new_id  # the original script keeps order equal to id

    # 4. Rewrite node references in the links; link IDs stay as they are.
    for link in links:
        link[1] = old_to_new[link[1]]
        link[3] = old_to_new[link[3]]

    return workflow_json


def validate_and_fix_workflow(workflow_json):
    """Check that links and node slots agree, repairing slots where possible.

    For every link whose two endpoint nodes exist:

    * the source output slot must list the link ID in ``links``; if it does
      not, the ID is appended;
    * the target input slot must have ``link`` equal to the link ID; if it
      does not, it is overwritten.

    Links that reference missing nodes or out-of-range slots are reported
    but not repaired. The workflow is modified in place.

    Args:
        workflow_json: Workflow dict with ``litegraph.nodes`` and
            ``litegraph.links``.

    Returns:
        A tuple ``(is_valid, workflow_json, messages)``. ``is_valid`` is
        True only when no message was recorded at all, so a workflow that
        needed repairs is reported as not valid even though it has been
        fixed. Run the function again to confirm the repaired result.
    """
    errors: List[str] = []
    graph = workflow_json['litegraph']
    nodes = {node['id']: node for node in graph['nodes']}

    for link in graph['links']:
        # Only the first five fields are needed; the data type and any
        # extra trailing fields are ignored.
        link_id, src_id, src_slot, tgt_id, tgt_slot = link[:5]

        # Both endpoints must exist before slots can be checked.
        if src_id not in nodes:
            errors.append(f"连接{link_id}: 源节点{src_id}不存在")
            continue
        if tgt_id not in nodes:
            errors.append(f"连接{link_id}: 目标节点{tgt_id}不存在")
            continue

        # Source side: the output slot must list this link.
        outputs = nodes[src_id].get('outputs') or []
        if not 0 <= src_slot < len(outputs):
            errors.append(f"连接{link_id}: 源节点{src_id}的输出槽{src_slot}不存在")
        else:
            output = outputs[src_slot]
            # An unconnected output may carry "links": null; treat that
            # the same as a missing list.
            if output.get('links') is None:
                output['links'] = []
            if link_id not in output['links']:
                output['links'].append(link_id)
                errors.append(
                    f"已修复: 连接{link_id}添加到源节点{src_id}的输出槽{src_slot}"
                )

        # Target side: the input slot must point back at this link.
        inputs = nodes[tgt_id].get('inputs') or []
        if not 0 <= tgt_slot < len(inputs):
            errors.append(f"连接{link_id}: 目标节点{tgt_id}的输入槽{tgt_slot}不存在")
        elif inputs[tgt_slot].get('link') != link_id:
            inputs[tgt_slot]['link'] = link_id
            errors.append(
                f"已修复: 连接{link_id}设置到目标节点{tgt_id}的输入槽{tgt_slot}"
            )

    return len(errors) == 0, workflow_json, errors


def main():
    """Usage example: renumber, validate and save a workflow file."""
    # Load the workflow file.
    with open('工作流.json', 'r', encoding='utf-8') as f:
        workflow = json.load(f)

    # Renumber node IDs.
    workflow = rebuild_node_ids(workflow, start_id=1)

    # Validate and repair.
    is_valid, workflow, errors = validate_and_fix_workflow(workflow)
    if errors:
        print("发现的问题:")
        for error in errors:
            print(f" - {error}")

    # Save the repaired workflow.
    with open('工作流_修复版.json', 'w', encoding='utf-8') as f:
        json.dump(workflow, f, indent=2, ensure_ascii=False)

    print("工作流重构完成!")


if __name__ == "__main__":
    main()

使用场景

  • 合并多个工作流时,需要统一节点ID
  • 删除节点后,需要重新整理ID序列
  • 从其他工作流复制节点时,避免ID冲突

二、工作流模块化与模板复用

将常用的因子组合封装成可复用的模块,可以大幅提高开发效率。通过Python脚本生成标准化的因子链,然后组合成完整工作流。

模块化设计思路

  1. 因子模块:单个因子的完整链路(CodeControl → FactorBuildProControl → FactorWeightAdjustControl)
  2. 分析模块:因子分析链路(MultiFactorMergeControl → FactorAnalysisControl → FactorAnalysisChartControl)
  3. 策略模块:策略回测链路(CodeControl → 回测节点)

模块化生成器

import copy
import json
import uuid
from typing import List, Dict, Optional


class WorkflowModule:
    """Base class for a workflow module: a small group of nodes and links.

    Node and link IDs are module-local and start at 1. They are shifted to
    workflow-wide values only when ``WorkflowBuilder.build`` merges modules.
    """

    def __init__(self):
        self.nodes = []
        self.links = []
        self.node_id_counter = 1
        self.link_id_counter = 1

    def _add_node(self, type_name, title, pos, size, **kwargs):
        """Append a node and return its module-local ID.

        Args:
            type_name: Value stored in the node's ``type`` field.
            title: Value stored in the node's ``title`` field.
            pos: Value stored in the node's ``pos`` field.
            size: Value stored in the node's ``size`` field.
            **kwargs: Extra node fields (for example ``properties``,
                ``inputs``, ``outputs``). They are merged last, so they
                override the default fields on a name clash.

        Returns:
            The ID assigned to the new node.
        """
        nid = self.node_id_counter
        self.node_id_counter += 1

        node = {
            "id": nid,
            "type": type_name,
            "title": title,
            "pos": pos,
            "size": size,
            "flags": {"uuid": str(uuid.uuid4())},
            "order": nid,
            "mode": 0,
            **kwargs,
        }
        self.nodes.append(node)
        return nid

    def _find_node(self, node_id):
        """Return the node with the given module-local ID.

        Raises:
            IndexError: If no node in this module has that ID.
        """
        for node in self.nodes:
            if node['id'] == node_id:
                return node
        raise IndexError(f"node id {node_id} does not exist in this module")

    def _add_link(self, src_id, src_slot, tgt_id, tgt_slot, data_type):
        """Connect an output slot to an input slot and return the link ID.

        The link record ``[link_id, src_id, src_slot, tgt_id, tgt_slot,
        data_type]`` is appended to ``self.links``. The link ID is also
        written to the source output's ``links`` list and the target input's
        ``link`` field.

        Raises:
            IndexError: If either node ID does not exist in this module.
                Nothing is modified in that case.
        """
        # Resolve both endpoints by ID before touching any state, so an
        # invalid ID (such as 0) cannot silently select the wrong node.
        src_node = self._find_node(src_id)
        tgt_node = self._find_node(tgt_id)

        lid = self.link_id_counter
        self.link_id_counter += 1
        self.links.append([lid, src_id, src_slot, tgt_id, tgt_slot, data_type])

        # Update the slot references on both nodes.
        src_output = src_node['outputs'][src_slot]
        if src_output.get('links') is None:
            src_output['links'] = []
        src_output['links'].append(lid)

        tgt_node['inputs'][tgt_slot]['link'] = lid
        return lid


class FactorModule(WorkflowModule):
    """Factor module: the full chain for one factor.

    The chain is CodeControl -> FactorBuildProControl ->
    FactorWeightAdjustControl. After construction, ``output_node_id`` and
    ``output_slot`` identify the chain's final output. Both are module-local
    values and are not shifted by ``WorkflowBuilder.build``.
    """

    def __init__(self, factor_name: str, factor_code: str, weight: int,
                 start_date: str, end_date: str, pos: List[int]):
        super().__init__()
        self.factor_name = factor_name
        self.pos = pos
        self._build(factor_code, weight, start_date, end_date)

    def _build(self, code: str, weight: int, start_date: str, end_date: str):
        """Create the three nodes of the factor chain and link them."""
        x, y = self.pos

        # 1. Factor code node.
        code_id = self._add_node(
            "CodeControl",
            f"{self.factor_name}因子代码",
            [x, y],
            [210, 63],
            properties={"策略代码": code},
            widgets_values=[code],
            inputs=[{"name": "策略代码", "type": "string", "fieldName": "code"}],
            outputs=[{"name": "策略代码", "type": "string", "fieldName": "code"}],
        )

        # 2. Factor build node.
        build_id = self._add_node(
            "FactorBuildProControl",
            f"{self.factor_name}因子构建",
            [x + 250, y],
            [210, 180],
            properties={
                "start_date": start_date,
                "end_date": end_date,
                "type": "Python",
                "code": "",
                "market": "股票",
                "direction": "负向",
            },
            inputs=[
                {"name": "开始时间", "type": "string", "fieldName": "start_date"},
                {"name": "结束时间", "type": "string", "fieldName": "end_date"},
                {"name": "因子代码", "type": "string", "fieldName": "code"},
            ],
            outputs=[{"name": "因子值", "type": "dataframe", "fieldName": "factor"}],
        )
        # The code output feeds input slot 2 of the build node.
        self._add_link(code_id, 0, build_id, 2, "string")

        # 3. Weight adjustment node.
        weight_id = self._add_node(
            "FactorWeightAdjustControl",
            f"{self.factor_name}权重({weight})",
            [x + 500, y + 50],
            [210, 63],
            properties={"weight": weight},
            inputs=[{"name": "因子值", "type": 0, "fieldName": "df_factor"}],
            outputs=[{"name": "因子值", "type": 0, "fieldName": "df_factor"}],
        )
        self._add_link(build_id, 0, weight_id, 0, "dataframe")

        # Remember the chain's output so callers can connect to it.
        self.output_node_id = weight_id
        self.output_slot = 0


class WorkflowBuilder:
    """Workflow builder: merges several modules into one workflow dict."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.modules: List[WorkflowModule] = []
        self.base_node_id = 1
        self.base_link_id = 1

    def add_factor_module(self, factor_name: str, factor_code: str,
                          weight: int, start_date: str, end_date: str,
                          pos: List[int]):
        """Create a ``FactorModule``, register it, and return it."""
        module = FactorModule(factor_name, factor_code, weight,
                              start_date, end_date, pos)
        self.modules.append(module)
        return module

    def build(self) -> Dict:
        """Merge all registered modules into a complete workflow dict.

        Each module's local node and link IDs are shifted by the number of
        nodes and links in the modules before it. The shift is applied to
        node ``id`` / ``order``, to the link records, and to the link IDs
        stored in every node's ``inputs[].link`` and ``outputs[].links``.

        The modules themselves are not modified: the merge works on deep
        copies, so ``build`` can be called repeatedly with the same result
        (apart from the freshly generated ``litegraph.id``).

        Returns:
            The workflow dict, ready to be serialised as JSON.
        """
        all_nodes = []
        all_links = []
        node_id_offset = 0
        link_id_offset = 0

        for module in self.modules:
            # Work on copies so the module keeps its local numbering.
            nodes = copy.deepcopy(module.nodes)
            links = copy.deepcopy(module.links)

            for node in nodes:
                node['id'] += node_id_offset
                node['order'] = node['id']

                # Slot references hold link IDs, so they must move together
                # with the link records.
                for output in node.get('outputs') or []:
                    if output.get('links'):
                        output['links'] = [
                            lid + link_id_offset for lid in output['links']
                        ]
                for input_slot in node.get('inputs') or []:
                    if input_slot.get('link') is not None:
                        input_slot['link'] += link_id_offset

            for link in links:
                link[0] += link_id_offset  # link ID
                link[1] += node_id_offset  # source node ID
                link[3] += node_id_offset  # target node ID

            all_nodes.extend(nodes)
            all_links.extend(links)
            node_id_offset += len(nodes)
            link_id_offset += len(links)

        return {
            "format_version": "V1.0",
            "name": self.name,
            "description": self.description,
            "litegraph": {
                "id": str(uuid.uuid4()),
                "version": 0.4,
                "nodes": all_nodes,
                "links": all_links,
                "groups": [],
                "config": {},
                "extra": {},
            },
        }


# Factor source code for the usage example. The line breaks and indentation
# inside these strings are reconstructed from whitespace-collapsed text.
ATR_CODE = """class ATRFactor(Factor):
    def calculate(self, factors):
        close = factors['close']
        high = factors['high']
        low = factors['low']
        close_prev = DELAY(close, 1)
        tr1 = high - low
        tr2 = (high - close_prev).abs()
        tr3 = (low - close_prev).abs()
        tr = tr1.where(tr1 > tr2, tr2)
        tr = tr.where(tr > tr3, tr3)
        atr = TS_MEAN(tr, 21)
        ema_close = TS_MEAN(close, 10)
        result = (close - ema_close) / (atr + 1e-10)
        return result"""

KAMA_CODE = """class KAMAFactor(Factor):
    def calculate(self, factors):
        close = factors['close']
        period = 20
        close_delayed = DELAY(close, period)
        change = (close - close_delayed).abs()
        volatility = TS_MEAN(close.diff().abs(), period) * period
        er = change / (volatility + 1e-10)
        return er"""


def main():
    """Usage example: build a two-factor workflow and save it as JSON."""
    builder = WorkflowBuilder("多因子工作流", "ATR+KAMA双因子分析")

    # ATR factor module.
    builder.add_factor_module("ATR", ATR_CODE, weight=2,
                              start_date="20240101", end_date="20241201",
                              pos=[100, 100])

    # KAMA factor module.
    builder.add_factor_module("KAMA", KAMA_CODE, weight=1,
                              start_date="20240101", end_date="20241201",
                              pos=[100, 400])

    # Generate the workflow JSON.
    workflow = builder.build()

    # Save it.
    with open('多因子工作流_模块化生成.json', 'w', encoding='utf-8') as f:
        json.dump(workflow, f, indent=2, ensure_ascii=False)

    print("模块化工作流生成完成!")


if __name__ == "__main__":
    main()

优势

  • 代码复用:常用因子只需定义一次,可快速组合
  • 维护简单:修改因子代码只需更新一处
  • 结构清晰:模块化设计便于理解和调试
  • 扩展性强:新增因子只需调用 add_factor_module

进阶应用

  • 将常用因子模块保存为JSON模板文件
  • 通过配置文件(YAML/JSON)批量生成工作流
  • 结合CI/CD实现工作流的自动化测试和部署
最后一次编辑于 24天前 0

暂无评论

推荐阅读