diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py index 00803f713..90d70e0ec 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py @@ -67,6 +67,7 @@ class SnowflakeConfig(AdapterConfig): tmp_relation_type: Optional[str] = None merge_update_columns: Optional[str] = None target_lag: Optional[str] = None + scheduler: Optional[str] = None row_access_policy: Optional[str] = None table_tag: Optional[str] = None immutable_where: Optional[str] = None @@ -562,6 +563,8 @@ def describe_dynamic_table( available_columns = [c.lower() for c in dt_table.column_names] if "initialization_warehouse" in available_columns: base_columns.insert(base_columns.index("warehouse") + 1, "initialization_warehouse") + if "scheduler" in available_columns: + base_columns.append("scheduler") selected = dt_table.select(base_columns) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py index 50112d37e..0108f9430 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py @@ -22,6 +22,7 @@ SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableInitializationWarehouseConfigChange, SnowflakeDynamicTableRefreshModeConfigChange, + SnowflakeDynamicTableSchedulerConfigChange, SnowflakeDynamicTableTargetLagConfigChange, SnowflakeDynamicTableWarehouseConfigChange, SnowflakeDynamicTableImmutableWhereConfigChange, @@ -105,7 +106,10 @@ def dynamic_table_config_changeset( config_change_collection = SnowflakeDynamicTableConfigChangeset() - if new_dynamic_table.target_lag != existing_dynamic_table.target_lag: + if ( + new_dynamic_table.target_lag != existing_dynamic_table.target_lag + and new_dynamic_table.target_lag is not None + ): config_change_collection.target_lag = SnowflakeDynamicTableTargetLagConfigChange( action=RelationConfigChangeAction.alter, # type:ignore context=new_dynamic_table.target_lag, @@ -139,6 +143,12 @@ def dynamic_table_config_changeset( context=new_dynamic_table.refresh_mode, ) + if new_dynamic_table.scheduler != existing_dynamic_table.scheduler: + config_change_collection.scheduler = SnowflakeDynamicTableSchedulerConfigChange( + action=RelationConfigChangeAction.alter, # type:ignore + context=new_dynamic_table.scheduler, + ) + if new_dynamic_table.immutable_where != existing_dynamic_table.immutable_where: config_change_collection.immutable_where = ( SnowflakeDynamicTableImmutableWhereConfigChange( diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/__init__.py index cd0456094..6c31aec90 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/__init__.py @@ -1,9 +1,11 @@ from dbt.adapters.snowflake.relation_configs.dynamic_table import ( RefreshMode, + Scheduler, SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableInitializationWarehouseConfigChange, SnowflakeDynamicTableRefreshModeConfigChange, + SnowflakeDynamicTableSchedulerConfigChange, SnowflakeDynamicTableWarehouseConfigChange, SnowflakeDynamicTableTargetLagConfigChange, SnowflakeDynamicTableImmutableWhereConfigChange, diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 64e87cf43..b40ab36cb 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -33,6 +33,11 @@ def default(cls) -> Self: return cls("ON_CREATE") +class Scheduler(StrEnum): + ENABLE = "ENABLE" + DISABLE = "DISABLE" + + @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): """ @@ -47,6 +52,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): - snowflake_initialization_warehouse: the name of the warehouse used for the initializations and reinitializations of the dynamic table - refresh_mode: specifies the refresh type for the dynamic table - initialize: specifies the behavior of the initial refresh of the dynamic table + - scheduler: specifies whether to ENABLE or DISABLE the dynamic table's scheduler - cluster_by: specifies the columns to cluster on - immutable_where: specifies an immutability constraint expression - transient: specifies whether the dynamic table is transient (no fail-safe). snowflake_default_transient_dynamic_tables determines the default value @@ -58,11 +64,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): schema_name: str database_name: str query: str - target_lag: str snowflake_warehouse: str + target_lag: Optional[str] = None snowflake_initialization_warehouse: Optional[str] = None refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() + scheduler: Optional[Scheduler] = None row_access_policy: Optional[str] = None table_tag: Optional[str] = None cluster_by: Optional[Union[str, list[str]]] = None @@ -89,6 +96,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: ), "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), + "scheduler": config_dict.get("scheduler"), "row_access_policy": config_dict.get("row_access_policy"), "table_tag": config_dict.get("table_tag"), "cluster_by": config_dict.get("cluster_by"), @@ -129,6 +137,13 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any if initialize := relation_config.config.extra.get("initialize"): # type:ignore config_dict["initialize"] = initialize.upper() + if scheduler := relation_config.config.extra.get("scheduler"): # type:ignore + config_dict["scheduler"] = scheduler.upper() + elif config_dict.get("target_lag"): + config_dict["scheduler"] = "ENABLE" + else: + config_dict["scheduler"] = "DISABLE" + return config_dict @classmethod @@ -163,15 +178,21 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, else: cluster_by = None + scheduler = dynamic_table.get("scheduler") + target_lag = dynamic_table.get("target_lag") + if scheduler is None: + scheduler = "ENABLE" if target_lag else "DISABLE" + config_dict = { "name": dynamic_table.get("name"), "schema_name": dynamic_table.get("schema_name"), "database_name": dynamic_table.get("database_name"), "query": dynamic_table.get("text"), - "target_lag": dynamic_table.get("target_lag"), + "target_lag": target_lag, "snowflake_warehouse": dynamic_table.get("warehouse"), "snowflake_initialization_warehouse": init_warehouse, "refresh_mode": dynamic_table.get("refresh_mode"), + "scheduler": scheduler, "row_access_policy": dynamic_table.get("row_access_policy"), "table_tag": dynamic_table.get("table_tag"), "cluster_by": cluster_by, @@ -239,6 +260,15 @@ def requires_full_refresh(self) -> bool: return False +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableSchedulerConfigChange(RelationConfigChange): + context: Optional[str] = None + + @property + def requires_full_refresh(self) -> bool: + return False + + @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableTransientConfigChange(RelationConfigChange): context: Optional[bool] = None @@ -257,6 +287,7 @@ class SnowflakeDynamicTableConfigChangeset: SnowflakeDynamicTableInitializationWarehouseConfigChange ] = None refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None + scheduler: Optional[SnowflakeDynamicTableSchedulerConfigChange] = None immutable_where: Optional[SnowflakeDynamicTableImmutableWhereConfigChange] = None cluster_by: Optional[SnowflakeDynamicTableClusterByConfigChange] = None transient: Optional[SnowflakeDynamicTableTransientConfigChange] = None @@ -277,6 +308,7 @@ def requires_full_refresh(self) -> bool: else False ), self.refresh_mode.requires_full_refresh if self.refresh_mode else False, + self.scheduler.requires_full_refresh if self.scheduler else False, self.immutable_where.requires_full_refresh if self.immutable_where else False, self.cluster_by.requires_full_refresh if self.cluster_by else False, self.transient.requires_full_refresh if self.transient else False, @@ -291,6 +323,7 @@ def has_changes(self) -> bool: self.snowflake_warehouse, self.snowflake_initialization_warehouse, self.refresh_mode, + self.scheduler, self.immutable_where, self.cluster_by, self.transient, diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/materializations/dynamic_table.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/materializations/dynamic_table.sql index 2ab3fb9dd..de1f27671 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/materializations/dynamic_table.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/materializations/dynamic_table.sql @@ -15,6 +15,16 @@ {{ dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation) }} {% endif %} + {%- set dynamic_table = target_relation.from_config(config.model) -%} + {% set scheduler_is_disabled = (dynamic_table.scheduler is not none and dynamic_table.scheduler | upper == 'DISABLE') %} + {% set scheduler_defaults_to_disabled = (dynamic_table.scheduler is none and dynamic_table.target_lag is none) %} + {% set needs_refresh = scheduler_is_disabled or scheduler_defaults_to_disabled %} + {% if build_sql != '' and needs_refresh %} + {% call statement(name="refresh") %} + {{ snowflake__refresh_dynamic_table(target_relation) }} + {% endcall %} + {% endif %} + {{ run_hooks(post_hooks) }} {% do unset_query_tag(query_tag) %} diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql index 4badd0aa5..7dbc18b26 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql @@ -17,20 +17,23 @@ {%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} {%- set snowflake_initialization_warehouse = configuration_changes.snowflake_initialization_warehouse -%} {%- if snowflake_initialization_warehouse and snowflake_initialization_warehouse.context -%}{{- log('Applying UPDATE INITIALIZATION_WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} + {%- set scheduler = configuration_changes.scheduler -%} + {%- if scheduler -%}{{- log('Applying UPDATE SCHEDULER to: ' ~ existing_relation) -}}{%- endif -%} {%- set immutable_where = configuration_changes.immutable_where -%} {%- if immutable_where and immutable_where.context -%}{{- log('Applying UPDATE IMMUTABLE WHERE to: ' ~ existing_relation) -}}{%- endif -%} {%- set cluster_by = configuration_changes.cluster_by -%} {%- if cluster_by and cluster_by.context -%}{{- log('Applying UPDATE CLUSTER BY to: ' ~ existing_relation) -}}{%- endif -%} {#- Determine what SET changes we have -#} - {%- set has_set_changes = target_lag or snowflake_warehouse or (snowflake_initialization_warehouse and snowflake_initialization_warehouse.context) or (immutable_where and immutable_where.context) -%} + {%- set has_set_changes = target_lag or snowflake_warehouse or (snowflake_initialization_warehouse and snowflake_initialization_warehouse.context) or scheduler or (immutable_where and immutable_where.context) -%} {#- Handle SET operations -#} {% if has_set_changes %} alter dynamic table {{ existing_relation }} set - {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} + {% if target_lag and target_lag.context %}target_lag = '{{ target_lag.context }}'{% endif %} {% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %} {% if snowflake_initialization_warehouse and snowflake_initialization_warehouse.context %}initialization_warehouse = {{ snowflake_initialization_warehouse.context }}{% endif %} + {% if scheduler %}scheduler = '{{ scheduler.context }}'{% endif %} {% if immutable_where and immutable_where.context %}immutable where ({{ immutable_where.context }}){% endif %} {% endif %} diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index c3a893c12..222ed86e2 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -41,11 +41,16 @@ {%- endif -%} {%- set transient_keyword = 'transient ' if is_transient else '' -%} create {{ transient_keyword }}dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' + {% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %} warehouse = {{ dynamic_table.snowflake_warehouse }} {{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} + {% if dynamic_table.scheduler is not none %} + scheduler = '{{ dynamic_table.scheduler }}' + {% elif dynamic_table.target_lag is none %} + scheduler = 'DISABLE' + {% endif %} {{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }} {{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }} {{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }} @@ -77,7 +82,7 @@ {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} create dynamic iceberg table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' + {% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %} warehouse = {{ dynamic_table.snowflake_warehouse }} {{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }} {{ optional('external_volume', catalog_relation.external_volume, "'") }} @@ -85,6 +90,11 @@ base_location = '{{ catalog_relation.base_location }}' {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} + {% if dynamic_table.scheduler is not none %} + scheduler = '{{ dynamic_table.scheduler }}' + {% elif dynamic_table.target_lag is none %} + scheduler = 'DISABLE' + {% endif %} {{ optional('row_access_policy', dynamic_table.row_access_policy) }} {{ optional('table_tag', dynamic_table.table_tag) }} {{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }} diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 3819c383c..cb4f49964 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -53,12 +53,19 @@ {%- set is_transient = false -%} {%- endif -%} {%- set transient_keyword = 'transient ' if is_transient else '' -%} + {%- set copy_grants = config.get('copy_grants', default=false) -%} create or replace {{ transient_keyword }}dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' + {% if copy_grants -%} copy grants {%- endif %} + {% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %} warehouse = {{ dynamic_table.snowflake_warehouse }} {{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} + {% if dynamic_table.scheduler is not none %} + scheduler = '{{ dynamic_table.scheduler }}' + {% elif dynamic_table.target_lag is none %} + scheduler = 'DISABLE' + {% endif %} {{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }} {{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }} {{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }} @@ -88,9 +95,11 @@ create or replace {{ transient_keyword }}dynamic table {{ relation }} -#} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} +{%- set copy_grants = config.get('copy_grants', default=false) -%} create or replace dynamic iceberg table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' + {% if copy_grants -%} copy grants {%- endif %} + {% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %} warehouse = {{ dynamic_table.snowflake_warehouse }} {{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }} {{ optional('external_volume', catalog_relation.external_volume, "'") }} @@ -98,6 +107,11 @@ create or replace dynamic iceberg table {{ relation }} base_location = '{{ catalog_relation.base_location }}' {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} + {% if dynamic_table.scheduler is not none %} + scheduler = '{{ dynamic_table.scheduler }}' + {% elif dynamic_table.target_lag is none %} + scheduler = 'DISABLE' + {% endif %} {{ optional('row_access_policy', dynamic_table.row_access_policy) }} {{ optional('table_tag', dynamic_table.table_tag) }} {{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }} diff --git a/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/models.py b/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/models.py index 79969b258..f4ef639fa 100644 --- a/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/models.py +++ b/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -310,6 +310,31 @@ """ +# Copy Grants fixtures +DYNAMIC_TABLE_COPY_GRANTS = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', + copy_grants=True, +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_NO_COPY_GRANTS = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', + copy_grants=False, +) }} +select * from {{ ref('my_seed') }} +""" + + # Transient dynamic table fixtures DYNAMIC_TABLE_TRANSIENT = """ {{ config( @@ -345,3 +370,56 @@ ) }} select * from {{ ref('my_seed') }} """ + + +# Scheduler fixtures +DYNAMIC_TABLE_SCHEDULER_DISABLED = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + scheduler='DISABLE', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_SCHEDULER_ENABLED = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + scheduler='ENABLE', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_NO_TARGET_LAG = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_TARGET_LAG_ONLY = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_SCHEDULER_DISABLED_TO_ENABLED = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + scheduler='ENABLE', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/test_basic.py index a4b5b7c36..93669cd0e 100644 --- a/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/test_basic.py +++ b/dbt-snowflake/tests/functional/relation_tests/dynamic_table_tests/test_basic.py @@ -3,7 +3,7 @@ from dbt.tests.util import assert_message_in_logs, run_dbt, run_dbt_and_capture from tests.functional.relation_tests.dynamic_table_tests import models -from tests.functional.utils import query_relation_type +from tests.functional.utils import query_relation_type, update_model class TestBasic: @@ -32,6 +32,45 @@ def test_dynamic_table_full_refresh(self, project): assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table" +class TestDynamicTableCopyGrants: + + DT_COPY_GRANTS = "my_dynamic_table_copy_grants" + DT_NO_COPY_GRANTS = "my_dynamic_table_no_copy_grants" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + f"{self.DT_COPY_GRANTS}.sql": models.DYNAMIC_TABLE_COPY_GRANTS, + f"{self.DT_NO_COPY_GRANTS}.sql": models.DYNAMIC_TABLE_NO_COPY_GRANTS, + } + + def test_copy_grants_in_replace_ddl(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + + _, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"]) + + copy_grants_qualified = ( + f"{project.database}.{project.test_schema}.{self.DT_COPY_GRANTS}" + ) + no_copy_grants_qualified = ( + f"{project.database}.{project.test_schema}.{self.DT_NO_COPY_GRANTS}" + ) + + assert_message_in_logs( + f"create or replace dynamic table {copy_grants_qualified}", logs + ) + assert_message_in_logs("copy grants", logs) + + assert_message_in_logs( + f"create or replace dynamic table {no_copy_grants_qualified}", logs + ) + + class TestAutoConfigDoesntFullRefresh: """ AUTO refresh_strategy will be compared accurately with both INCREMENTAL and FULL. @@ -71,3 +110,151 @@ def test_auto_config_doesnt_full_refresh(self, project, test_dt): f"No configuration changes were identified on: `{model_qualified_name}`. Continuing.", logs, ) + + +class TestSchedulerDisabled: + """Verify SCHEDULER=DISABLED appears in DDL and REFRESH is called after build.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dt_scheduler_disabled.sql": models.DYNAMIC_TABLE_SCHEDULER_DISABLED, + } + + def test_scheduler_disabled_in_create_ddl(self, project): + run_dbt(["seed"]) + _, logs = run_dbt_and_capture(["--debug", "run"]) + + model_qualified = ( + f"{project.database}.{project.test_schema}.MY_DT_SCHEDULER_DISABLED" + ) + assert_message_in_logs("scheduler = 'DISABLE'", logs) + assert_message_in_logs("alter dynamic table", logs) + assert_message_in_logs("Applying REFRESH to:", logs) + assert query_relation_type(project, "my_dt_scheduler_disabled") == "dynamic_table" + + +class TestSchedulerEnabled: + """Verify SCHEDULER=ENABLE with target_lag works.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dt_scheduler_enabled.sql": models.DYNAMIC_TABLE_SCHEDULER_ENABLED, + } + + def test_scheduler_enabled_in_create_ddl(self, project): + run_dbt(["seed"]) + _, logs = run_dbt_and_capture(["--debug", "run"]) + + assert_message_in_logs("scheduler = 'ENABLE'", logs) + assert_message_in_logs("target_lag = '2 minutes'", logs) + assert_message_in_logs("Applying REFRESH to:", logs, False) + assert query_relation_type(project, "my_dt_scheduler_enabled") == "dynamic_table" + + +class TestNoTargetLagDefaultsSchedulerDisabled: + """Verify missing target_lag defaults scheduler to DISABLE and REFRESH is called.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dt_no_target_lag.sql": models.DYNAMIC_TABLE_NO_TARGET_LAG, + } + + def test_no_target_lag_defaults_scheduler_disabled(self, project): + run_dbt(["seed"]) + _, logs = run_dbt_and_capture(["--debug", "run"]) + + assert_message_in_logs("scheduler = 'DISABLE'", logs) + assert_message_in_logs("alter dynamic table", logs) + assert_message_in_logs("Applying REFRESH to:", logs) + assert query_relation_type(project, "my_dt_no_target_lag") == "dynamic_table" + + +class TestSchedulerInReplaceDDL: + """Verify scheduler is preserved in CREATE OR REPLACE.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dt_scheduler_replace.sql": models.DYNAMIC_TABLE_SCHEDULER_DISABLED, + } + + def test_scheduler_in_replace_ddl(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + + _, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"]) + + assert_message_in_logs("scheduler = 'DISABLE'", logs) + assert query_relation_type(project, "my_dt_scheduler_replace") == "dynamic_table" + + +class TestSchedulerConfigChange: + """Verify scheduler can be changed from DISABLED to ENABLED via ALTER.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "dynamic_table_scheduler.sql": models.DYNAMIC_TABLE_SCHEDULER_ENABLED, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + @pytest.fixture(scope="function", autouse=True) + def setup_class(self, project): + run_dbt(["seed"]) + yield + project.run_sql(f"drop schema if exists {project.test_schema} cascade") + + @pytest.fixture(scope="function", autouse=True) + def setup_method(self, project, setup_class): + run_dbt(["run", "--full-refresh"]) + yield + update_model(project, "dynamic_table_scheduler", models.DYNAMIC_TABLE_SCHEDULER_ENABLED) + + def test_alter_scheduler_to_disabled(self, project): + """Verify scheduler can be altered from ENABLE to DISABLE.""" + update_model(project, "dynamic_table_scheduler", models.DYNAMIC_TABLE_SCHEDULER_DISABLED) + _, logs = run_dbt_and_capture(["--debug", "run"]) + + assert_message_in_logs("Applying UPDATE SCHEDULER to:", logs) + assert_message_in_logs("scheduler = 'DISABLE'", logs) + assert_message_in_logs("alter dynamic table", logs) + assert_message_in_logs("Applying REFRESH to:", logs) + + def test_alter_scheduler_to_enabled_with_target_lag(self, project): + """Verify scheduler changes to ENABLE when only target_lag is added.""" + update_model(project, "dynamic_table_scheduler", models.DYNAMIC_TABLE_SCHEDULER_DISABLED) + run_dbt(["run"]) + + update_model(project, "dynamic_table_scheduler", models.DYNAMIC_TABLE_TARGET_LAG_ONLY) + _, logs = run_dbt_and_capture(["--debug", "run"]) + + assert_message_in_logs("Applying UPDATE SCHEDULER to:", logs) + assert_message_in_logs("scheduler = 'ENABLE'", logs) + assert_message_in_logs("target_lag = '2 minutes'", logs) + assert_message_in_logs("Applying REFRESH to:", logs, False) diff --git a/dbt-snowflake/tests/unit/test_alter_relation_comment_macro.py b/dbt-snowflake/tests/unit/test_alter_relation_comment_macro.py index 4351b4866..76ca5906b 100644 --- a/dbt-snowflake/tests/unit/test_alter_relation_comment_macro.py +++ b/dbt-snowflake/tests/unit/test_alter_relation_comment_macro.py @@ -1,13 +1,18 @@ +import os import unittest from unittest import mock import re from jinja2 import Environment, FileSystemLoader +MACROS_DIR = os.path.normpath( + os.path.join(os.path.dirname(__file__), "../../src/dbt/include/snowflake/macros") +) + class TestSnowflakeAlterRelationCommentMacro(unittest.TestCase): def setUp(self): self.jinja_env = Environment( - loader=FileSystemLoader("src/dbt/include/snowflake/macros"), + loader=FileSystemLoader(MACROS_DIR), extensions=[ "jinja2.ext.do", ], diff --git a/dbt-snowflake/tests/unit/test_dynamic_table_config.py b/dbt-snowflake/tests/unit/test_dynamic_table_config.py index ddadca02b..d08049bb2 100644 --- a/dbt-snowflake/tests/unit/test_dynamic_table_config.py +++ b/dbt-snowflake/tests/unit/test_dynamic_table_config.py @@ -3,6 +3,7 @@ - snowflake_initialization_warehouse parameter - immutable_where parameter - transient parameter +- scheduler parameter """ import pytest @@ -13,6 +14,7 @@ SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableImmutableWhereConfigChange, SnowflakeDynamicTableInitializationWarehouseConfigChange, + SnowflakeDynamicTableSchedulerConfigChange, SnowflakeDynamicTableTransientConfigChange, ) @@ -474,3 +476,269 @@ def test_no_config_ignores_existing_transient_false(self): if changeset is not None: assert changeset.transient is None + + +class TestSchedulerOptional: + """Tests to verify scheduler is an optional parameter for dynamic tables.""" + + def test_scheduler_is_optional(self): + """scheduler should be optional and default to None.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "target_lag": "1 hour", + "snowflake_warehouse": "MY_WH", + } + ) + assert config.scheduler is None + + def test_scheduler_can_be_set_enable(self): + """scheduler should be settable to ENABLE.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "target_lag": "1 hour", + "snowflake_warehouse": "MY_WH", + "scheduler": "ENABLE", + } + ) + assert config.scheduler == "ENABLE" + + def test_scheduler_can_be_set_disable(self): + """scheduler should be settable to DISABLE.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "snowflake_warehouse": "MY_WH", + "scheduler": "DISABLE", + } + ) + assert config.scheduler == "DISABLE" + + def test_scheduler_can_be_none(self): + """scheduler can be explicitly set to None.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "target_lag": "1 hour", + "snowflake_warehouse": "MY_WH", + "scheduler": None, + } + ) + assert config.scheduler is None + + +class TestSchedulerChangeset: + """Tests for scheduler change detection in SnowflakeDynamicTableConfigChangeset.""" + + def test_changeset_without_scheduler_has_no_changes(self): + """A changeset with no scheduler change should not have changes.""" + changeset = SnowflakeDynamicTableConfigChangeset() + assert changeset.scheduler is None + assert changeset.has_changes is False + assert changeset.requires_full_refresh is False + + def test_changeset_with_scheduler_change(self): + """A changeset with scheduler change should have changes but not require full refresh.""" + changeset = SnowflakeDynamicTableConfigChangeset( + scheduler=SnowflakeDynamicTableSchedulerConfigChange( + action=RelationConfigChangeAction.alter, + context="DISABLE", + ) + ) + assert changeset.scheduler is not None + assert changeset.has_changes is True + assert changeset.requires_full_refresh is False + + def test_scheduler_change_does_not_require_full_refresh(self): + """Changing scheduler should not require a full refresh (can be altered in place).""" + change = SnowflakeDynamicTableSchedulerConfigChange( + action=RelationConfigChangeAction.alter, + context="ENABLE", + ) + assert change.requires_full_refresh is False + + def test_scheduler_change_to_disable(self): + """Changing scheduler to DISABLE should be a valid change.""" + changeset = SnowflakeDynamicTableConfigChangeset( + scheduler=SnowflakeDynamicTableSchedulerConfigChange( + action=RelationConfigChangeAction.alter, + context="DISABLE", + ) + ) + assert changeset.scheduler is not None + assert changeset.scheduler.context == "DISABLE" + assert changeset.has_changes is True + assert changeset.requires_full_refresh is False + + +class TestTargetLagOptional: + """Tests to verify target_lag is now optional.""" + + def test_target_lag_can_be_none(self): + """target_lag should now accept None (for scheduler=DISABLE scenarios).""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "snowflake_warehouse": "MY_WH", + "scheduler": "DISABLE", + } + ) + assert config.target_lag is None + assert config.scheduler == "DISABLE" + + def test_target_lag_can_still_be_set(self): + """target_lag should still work when provided.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "target_lag": "5 minutes", + "snowflake_warehouse": "MY_WH", + } + ) + assert config.target_lag == "5 minutes" + + def test_target_lag_defaults_to_none(self): + """target_lag should default to None when omitted.""" + config = SnowflakeDynamicTableConfig.from_dict( + { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "query": "SELECT 1", + "snowflake_warehouse": "MY_WH", + } + ) + assert config.target_lag is None + + +class TestSchedulerChangeDetectionLogic: + """ + Tests for scheduler change detection in SnowflakeRelation.dynamic_table_config_changeset(). + """ + + @staticmethod + def _make_relation_results(scheduler=None): + import agate + + dt_row_data = { + "name": "test_table", + "schema_name": "test_schema", + "database_name": "test_db", + "text": "SELECT 1", + "target_lag": "1 hour", + "warehouse": "MY_WH", + "refresh_mode": "AUTO", + "immutable_where": None, + } + column_types = [agate.Text()] * len(dt_row_data) + + if scheduler is not None: + dt_row_data["scheduler"] = scheduler + column_types.append(agate.Text()) + + return { + "dynamic_table": agate.Table( + [list(dt_row_data.values())], + list(dt_row_data.keys()), + column_types, + ) + } + + @staticmethod + def _make_relation_config(scheduler=None, target_lag="1 hour"): + from unittest.mock import MagicMock + + relation_config = MagicMock() + relation_config.identifier = "test_table" + relation_config.schema = "test_schema" + relation_config.database = "test_db" + relation_config.compiled_code = "SELECT 1" + extra = { + "snowflake_warehouse": "MY_WH", + } + if target_lag is not None: + extra["target_lag"] = target_lag + if scheduler is not None: + extra["scheduler"] = scheduler + relation_config.config.extra = extra + relation_config.config.get = lambda key, default=None: relation_config.config.extra.get( + key, default + ) + return relation_config + + def test_no_scheduler_config_no_column_no_change(self): + """When user doesn't set scheduler and column is absent, no change detected.""" + from dbt.adapters.snowflake.relation import SnowflakeRelation + + relation_results = self._make_relation_results() + relation_config = self._make_relation_config(scheduler=None) + + changeset = SnowflakeRelation.dynamic_table_config_changeset( + relation_results, relation_config + ) + + if changeset is not None: + assert changeset.scheduler is None + + def test_scheduler_disable_matches_no_change(self): + """When user sets scheduler=DISABLE and SHOW output is DISABLE, no change.""" + from dbt.adapters.snowflake.relation import SnowflakeRelation + + relation_results = self._make_relation_results(scheduler="DISABLE") + relation_config = self._make_relation_config(scheduler="DISABLE") + + changeset = SnowflakeRelation.dynamic_table_config_changeset( + relation_results, relation_config + ) + + if changeset is not None: + assert changeset.scheduler is None + + def test_scheduler_enable_matches_no_change(self): + """When user sets scheduler=ENABLE and SHOW output is ENABLE, no change.""" + from dbt.adapters.snowflake.relation import SnowflakeRelation + + relation_results = self._make_relation_results(scheduler="ENABLE") + relation_config = self._make_relation_config(scheduler="ENABLE") + + changeset = SnowflakeRelation.dynamic_table_config_changeset( + relation_results, relation_config + ) + + if changeset is not None: + assert changeset.scheduler is None + + def test_scheduler_change_triggers_changeset(self): + """When user sets scheduler=DISABLE but SHOW output is ENABLE, change detected.""" + from dbt.adapters.snowflake.relation import SnowflakeRelation + + relation_results = self._make_relation_results(scheduler="ENABLE") + relation_config = self._make_relation_config(scheduler="DISABLE") + + changeset = SnowflakeRelation.dynamic_table_config_changeset( + relation_results, relation_config + ) + + assert changeset is not None + assert changeset.scheduler is not None + assert changeset.scheduler.context == "DISABLE" + assert changeset.requires_full_refresh is False