@@ -437,6 +437,73 @@ def check_all_types(
437
437
return validation
438
438
439
439
440
+ def circular_dependency_checker (step_inputs : List [CWLObjectType ]) -> None :
441
+ """Check if a workflow has circular dependency."""
442
+ adjacency = get_dependency_tree (step_inputs )
443
+ vertices = adjacency .keys ()
444
+ processed : List [str ] = []
445
+ cycles : List [List [str ]] = []
446
+ for vertex in vertices :
447
+ if vertex not in processed :
448
+ traversal_path = [vertex ]
449
+ processDFS (adjacency , traversal_path , processed , cycles )
450
+ if cycles :
451
+ exception_msg = "The following steps have circular dependency:\n "
452
+ cyclestrs = [str (cycle ) for cycle in cycles ]
453
+ exception_msg += "\n " .join (cyclestrs )
454
+ raise ValidationException (exception_msg )
455
+
456
+
457
+ def get_dependency_tree (step_inputs : List [CWLObjectType ]) -> Dict [str , List [str ]]:
458
+ """Get the dependency tree in the form of adjacency list."""
459
+ adjacency = {} # adjacency list of the dependency tree
460
+ for step_input in step_inputs :
461
+ if "source" in step_input :
462
+ if isinstance (step_input ["source" ], list ):
463
+ vertices_in = [
464
+ get_step_id (cast (str , src )) for src in step_input ["source" ]
465
+ ]
466
+ else :
467
+ vertices_in = [get_step_id (cast (str , step_input ["source" ]))]
468
+ vertex_out = get_step_id (cast (str , step_input ["id" ]))
469
+ for vertex_in in vertices_in :
470
+ if vertex_in not in adjacency :
471
+ adjacency [vertex_in ] = [vertex_out ]
472
+ elif vertex_out not in adjacency [vertex_in ]:
473
+ adjacency [vertex_in ].append (vertex_out )
474
+ if vertex_out not in adjacency :
475
+ adjacency [vertex_out ] = []
476
+ return adjacency
477
+
478
+
479
+ def processDFS (
480
+ adjacency : Dict [str , List [str ]],
481
+ traversal_path : List [str ],
482
+ processed : List [str ],
483
+ cycles : List [List [str ]],
484
+ ) -> None :
485
+ """Perform depth first search."""
486
+ tip = traversal_path [- 1 ]
487
+ for vertex in adjacency [tip ]:
488
+ if vertex in traversal_path :
489
+ i = traversal_path .index (vertex )
490
+ cycles .append (traversal_path [i :])
491
+ elif vertex not in processed :
492
+ traversal_path .append (vertex )
493
+ processDFS (adjacency , traversal_path , processed , cycles )
494
+ processed .append (tip )
495
+ traversal_path .pop ()
496
+
497
+
498
+ def get_step_id (field_id : str ) -> str :
499
+ """Extract step id from either input or output fields."""
500
+ if "/" in field_id .split ("#" )[1 ]:
501
+ step_id = "/" .join (field_id .split ("/" )[:- 1 ])
502
+ else :
503
+ step_id = field_id .split ("#" )[0 ]
504
+ return step_id
505
+
506
+
440
507
def is_conditional_step (param_to_step : Dict [str , CWLObjectType ], parm_id : str ) -> bool :
441
508
source_step = param_to_step .get (parm_id )
442
509
if source_step is not None :
0 commit comments