Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add scheduler configuration support for Snowflake dynamic tables, enabling dbt-managed refresh scheduling that is decoupled from upstream and downstream dynamic table dependencies.
time: 2026-03-12T15:07:24-07:00
custom:
Author: ibelianski
Issue: none
3 changes: 3 additions & 0 deletions dbt-snowflake/src/dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class SnowflakeConfig(AdapterConfig):
tmp_relation_type: Optional[str] = None
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None
scheduler: Optional[str] = None
row_access_policy: Optional[str] = None
table_tag: Optional[str] = None
immutable_where: Optional[str] = None
Expand Down Expand Up @@ -562,6 +563,8 @@ def describe_dynamic_table(
available_columns = [c.lower() for c in dt_table.column_names]
if "initialization_warehouse" in available_columns:
base_columns.insert(base_columns.index("warehouse") + 1, "initialization_warehouse")
if "scheduler" in available_columns:
base_columns.append("scheduler")

selected = dt_table.select(base_columns)

Expand Down
12 changes: 11 additions & 1 deletion dbt-snowflake/src/dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableInitializationWarehouseConfigChange,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableSchedulerConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableImmutableWhereConfigChange,
Expand Down Expand Up @@ -105,7 +106,10 @@ def dynamic_table_config_changeset(

config_change_collection = SnowflakeDynamicTableConfigChangeset()

if new_dynamic_table.target_lag != existing_dynamic_table.target_lag:
if (
new_dynamic_table.target_lag != existing_dynamic_table.target_lag
and new_dynamic_table.target_lag is not None
):
config_change_collection.target_lag = SnowflakeDynamicTableTargetLagConfigChange(
action=RelationConfigChangeAction.alter, # type:ignore
context=new_dynamic_table.target_lag,
Expand Down Expand Up @@ -139,6 +143,12 @@ def dynamic_table_config_changeset(
context=new_dynamic_table.refresh_mode,
)

if new_dynamic_table.scheduler != existing_dynamic_table.scheduler:
config_change_collection.scheduler = SnowflakeDynamicTableSchedulerConfigChange(
action=RelationConfigChangeAction.alter, # type:ignore
context=new_dynamic_table.scheduler,
)

if new_dynamic_table.immutable_where != existing_dynamic_table.immutable_where:
config_change_collection.immutable_where = (
SnowflakeDynamicTableImmutableWhereConfigChange(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
RefreshMode,
Scheduler,
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableInitializationWarehouseConfigChange,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableSchedulerConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableImmutableWhereConfigChange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def default(cls) -> Self:
return cls("ON_CREATE")


class Scheduler(StrEnum):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there's actually three states:

  1. DISABLE - we don't want the MV to be updated
  2. ENABLE - require target_lag (default state today
  3. MANUAL - dbt runs the refresh manually on every run

Copy link
Contributor Author

@igorbelianski-cyber igorbelianski-cyber Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 states:
ENABLE ( default like Dynamic TAbles /MV ) work today subject to be scheduled by Snowflake directly or part of the downstream schedule
DISABLED ( only manual refreshes allowed , Dynamic table scheduler will never touch it. directly or through upstream dependency). => DT's

i do not see a meaningful scenario to justify 3rd mode where we want to disable Dynamic tables from both DBT refreshes and Snowflake Dynamic table scheduler.
( i guess target_lag = downstream on all DTs in the pipeline achieves similar effect functionally , but i am not seeing meaningful scenario for that case)

moreover 2 modes provide really simple syntax for users ( they just need to specify warehouse, absence of target lag implies disabling Dynmic table scheduler and DBT running refreshes)

ENABLE = "ENABLE"
DISABLE = "DISABLE"


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -47,6 +52,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
- snowflake_initialization_warehouse: the name of the warehouse used for the initializations and reinitializations of the dynamic table
- refresh_mode: specifies the refresh type for the dynamic table
- initialize: specifies the behavior of the initial refresh of the dynamic table
- scheduler: specifies whether to ENABLE or DISABLE the dynamic table's scheduler
- cluster_by: specifies the columns to cluster on
- immutable_where: specifies an immutability constraint expression
- transient: specifies whether the dynamic table is transient (no fail-safe). snowflake_default_transient_dynamic_tables determines the default value
Expand All @@ -58,11 +64,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
schema_name: str
database_name: str
query: str
target_lag: str
snowflake_warehouse: str
target_lag: Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

target_lag is still required if scheduler is enabled right? We should raise a validation exception in that case.

Copy link
Contributor Author

@igorbelianski-cyber igorbelianski-cyber Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

target lag is not mandatory any more
with scheduler = DISABLED , target_lag will not be allowed ( we treat dynamic tables as a unit of incrementalization and completely disconnected from the dynamic table built in scheduler)
with scheduler = ENABLED it is target lag is still mandatory

PS
i was debating whether DBT needs to validate it as opposed allowing snowflake to produce appropriate precise error ( implementing it here seem like extra validation effort that doesn't add a lot of value and make chnages (if snowflake adds some new modes hard to version)

snowflake_initialization_warehouse: Optional[str] = None
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()
scheduler: Optional[Scheduler] = None
row_access_policy: Optional[str] = None
table_tag: Optional[str] = None
cluster_by: Optional[Union[str, list[str]]] = None
Expand All @@ -89,6 +96,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
"scheduler": config_dict.get("scheduler"),
"row_access_policy": config_dict.get("row_access_policy"),
"table_tag": config_dict.get("table_tag"),
"cluster_by": config_dict.get("cluster_by"),
Expand Down Expand Up @@ -129,6 +137,13 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
if initialize := relation_config.config.extra.get("initialize"): # type:ignore
config_dict["initialize"] = initialize.upper()

if scheduler := relation_config.config.extra.get("scheduler"): # type:ignore
config_dict["scheduler"] = scheduler.upper()
elif config_dict.get("target_lag"):
config_dict["scheduler"] = Scheduler.ENABLE.value
else:
config_dict["scheduler"] = Scheduler.DISABLE.value

return config_dict

@classmethod
Expand Down Expand Up @@ -163,15 +178,21 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str,
else:
cluster_by = None

scheduler = dynamic_table.get("scheduler")
target_lag = dynamic_table.get("target_lag")
if scheduler is None:
scheduler = Scheduler.ENABLE.value if target_lag else Scheduler.DISABLE.value

config_dict = {
"name": dynamic_table.get("name"),
"schema_name": dynamic_table.get("schema_name"),
"database_name": dynamic_table.get("database_name"),
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"target_lag": target_lag,
"snowflake_warehouse": dynamic_table.get("warehouse"),
"snowflake_initialization_warehouse": init_warehouse,
"refresh_mode": dynamic_table.get("refresh_mode"),
"scheduler": scheduler,
"row_access_policy": dynamic_table.get("row_access_policy"),
"table_tag": dynamic_table.get("table_tag"),
"cluster_by": cluster_by,
Expand Down Expand Up @@ -239,6 +260,15 @@ def requires_full_refresh(self) -> bool:
return False


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableSchedulerConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return False


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableTransientConfigChange(RelationConfigChange):
context: Optional[bool] = None
Expand All @@ -257,6 +287,7 @@ class SnowflakeDynamicTableConfigChangeset:
SnowflakeDynamicTableInitializationWarehouseConfigChange
] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None
scheduler: Optional[SnowflakeDynamicTableSchedulerConfigChange] = None
immutable_where: Optional[SnowflakeDynamicTableImmutableWhereConfigChange] = None
cluster_by: Optional[SnowflakeDynamicTableClusterByConfigChange] = None
transient: Optional[SnowflakeDynamicTableTransientConfigChange] = None
Expand All @@ -277,6 +308,7 @@ def requires_full_refresh(self) -> bool:
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
self.scheduler.requires_full_refresh if self.scheduler else False,
self.immutable_where.requires_full_refresh if self.immutable_where else False,
self.cluster_by.requires_full_refresh if self.cluster_by else False,
self.transient.requires_full_refresh if self.transient else False,
Expand All @@ -291,6 +323,7 @@ def has_changes(self) -> bool:
self.snowflake_warehouse,
self.snowflake_initialization_warehouse,
self.refresh_mode,
self.scheduler,
self.immutable_where,
self.cluster_by,
self.transient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
{{ dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation) }}
{% endif %}

{%- set dynamic_table = target_relation.from_config(config.model) -%}
{% set scheduler_is_disabled = (dynamic_table.scheduler is not none and dynamic_table.scheduler | upper == 'DISABLE') %}
{% set scheduler_defaults_to_disabled = (dynamic_table.scheduler is none and dynamic_table.target_lag is none) %}
{% set needs_refresh = scheduler_is_disabled or scheduler_defaults_to_disabled %}
{% if build_sql != '' and needs_refresh %}
{% call statement(name="refresh") %}
{{ snowflake__refresh_dynamic_table(target_relation) }}
{% endcall %}
{% endif %}

{{ run_hooks(post_hooks) }}

{% do unset_query_tag(query_tag) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
{%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%}
{%- set snowflake_initialization_warehouse = configuration_changes.snowflake_initialization_warehouse -%}
{%- if snowflake_initialization_warehouse and snowflake_initialization_warehouse.context -%}{{- log('Applying UPDATE INITIALIZATION_WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%}
{%- set scheduler = configuration_changes.scheduler -%}
{%- if scheduler -%}{{- log('Applying UPDATE SCHEDULER to: ' ~ existing_relation) -}}{%- endif -%}
{%- set immutable_where = configuration_changes.immutable_where -%}
{%- if immutable_where and immutable_where.context -%}{{- log('Applying UPDATE IMMUTABLE WHERE to: ' ~ existing_relation) -}}{%- endif -%}
{%- set cluster_by = configuration_changes.cluster_by -%}
{%- if cluster_by and cluster_by.context -%}{{- log('Applying UPDATE CLUSTER BY to: ' ~ existing_relation) -}}{%- endif -%}

{#- Determine what SET changes we have -#}
{%- set has_set_changes = target_lag or snowflake_warehouse or (snowflake_initialization_warehouse and snowflake_initialization_warehouse.context) or (immutable_where and immutable_where.context) -%}
{%- set has_set_changes = target_lag or snowflake_warehouse or (snowflake_initialization_warehouse and snowflake_initialization_warehouse.context) or scheduler or (immutable_where and immutable_where.context) -%}

{#- Handle SET operations -#}
{% if has_set_changes %}
alter dynamic table {{ existing_relation }} set
{% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %}
{% if target_lag and target_lag.context %}target_lag = '{{ target_lag.context }}'{% endif %}
{% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %}
{% if snowflake_initialization_warehouse and snowflake_initialization_warehouse.context %}initialization_warehouse = {{ snowflake_initialization_warehouse.context }}{% endif %}
{% if scheduler %}scheduler = '{{ scheduler.context }}'{% endif %}
{% if immutable_where and immutable_where.context %}immutable where ({{ immutable_where.context }}){% endif %}
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,16 @@
{%- endif -%}
{%- set transient_keyword = 'transient ' if is_transient else '' -%}
create {{ transient_keyword }}dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
{% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %}
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
{% if dynamic_table.scheduler is not none %}
scheduler = '{{ dynamic_table.scheduler }}'
{% elif dynamic_table.target_lag is none %}
scheduler = 'DISABLE'
{% endif %}
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
Expand Down Expand Up @@ -77,14 +82,19 @@
{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
{% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %}
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }}
{{ optional('external_volume', catalog_relation.external_volume, "'") }}
catalog = 'SNOWFLAKE' -- required, and always SNOWFLAKE for built-in Iceberg tables
base_location = '{{ catalog_relation.base_location }}'
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
{% if dynamic_table.scheduler is not none %}
scheduler = '{{ dynamic_table.scheduler }}'
{% elif dynamic_table.target_lag is none %}
scheduler = 'DISABLE'
{% endif %}
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
{{ optional('table_tag', dynamic_table.table_tag) }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,16 @@
{%- endif -%}
{%- set transient_keyword = 'transient ' if is_transient else '' -%}
create or replace {{ transient_keyword }}dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
{% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %}
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
{% if dynamic_table.scheduler is not none %}
scheduler = '{{ dynamic_table.scheduler }}'
{% elif dynamic_table.target_lag is none %}
scheduler = 'DISABLE'
{% endif %}
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
Expand Down Expand Up @@ -90,14 +95,19 @@ create or replace {{ transient_keyword }}dynamic table {{ relation }}
{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}

create or replace dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
{% if dynamic_table.target_lag is not none %}target_lag = '{{ dynamic_table.target_lag }}'{% endif %}
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('initialization_warehouse', dynamic_table.snowflake_initialization_warehouse) }}
{{ optional('external_volume', catalog_relation.external_volume, "'") }}
catalog = 'snowflake'
base_location = '{{ catalog_relation.base_location }}'
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
{% if dynamic_table.scheduler is not none %}
scheduler = '{{ dynamic_table.scheduler }}'
{% elif dynamic_table.target_lag is none %}
scheduler = 'DISABLE'
{% endif %}
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
{{ optional('table_tag', dynamic_table.table_tag) }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,84 @@
) }}
select * from {{ ref('my_seed') }}
"""


# Scheduler fixtures
DYNAMIC_TABLE_SCHEDULER_DISABLED = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
scheduler='DISABLE',
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE_SCHEDULER_ENABLED = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
scheduler='ENABLE',
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE_NO_TARGET_LAG = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE_TARGET_LAG_ONLY = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE_SCHEDULER_DISABLED_TO_ENABLED = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
scheduler='ENABLE',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""


# Iceberg Scheduler fixtures
DYNAMIC_ICEBERG_TABLE_SCHEDULER_DISABLED = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
scheduler='DISABLE',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_ICEBERG_TABLE_SCHEDULER_ENABLED = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
scheduler='ENABLE',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""
Loading
Loading