2323from kfp .local import executor_input_utils , task_dispatcher # noqa: E402
2424
2525# Paths
26- COMPONENT_ROOT = os .path .dirname (os .path .dirname (os .path .abspath (__file__ )))
27- TEST_DATA = os .path .join (COMPONENT_ROOT , "tests" , "test_data" )
26+ COMPONENT_DIR = os .path .dirname (os .path .dirname (os .path .abspath (__file__ )))
27+ TEST_DATA = os .path .join (COMPONENT_DIR , "tests" , "test_data" )
2828INPUT_PATH = os .path .abspath (os .path .join (TEST_DATA , "sample_input.jsonl" ))
2929FLOW_PATH = os .path .abspath (os .path .join (TEST_DATA , "llm_test_flow.yaml" ))
3030
@@ -35,7 +35,6 @@ def _patched_construct_executor_input(component_spec, arguments, task_root, bloc
3535 Removes input artifact keys from the component spec before calling
3636 the original function, so KFP doesn't reject or try to resolve them.
3737 """
38- # Remove input artifact definitions so KFP doesn't try to resolve them
3938 saved_artifacts = dict (component_spec .input_definitions .artifacts )
4039 component_spec .input_definitions .ClearField ("artifacts" )
4140
@@ -47,16 +46,11 @@ def _patched_construct_executor_input(component_spec, arguments, task_root, bloc
4746 block_input_artifact = False ,
4847 )
4948 finally :
50- # Restore the original spec
5149 for k , v in saved_artifacts .items ():
5250 component_spec .input_definitions .artifacts [k ].CopyFrom (v )
5351
5452
55- # Save original and apply patch
5653_original_construct = executor_input_utils .construct_executor_input
57- executor_input_utils .construct_executor_input = _patched_construct_executor_input
58-
59- # Also patch run_single_task to set block_input_artifact=False
6054_original_run = task_dispatcher .run_single_task_implementation
6155
6256
@@ -66,45 +60,48 @@ def _patched_run(*args, **kwargs):
6660 return _original_run (* args , ** kwargs )
6761
6862
69- task_dispatcher .run_single_task_implementation = _patched_run
70-
71- # Initialize KFP LocalRunner
72- with tempfile .TemporaryDirectory () as pipeline_root :
73- kfp .local .init (
74- runner = kfp .local .SubprocessRunner (use_venv = False ),
75- pipeline_root = pipeline_root ,
76- )
77-
78- print (f"Input: { INPUT_PATH } " )
79- print (f"Flow: { FLOW_PATH } " )
80- print (f"Output: { pipeline_root } " )
81- print ()
82-
83- # Run component through KFP LocalRunner (not python_func)
84- task = sdg (
85- input_pvc_path = INPUT_PATH ,
86- flow_yaml_path = FLOW_PATH ,
87- model = "openai/gpt-4o-mini" ,
88- max_concurrency = 1 ,
89- temperature = 0.7 ,
90- max_tokens = 2048 ,
91- )
92-
93- # Find the output artifact
94- output_path = task .outputs ["output_artifact" ].path
95- metrics_path = task .outputs ["output_metrics" ].path
96-
97- # Print results
98- print ("\n " + "=" * 60 )
99- print ("GENERATED OUTPUT" )
100- print ("=" * 60 )
101- df = pd .read_json (output_path , lines = True )
102- pd .set_option ("display.max_colwidth" , 80 )
103- pd .set_option ("display.width" , 200 )
104- print (df .to_string (index = False ))
105-
106- print ("\n " + "=" * 60 )
107- print ("METRICS" )
108- print ("=" * 60 )
109- with open (metrics_path ) as f :
110- print (json .dumps (json .load (f ), indent = 2 ))
63+ def main ():
64+ """Run the SDG component with LLM test flow via patched LocalRunner."""
65+ executor_input_utils .construct_executor_input = _patched_construct_executor_input
66+ task_dispatcher .run_single_task_implementation = _patched_run
67+
68+ with tempfile .TemporaryDirectory () as pipeline_root :
69+ kfp .local .init (
70+ runner = kfp .local .SubprocessRunner (use_venv = False ),
71+ pipeline_root = pipeline_root ,
72+ )
73+
74+ print (f"Input: { INPUT_PATH } " )
75+ print (f"Flow: { FLOW_PATH } " )
76+ print (f"Output: { pipeline_root } " )
77+ print ()
78+
79+ task = sdg (
80+ input_pvc_path = INPUT_PATH ,
81+ flow_yaml_path = FLOW_PATH ,
82+ model = "openai/gpt-4o-mini" ,
83+ max_concurrency = 1 ,
84+ temperature = 0.7 ,
85+ max_tokens = 2048 ,
86+ )
87+
88+ output_path = task .outputs ["output_artifact" ].path
89+ metrics_path = task .outputs ["output_metrics" ].path
90+
91+ print ("\n " + "=" * 60 )
92+ print ("GENERATED OUTPUT" )
93+ print ("=" * 60 )
94+ df = pd .read_json (output_path , lines = True )
95+ pd .set_option ("display.max_colwidth" , 80 )
96+ pd .set_option ("display.width" , 200 )
97+ print (df .to_string (index = False ))
98+
99+ print ("\n " + "=" * 60 )
100+ print ("METRICS" )
101+ print ("=" * 60 )
102+ with open (metrics_path ) as f :
103+ print (json .dumps (json .load (f ), indent = 2 ))
104+
105+
106+ if __name__ == "__main__" :
107+ main ()
0 commit comments