Skip to content

Commit 2003872

Browse files
author
TheoS
committed
Format
1 parent 0b4f3fc commit 2003872

File tree

3 files changed

+19
-16
lines changed

3 files changed

+19
-16
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2386,9 +2386,9 @@ def _schedule_dag_run(
23862386
.where(TI.state.in_(State.unfinished) | (TI.state.is_(None)))
23872387
).all()
23882388
last_unfinished_ti = max(
2389-
unfinished_task_instances,
2390-
key=lambda ti: ti.start_date or timezone.make_aware(datetime.min),
2391-
default=None,
2389+
unfinished_task_instances,
2390+
key=lambda ti: ti.start_date or timezone.make_aware(datetime.min),
2391+
default=None,
23922392
)
23932393
for task_instance in unfinished_task_instances:
23942394
task_instance.state = TaskInstanceState.SKIPPED
@@ -2417,12 +2417,12 @@ def _schedule_dag_run(
24172417
return None
24182418
dag_run = dag_run_reloaded
24192419
callback_to_execute = dag_run.produce_dag_callback(
2420-
dag=dag,
2421-
success=False,
2422-
relevant_ti=last_unfinished_ti,
2423-
reason="timed_out",
2424-
execute=False,
2425-
)
2420+
dag=dag,
2421+
success=False,
2422+
relevant_ti=last_unfinished_ti,
2423+
reason="timed_out",
2424+
execute=False,
2425+
)
24262426

24272427
dag_run.notify_dagrun_state_changed(msg="timed_out")
24282428
if dag_run.end_date and dag_run.start_date:

airflow-core/src/airflow/models/dagrun.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,12 +1280,15 @@ def recalculate(self) -> _UnfinishedStates:
12801280
if dag.has_on_failure_callback:
12811281
unfinished_non_schedulable = (ti for ti in unfinished.tis if ti not in set(schedulable_tis))
12821282
finished_task_ids = {ti.task_id for ti in finished_tis}
1283-
blocking_ti = next(iter(
1284-
ti for ti in unfinished_non_schedulable
1285-
if ti.task and not (
1286-
ti.task.get_direct_relative_ids(upstream=True).isdisjoint(finished_task_ids)
1287-
)
1288-
), None)
1283+
blocking_ti = next(
1284+
iter(
1285+
ti
1286+
for ti in unfinished_non_schedulable
1287+
if ti.task
1288+
and not (ti.task.get_direct_relative_ids(upstream=True).isdisjoint(finished_task_ids))
1289+
),
1290+
None,
1291+
)
12891292
callback = self.produce_dag_callback(
12901293
dag=dag,
12911294
success=False,

airflow-core/tests/unit/models/test_dagrun.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ def on_failure_callable(context):
490490
schedule=datetime.timedelta(days=1),
491491
start_date=datetime.datetime(2017, 1, 1),
492492
on_failure_callback=on_failure_callable,
493-
) as dag:
493+
):
494494
up = EmptyOperator(task_id="upstream")
495495
middle = EmptyOperator(task_id="wrong")
496496
down = EmptyOperator(task_id="downstream")

0 commit comments

Comments
 (0)