@@ -41,78 +41,81 @@ def pytask_execute_build(session):
4141
4242 while session .scheduler .is_active ():
4343
44- newly_collected_reports = []
45- n_new_tasks = session .config ["n_workers" ] - len (running_tasks )
46-
47- if n_new_tasks >= 1 :
48- ready_tasks = list (session .scheduler .get_ready (n_new_tasks ))
49- else :
50- ready_tasks = []
51-
52- for task_name in ready_tasks :
53- task = session .dag .nodes [task_name ]["task" ]
54- session .hook .pytask_execute_task_log_start (
55- session = session , task = task
56- )
57- try :
58- session .hook .pytask_execute_task_setup (
59- session = session , task = task
60- )
61- except Exception :
62- report = ExecutionReport .from_task_and_exception (
63- task , sys .exc_info ()
64- )
65- newly_collected_reports .append (report )
66- session .scheduler .done (task_name )
44+ try :
45+ newly_collected_reports = []
46+ n_new_tasks = session .config ["n_workers" ] - len (running_tasks )
47+
48+ if n_new_tasks >= 1 :
49+ ready_tasks = list (session .scheduler .get_ready (n_new_tasks ))
6750 else :
68- running_tasks [task_name ] = session .hook .pytask_execute_task (
69- session = session , task = task
70- )
51+ ready_tasks = []
7152
72- for task_name in list (running_tasks ):
73- future = running_tasks [task_name ]
74- if future .done () and future .exception () is not None :
53+ for task_name in ready_tasks :
7554 task = session .dag .nodes [task_name ]["task" ]
76- exception = future .exception ()
77- newly_collected_reports .append (
78- ExecutionReport .from_task_and_exception (
79- task , (type (exception ), exception , None )
80- )
55+ session .hook .pytask_execute_task_log_start (
56+ session = session , task = task
8157 )
82- running_tasks .pop (task_name )
83- session .scheduler .done (task_name )
84- elif future .done () and future .exception () is None :
85- task = session .dag .nodes [task_name ]["task" ]
8658 try :
87- session .hook .pytask_execute_task_teardown (
59+ session .hook .pytask_execute_task_setup (
8860 session = session , task = task
8961 )
9062 except Exception :
9163 report = ExecutionReport .from_task_and_exception (
9264 task , sys .exc_info ()
9365 )
66+ newly_collected_reports .append (report )
67+ session .scheduler .done (task_name )
9468 else :
95- report = ExecutionReport .from_task (task )
69+ running_tasks [task_name ] = session .hook .pytask_execute_task (
70+ session = session , task = task
71+ )
72+
73+ for task_name in list (running_tasks ):
74+ future = running_tasks [task_name ]
75+ if future .done () and future .exception () is not None :
76+ task = session .dag .nodes [task_name ]["task" ]
77+ exception = future .exception ()
78+ newly_collected_reports .append (
79+ ExecutionReport .from_task_and_exception (
80+ task , (type (exception ), exception , None )
81+ )
82+ )
83+ running_tasks .pop (task_name )
84+ session .scheduler .done (task_name )
85+ elif future .done () and future .exception () is None :
86+ task = session .dag .nodes [task_name ]["task" ]
87+ try :
88+ session .hook .pytask_execute_task_teardown (
89+ session = session , task = task
90+ )
91+ except Exception :
92+ report = ExecutionReport .from_task_and_exception (
93+ task , sys .exc_info ()
94+ )
95+ else :
96+ report = ExecutionReport .from_task (task )
97+
98+ running_tasks .pop (task_name )
99+ newly_collected_reports .append (report )
100+ session .scheduler .done (task_name )
101+ else :
102+ pass
103+
104+ for report in newly_collected_reports :
105+ session .hook .pytask_execute_task_process_report (
106+ session = session , report = report
107+ )
108+ session .hook .pytask_execute_task_log_end (
109+ session = session , task = task , report = report
110+ )
111+ reports .append (report )
96112
97- running_tasks .pop (task_name )
98- newly_collected_reports .append (report )
99- session .scheduler .done (task_name )
113+ if session .should_stop :
114+ break
100115 else :
101- pass
102-
103- for report in newly_collected_reports :
104- session .hook .pytask_execute_task_process_report (
105- session = session , report = report
106- )
107- session .hook .pytask_execute_task_log_end (
108- session = session , task = task , report = report
109- )
110- reports .append (report )
111-
112- if session .should_stop :
116+ time .sleep (session .config ["delay" ])
117+ except KeyboardInterrupt :
113118 break
114- else :
115- time .sleep (session .config ["delay" ])
116119
117120 return True
118121
0 commit comments