File tree Expand file tree Collapse file tree 2 files changed +5
-4
lines changed
Expand file tree Collapse file tree 2 files changed +5
-4
lines changed Original file line number Diff line number Diff line change @@ -1248,7 +1248,7 @@ def recalculate(self) -> _UnfinishedStates:
12481248 if dag .has_on_success_callback :
12491249 last_succeeded_ti : TI | None = max (
12501250 (ti for ti in tis if ti .state == TaskInstanceState .SUCCESS ),
1251- key = lambda ti : ti .end_date ,
1251+ key = lambda ti : ti .end_date or timezone . make_aware ( datetime . min ) ,
12521252 default = None ,
12531253 )
12541254 callback = self .produce_dag_callback (
@@ -1281,7 +1281,8 @@ def recalculate(self) -> _UnfinishedStates:
12811281 unfinished_non_schedulable = (ti for ti in unfinished .tis if ti not in set (schedulable_tis ))
12821282 finished_task_ids = {ti .task_id for ti in finished_tis }
12831283 blocking_ti = next (iter (
1284- ti for ti in unfinished_non_schedulable if not (
1284+ ti for ti in unfinished_non_schedulable
1285+ if ti .task and not (
12851286 ti .task .get_direct_relative_ids (upstream = True ).isdisjoint (finished_task_ids )
12861287 )
12871288 ), None )
Original file line number Diff line number Diff line change @@ -498,7 +498,7 @@ def on_failure_callable(context):
498498 middle .trigger_rule = TriggerRule .ONE_FAILED
499499 middle .set_upstream (up )
500500 middle .set_downstream (down )
501-
501+
502502 dr = dag_maker .create_dagrun ()
503503
504504 ti_up : TI = dr .get_task_instance (task_id = up .task_id , session = session )
@@ -507,7 +507,7 @@ def on_failure_callable(context):
507507 ti_middle .set_state (state = None , session = session )
508508 ti_middle .task .trigger_rule = "invalid"
509509
510- serialized_dag = dr .get_dag ()
510+ serialized_dag = dr .get_dag ()
511511
512512 with mock .patch .object (dr , "execute_dag_callbacks" ) as execute_dag_callbacks :
513513 _ , callback = dr .update_state ()
You can’t perform that action at this time.
0 commit comments