Skip to content

Commit febf1fe

Browse files
olegkachur-e and Oleg Kachur
authored
fix DataprocSubmitTrigger deferred tasks stuck forever (#62082)
- To prevent tasks from getting stuck in the deferred state as a result of the sync_hook call's thread getting stuck while retrieving credentials. Observed on secrets storage connection retrieval. Co-authored-by: Oleg Kachur <kachur@google.com>
1 parent 04b921b commit febf1fe

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,13 @@ async def safe_to_cancel(self) -> bool:
187187
return task_state != TaskInstanceState.DEFERRED
188188

189189
async def run(self):
190+
hook = self.get_async_hook()
191+
# Trigger client cache with sync call get_credentials(), evaluated once.
192+
await hook.get_job_client(region=self.region)
193+
190194
try:
191195
while True:
192-
job = await self.get_async_hook().get_job(
193-
project_id=self.project_id, region=self.region, job_id=self.job_id
194-
)
196+
job = await hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id)
195197
state = job.status.state
196198
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
197199
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR):

providers/google/tests/unit/google/cloud/triggers/test_dataproc.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -587,10 +587,8 @@ def test_submit_trigger_serialization(self, submit_trigger):
587587
async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trigger):
588588
"""Test the trigger correctly handles a job completion."""
589589
mock_job = Job(status=JobStatus(state=JobStatus.State.DONE))
590-
future = asyncio.Future()
591-
future.set_result(mock_job)
592-
mock_get_async_hook.return_value.get_job.return_value = future
593-
590+
mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
591+
mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job)
594592
async_gen = submit_trigger.run()
595593
event = await async_gen.asend(None)
596594
expected_event = TriggerEvent(
@@ -603,9 +601,12 @@ async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trig
603601
async def test_submit_trigger_run_error(self, mock_get_async_hook, submit_trigger):
604602
"""Test the trigger correctly handles a job error."""
605603
mock_job = Job(status=JobStatus(state=JobStatus.State.ERROR))
606-
future = asyncio.Future()
607-
future.set_result(mock_job)
608-
mock_get_async_hook.return_value.get_job.return_value = future
604+
mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
605+
mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job)
606+
607+
# future = asyncio.Future()
608+
# future.set_result(mock_job)
609+
# mock_get_async_hook.return_value.get_job.return_value = future
609610

610611
async_gen = submit_trigger.run()
611612
event = await async_gen.asend(None)
@@ -625,6 +626,8 @@ async def test_submit_trigger_run_cancelled(
625626
"""Test the trigger correctly handles an asyncio.CancelledError."""
626627
mock_safe_to_cancel.return_value = is_safe_to_cancel
627628
mock_async_hook = mock_get_async_hook.return_value
629+
mock_async_hook.get_job_client = mock.AsyncMock()
630+
628631
mock_async_hook.get_job.side_effect = asyncio.CancelledError
629632

630633
mock_sync_hook = mock_get_sync_hook.return_value

0 commit comments

Comments
 (0)