Skip to content

Commit b741857

Browse files
SameerMesiah97 and Sameer Mesiah authored
Raise on unexpected terminal dbt Cloud job run states (#61300)
DbtCloudHook.wait_for_job_run_status previously returned False when a job run reached a terminal failure state (ERROR or CANCELLED), which could allow Airflow tasks to succeed silently when dbt Cloud jobs failed. This change updates the helper to raise DbtCloudJobRunException when a job run reaches an unexpected terminal state before the expected status is reached, ensuring task failure semantics correctly reflect external job failures. Call sites are updated accordingly, and on_kill now guards against propagated exceptions since cancellation confirmation is best-effort and should not affect task termination behavior. Co-authored-by: Sameer Mesiah <smesiah971@gmail.com>
1 parent 352feb2 commit b741857

File tree

4 files changed: +35 additions, -15 deletions (lines changed)

providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -797,20 +797,32 @@ def wait_for_job_run_status(
797797
:param check_interval: Time in seconds to check on a pipeline run's status.
798798
:param timeout: Time in seconds to wait for a pipeline to reach a terminal status or the expected
799799
status.
800-
:return: Boolean indicating if the job run has reached the ``expected_status``.
800+
:return: ``True`` if the job run has reached the ``expected_status``.
801+
:raises: ``DbtCloudJobRunException`` If the job run reaches an unexpected terminal status
802+
or does not reach an expected status within the timeout.
801803
"""
802804
expected_statuses = (expected_statuses,) if isinstance(expected_statuses, int) else expected_statuses
803805

804806
DbtCloudJobRunStatus.check_is_valid(expected_statuses)
805807

806808
job_run_info = JobRunInfo(account_id=account_id, run_id=run_id)
807-
job_run_status = self.get_job_run_status(**job_run_info)
808809

809810
start_time = time.monotonic()
810811

811-
while (
812-
not DbtCloudJobRunStatus.is_terminal(job_run_status) and job_run_status not in expected_statuses
813-
):
812+
while True:
813+
job_run_status = self.get_job_run_status(**job_run_info)
814+
815+
if job_run_status in expected_statuses:
816+
return True
817+
818+
# Reached terminal failure before expected state.
819+
if DbtCloudJobRunStatus.is_terminal(job_run_status):
820+
raise DbtCloudJobRunException(
821+
f"Job run {run_id} reached terminal status "
822+
f"{DbtCloudJobRunStatus(job_run_status).name} "
823+
f"before reaching expected statuses {expected_statuses}"
824+
)
825+
814826
# Check if the job-run duration has exceeded the ``timeout`` configured.
815827
if start_time + timeout < time.monotonic():
816828
raise DbtCloudJobRunException(
@@ -820,10 +832,6 @@ def wait_for_job_run_status(
820832
# Wait to check the status of the job run based on the ``check_interval`` configured.
821833
time.sleep(check_interval)
822834

823-
job_run_status = self.get_job_run_status(**job_run_info)
824-
825-
return job_run_status in expected_statuses
826-
827835
@fallback_to_default_account
828836
def cancel_job_run(self, run_id: int, account_id: int | None = None) -> None:
829837
"""

providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,14 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
256256
return int(event["run_id"])
257257

258258
def on_kill(self) -> None:
259-
if self.run_id:
260-
self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id)
259+
if not self.run_id:
260+
return
261261

262+
self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id)
263+
264+
# Attempt best-effort confirmation of cancellation.
265+
try:
266+
# This can raise a DbtCloudJobRunException under normal operation.
262267
if self.hook.wait_for_job_run_status(
263268
run_id=self.run_id,
264269
account_id=self.account_id,
@@ -268,6 +273,13 @@ def on_kill(self) -> None:
268273
):
269274
self.log.info("Job run %s has been cancelled successfully.", self.run_id)
270275

276+
except DbtCloudJobRunException as exc:
277+
self.log.warning(
278+
"Failed to confirm cancellation of job run %s during task kill: %s",
279+
self.run_id,
280+
exc,
281+
)
282+
271283
@cached_property
272284
def hook(self):
273285
"""Returns DBT Cloud hook."""

providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -904,8 +904,8 @@ def test_get_job_run_with_payload(self, mock_http_run, mock_paginate, conn_id, a
904904

905905
wait_for_job_run_status_test_args = [
906906
(DbtCloudJobRunStatus.SUCCESS.value, DbtCloudJobRunStatus.SUCCESS.value, True),
907-
(DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value, False),
908-
(DbtCloudJobRunStatus.CANCELLED.value, DbtCloudJobRunStatus.SUCCESS.value, False),
907+
(DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value, "exception"),
908+
(DbtCloudJobRunStatus.CANCELLED.value, DbtCloudJobRunStatus.SUCCESS.value, "exception"),
909909
(DbtCloudJobRunStatus.RUNNING.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
910910
(DbtCloudJobRunStatus.QUEUED.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
911911
(DbtCloudJobRunStatus.STARTING.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
@@ -943,7 +943,7 @@ def fake_sleep(seconds):
943943
):
944944
mock_job_run_status.return_value = job_run_status
945945

946-
if expected_output != "timeout":
946+
if expected_output not in ("timeout", "exception"):
947947
assert hook.wait_for_job_run_status(**config) == expected_output
948948
else:
949949
with pytest.raises(DbtCloudJobRunException):

providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ def mock_monotonic():
376376
assert mock_run_job.return_value.data["id"] == RUN_ID
377377
elif expected_output == "exception":
378378
# The operator should fail if the job run fails or is cancelled.
379-
error_message = r"has failed or has been cancelled\.$"
379+
error_message = r"reached terminal status (ERROR|CANCELLED)"
380380
with pytest.raises(DbtCloudJobRunException, match=error_message):
381381
operator.execute(context=self.mock_context)
382382
else:

0 commit comments

Comments
 (0)