Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbt-athena/.changes/unreleased/Fixes-20260311-132641.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Handle unpartitioned models in create_table_as_with_partitions
time: 2026-03-11T13:26:41.924396+09:00
custom:
Author: dtaniwaki
Issue: "1742"
Original file line number Diff line number Diff line change
Expand Up @@ -164,35 +164,47 @@
{%- do log('CREATE NON-PARTITIONED STAGING TABLE: ' ~ tmp_relation) -%}
{%- do run_query(create_table_as(temporary, tmp_relation, compiled_code, language, true)) -%}

{% set partitions_batches = get_partition_batches(sql=tmp_relation, as_subquery=False) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

{%- set partitioned_by = config.get('partitioned_by') -%}
{%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
{%- if partitioned_by is not none -%}

{%- if loop.index == 1 -%}
{%- set create_target_relation_sql -%}
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}
{%- do run_query(create_table_as(temporary, relation, create_target_relation_sql, language)) -%}
{%- else -%}
{%- set insert_batch_partitions_sql -%}
insert into {{ relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}

{%- do run_query(insert_batch_partitions_sql) -%}
{%- endif -%}
{% set partitions_batches = get_partition_batches(sql=tmp_relation, as_subquery=False) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}

{%- if loop.index == 1 -%}
{%- set create_target_relation_sql -%}
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}
{%- do run_query(create_table_as(temporary, relation, create_target_relation_sql, language)) -%}
{%- else -%}
{%- set insert_batch_partitions_sql -%}
insert into {{ relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}

{%- do run_query(insert_batch_partitions_sql) -%}
{%- endif -%}

{%- endfor -%}

{%- endfor -%}
{%- else -%}

{%- do log('UNPARTITIONED MODEL: CREATE TARGET TABLE FROM STAGING TABLE') -%}
{%- set create_from_staging_sql -%}
select {{ dest_cols_csv }} from {{ tmp_relation }}
{%- endset -%}
{%- do run_query(create_table_as(temporary, relation, create_from_staging_sql, language)) -%}

{%- endif -%}

{%- do drop_relation(tmp_relation) -%}

Expand Down
88 changes: 88 additions & 0 deletions dbt-athena/tests/functional/adapter/test_force_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,42 @@
cross join unnest(date_array) as t2(date_column)
"""

models_unpartitioned_force_batch_sql = """
{{ config(
materialized='table',
force_batch=true
)
}}

select
random() as rnd,
id
from (
values (sequence(1, 10))
) as t1(id_array)
cross join unnest(id_array) as t2(id)
"""

models_unpartitioned_merge_force_batch_sql = """
{{ config(
table_type='iceberg',
materialized='incremental',
incremental_strategy='merge',
unique_key=['id'],
force_batch=true
)
}}
{% if is_incremental() %}
select 1 as rnd, id
from (values (sequence(1, 10))) as t1(id_array)
cross join unnest(id_array) as t2(id)
{% else %}
select 2 as rnd, id
from (values (sequence(1, 10))) as t1(id_array)
cross join unnest(id_array) as t2(id)
{% endif %}
"""

models_merge_force_batch_sql = """
{{ config(
table_type='iceberg',
Expand Down Expand Up @@ -115,6 +151,58 @@ def test__append_force_batch_param(self, project):
assert models_records_count == 212


class TestUnpartitionedForceBatch:
@pytest.fixture(scope="class")
def models(self):
return {"models_unpartitioned_force_batch.sql": models_unpartitioned_force_batch_sql}

def test__unpartitioned_force_batch(self, project):
relation_name = "models_unpartitioned_force_batch"
model_run_result_row_count_query = (
f"select count(*) as records from {project.test_schema}.{relation_name}"
)

model_run = run_dbt(["run", "--select", relation_name])
model_run_result = model_run.results[0]
assert model_run_result.status == RunStatus.Success

models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
assert models_records_count == 10


class TestUnpartitionedMergeForceBatch:
@pytest.fixture(scope="class")
def models(self):
return {
"models_unpartitioned_merge_force_batch.sql": models_unpartitioned_merge_force_batch_sql
}

def test__unpartitioned_merge_force_batch(self, project):
relation_name = "models_unpartitioned_merge_force_batch"
model_run_result_row_count_query = (
f"select count(*) as records from {project.test_schema}.{relation_name}"
)
model_run_result_distinct_query = (
f"select distinct rnd from {project.test_schema}.{relation_name}"
)

model_run = run_dbt(["run", "--select", relation_name])
model_run_result = model_run.results[0]
assert model_run_result.status == RunStatus.Success

model_update = run_dbt(["run", "--select", relation_name])
model_update_result = model_update.results[0]
assert model_update_result.status == RunStatus.Success

models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
assert models_records_count == 10

models_distinct_records = project.run_sql(model_run_result_distinct_query, fetch="all")[0][
0
]
assert models_distinct_records == 1


class TestMergeForceBatch:
@pytest.fixture(scope="class")
def models(self):
Expand Down
Loading