Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbt-athena/.changes/unreleased/Fixes-20260311-115819.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Exclude ICEBERG_FILESYSTEM_ERROR from outer retry
time: 2026-03-11T11:58:19.948752+09:00
custom:
Author: dtaniwaki
Issue: "1739"
13 changes: 7 additions & 6 deletions dbt-athena/src/dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,14 @@ def execute(
@retry(
# No need to retry if TOO_MANY_OPEN_PARTITIONS occurs.
# Otherwise, Athena throws ICEBERG_FILESYSTEM_ERROR after retry,
# because not all files are removed immediately after first try to create table
# because not all files are removed immediately after first try to create table.
# Similarly, ICEBERG_FILESYSTEM_ERROR itself should not be retried,
# as the same SQL (with the same hardcoded S3 location UUID) would fail again.
retry=retry_if_exception(
lambda e: (
False
if catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e)
else True
)
lambda e: False
if (catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e))
or "ICEBERG_FILESYSTEM_ERROR" in str(e)
else True
),
stop=stop_after_attempt(self._retry_config.attempt),
wait=wait_random_exponential(
Expand Down
82 changes: 81 additions & 1 deletion dbt-athena/tests/unit/test_connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import get_context
from unittest import mock
from unittest.mock import MagicMock

import pytest
from pyathena.error import OperationalError
from pyathena.model import AthenaQueryExecution

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.connections import AthenaAdapterResponse
from dbt.adapters.athena.connections import AthenaAdapterResponse, AthenaCursor


class TestAthenaConnectionManager:
Expand Down Expand Up @@ -33,3 +36,80 @@ def test_data_type_code_to_name(self):
assert cm.data_type_code_to_name("array<string>") == "ARRAY"
assert cm.data_type_code_to_name("map<int, boolean>") == "MAP"
assert cm.data_type_code_to_name("DECIMAL(3, 7)") == "DECIMAL"


class TestAthenaCursorRetry:
    """Unit tests for the outer retry policy of ``AthenaCursor.execute``."""

    @staticmethod
    def _build_cursor(num_retries=1, num_iceberg_retries=3):
        """Return an AthenaCursor whose collaborators are all mocked out.

        The cursor is constructed without running ``__init__`` so no real
        connection is opened; only the attributes the retry path reads are set.
        """
        retry_config = MagicMock()
        retry_config.attempt = num_retries + 1
        # Collapse the exponential backoff so the tests run instantly.
        retry_config.max_delay = 0
        retry_config.exponential_base = 1

        connection = MagicMock()
        connection.cursor_kwargs = {"num_iceberg_retries": num_iceberg_retries}

        cursor = AthenaCursor.__new__(AthenaCursor)
        cursor._retry_config = retry_config
        cursor._connection = connection
        cursor._executor = ThreadPoolExecutor()
        return cursor

    @staticmethod
    def _failed_execution(reason):
        """Return a mock query execution in the FAILED state with *reason*."""
        execution = MagicMock()
        execution.state = AthenaQueryExecution.STATE_FAILED
        execution.state_change_reason = reason
        return execution

    def test_iceberg_filesystem_error_is_not_retried(self):
        """The outer retry must give up immediately on ICEBERG_FILESYSTEM_ERROR."""
        cursor = self._build_cursor(num_retries=3)
        cursor._execute = MagicMock(return_value="fake-query-id")
        failure = self._failed_execution(
            "ICEBERG_FILESYSTEM_ERROR: Cannot create a table on a non-empty location"
        )
        cursor._collect_result_set = MagicMock(return_value=failure)

        with pytest.raises(OperationalError, match="ICEBERG_FILESYSTEM_ERROR"):
            cursor.execute("CREATE TABLE AS SELECT 1")

        assert cursor._execute.call_count == 1

    def test_too_many_open_partitions_not_retried_when_catch_enabled(self):
        """With catch_partitions_limit=True, TOO_MANY_OPEN_PARTITIONS fails fast."""
        cursor = self._build_cursor(num_retries=3)
        cursor._execute = MagicMock(return_value="fake-query-id")
        failure = self._failed_execution("TOO_MANY_OPEN_PARTITIONS")
        cursor._collect_result_set = MagicMock(return_value=failure)

        with pytest.raises(OperationalError, match="TOO_MANY_OPEN_PARTITIONS"):
            cursor.execute("SELECT 1", catch_partitions_limit=True)

        assert cursor._execute.call_count == 1

    def test_too_many_open_partitions_is_retried_when_catch_disabled(self):
        """With catch_partitions_limit=False, TOO_MANY_OPEN_PARTITIONS is retried."""
        num_retries = 2
        cursor = self._build_cursor(num_retries=num_retries)
        cursor._execute = MagicMock(return_value="fake-query-id")
        failure = self._failed_execution("TOO_MANY_OPEN_PARTITIONS")
        cursor._collect_result_set = MagicMock(return_value=failure)

        with pytest.raises(OperationalError, match="TOO_MANY_OPEN_PARTITIONS"):
            cursor.execute("SELECT 1", catch_partitions_limit=False)

        assert cursor._execute.call_count == num_retries + 1

    def test_generic_error_is_retried_by_outer_retry(self):
        """Any other failure reason is retried up to the configured attempts."""
        num_retries = 2
        cursor = self._build_cursor(num_retries=num_retries)
        cursor._execute = MagicMock(return_value="fake-query-id")
        failure = self._failed_execution("GENERIC_ERROR: something went wrong")
        cursor._collect_result_set = MagicMock(return_value=failure)

        with pytest.raises(OperationalError, match="GENERIC_ERROR"):
            cursor.execute("SELECT 1")

        assert cursor._execute.call_count == num_retries + 1
Loading