29 changes: 20 additions & 9 deletions core/dbt/artifacts/resources/v1/metric.py
@@ -83,9 +83,11 @@ class ConversionTypeParams(dbtClassMixin):

@dataclass
class ConversionTypeParams(dbtClassMixin):
base_measure: MetricInputMeasure
conversion_measure: MetricInputMeasure
entity: str
base_measure: Optional[MetricInputMeasure] = None
conversion_measure: Optional[MetricInputMeasure] = None
base_metric: Optional[MetricInput] = None
conversion_metric: Optional[MetricInput] = None
calculation: ConversionCalculationType = ConversionCalculationType.CONVERSION_RATE
window: Optional[MetricTimeWindow] = None
constant_properties: Optional[List[ConstantPropertyInput]] = None
@@ -106,13 +108,15 @@ class MetricAggregationParams(dbtClassMixin):
agg_params: Optional[MeasureAggregationParameters] = None
agg_time_dimension: Optional[str] = None
non_additive_dimension: Optional[NonAdditiveDimension] = None
expr: Optional[str] = None


@dataclass
class MetricTypeParams(dbtClassMixin):
# Only used in old-style YAML (pre-November 2025)
measure: Optional[MetricInputMeasure] = None
# Only used in old-style YAML (pre-November 2025)
input_measures: List[MetricInputMeasure] = field(default_factory=list)

numerator: Optional[MetricInput] = None
denominator: Optional[MetricInput] = None
expr: Optional[str] = None
@@ -123,7 +127,12 @@ class MetricTypeParams(dbtClassMixin):
metrics: Optional[List[MetricInput]] = None
conversion_type_params: Optional[ConversionTypeParams] = None
cumulative_type_params: Optional[CumulativeTypeParams] = None

# Only used in v2 YAML
metric_aggregation_params: Optional[MetricAggregationParams] = None
fill_nulls_with: Optional[int] = None
join_to_timespine: Optional[bool] = None
is_private: Optional[bool] = None # populated by "hidden" field in YAML


@dataclass
@@ -148,8 +157,6 @@ class Metric(GraphResource):
metadata: Optional[SourceFileMetadata] = None
time_granularity: Optional[str] = None
resource_type: Literal[NodeType.Metric]
meta: Dict[str, Any] = field(default_factory=dict, metadata=MergeBehavior.Update.meta())
tags: List[str] = field(default_factory=list)
config: MetricConfig = field(default_factory=MetricConfig)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
sources: List[List[str]] = field(default_factory=list)
@@ -159,14 +166,18 @@
created_at: float = field(default_factory=lambda: time.time())
group: Optional[str] = None

# These fields are only used in v1 metrics.
meta: Dict[str, Any] = field(default_factory=dict, metadata=MergeBehavior.Update.meta())
tags: List[str] = field(default_factory=list)

@property
def input_metrics(self) -> List[MetricInput]:
return self.type_params.metrics or []

@property
def input_measures(self) -> List[MetricInputMeasure]:
return self.type_params.input_measures

@property
def measure_references(self) -> List[MeasureReference]:
return [x.measure_reference() for x in self.input_measures]

@property
def input_metrics(self) -> List[MetricInput]:
return self.type_params.metrics or []
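
To make the intent of the new ConversionTypeParams fields concrete, here is a minimal sketch (not part of the diff) of the same conversion metric expressed with old-style measure inputs and with v2 metric inputs. It assumes MetricInput and MetricInputMeasure live alongside ConversionTypeParams in this module and accept a name keyword, as the hunks above suggest.

from dbt.artifacts.resources.v1.metric import (
    ConversionTypeParams,
    MetricInput,
    MetricInputMeasure,
)

# Old-style YAML (pre-November 2025): conversion inputs are semantic-model measures.
old_style = ConversionTypeParams(
    entity="user",
    base_measure=MetricInputMeasure(name="visits"),
    conversion_measure=MetricInputMeasure(name="purchases"),
)

# v2 YAML: conversion inputs are other metrics; the *_measure fields stay None.
v2_style = ConversionTypeParams(
    entity="user",
    base_metric=MetricInput(name="visits"),
    conversion_metric=MetricInput(name="purchases"),
)
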
62 changes: 52 additions & 10 deletions core/dbt/contracts/graph/unparsed.py
@@ -1,9 +1,12 @@
import datetime
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Sequence, Union

from typing_extensions import override

# trigger the PathEncoder
import dbt_common.helper_types # noqa:F401
from dbt import deprecations
@@ -580,15 +583,34 @@ class UnparsedMetricInput(dbtClassMixin):

@dataclass
class UnparsedConversionTypeParams(dbtClassMixin):
base_measure: Union[UnparsedMetricInputMeasure, str]
conversion_measure: Union[UnparsedMetricInputMeasure, str]
entity: str

# *_measure params are for old-style YAML.
base_measure: Optional[Union[UnparsedMetricInputMeasure, str]] = None
conversion_measure: Optional[Union[UnparsedMetricInputMeasure, str]] = None
# *_metric params are for v2-style metrics.
base_metric: Optional[Union[UnparsedMetricInput, str]] = None
conversion_metric: Optional[Union[UnparsedMetricInput, str]] = None

calculation: str = (
ConversionCalculationType.CONVERSION_RATE.value
) # ConversionCalculationType Enum
window: Optional[str] = None
constant_properties: Optional[List[ConstantPropertyInput]] = None

@override
@classmethod
def validate(cls, data):
super().validate(data)
if data.get("base_measure") is None and data.get("base_metric") is None:
raise ValidationError(
"Conversion metrics must define a base_measure or base_metric parameter."
)
if data.get("conversion_measure") is None and data.get("conversion_metric") is None:
raise ValidationError(
"Conversion metrics must define a conversion_measure or conversion_metric parameter."
)


@dataclass
class UnparsedCumulativeTypeParams(dbtClassMixin):
@@ -674,30 +696,37 @@ class UnparsedMetricV2(UnparsedMetricBase):

join_to_timespine: Optional[bool] = None
fill_nulls_with: Optional[int] = None
expr: Optional[Union[str, int]] = None
expr: Optional[Union[str, bool]] = None

non_additive_dimension: Optional[UnparsedNonAdditiveDimensionV2] = None
agg_time_dimension: Optional[str] = None

# For cumulative metrics
window: Optional[str] = None
grain_to_date: Optional[str] = None
period_agg: Optional[str] = None
period_agg: str = PeriodAggregation.FIRST.value
input_metric: Optional[Union[str, Dict[str, Any]]] = None

# For ratio metrics
numerator: Optional[Union[str, Dict[str, Any]]] = None
denominator: Optional[Union[str, Dict[str, Any]]] = None
numerator: Optional[Union[UnparsedMetricInput, str]] = None
denominator: Optional[Union[UnparsedMetricInput, str]] = None

# For derived metrics
input_metrics: Optional[List[Dict[str, Any]]] = None
input_metrics: Optional[List[Union[UnparsedMetricInput, str]]] = None

# For conversion metrics
entity: Optional[str] = None
calculation: Optional[str] = None
base_metric: Optional[Union[str, Dict[str, Any]]] = None
conversion_metric: Optional[Union[str, Dict[str, Any]]] = None
constant_properties: Optional[List[Dict[str, Any]]] = None
base_metric: Optional[Union[UnparsedMetricInput, str]] = None
conversion_metric: Optional[Union[UnparsedMetricInput, str]] = None
constant_properties: Optional[List[ConstantPropertyInput]] = None

@classmethod
@override
def validate(cls, data):
super().validate(data)
if data["type"] == "simple" and data.get("agg") is None:
raise ValidationError("Simple metrics must have an agg param.")


@dataclass
@@ -712,6 +741,14 @@ def validate(cls, data):
super(UnparsedGroup, cls).validate(data)
if data["owner"].get("name") is None and data["owner"].get("email") is None:
raise ValidationError("Group owner must have at least one of 'name' or 'email'.")
# TODO DI-4413: the following checks are not strictly necessary (they will be
# handled in dsi validation), but doing them at parse time would be a better
# user experience.
# TODO: validate that conversion metrics have base_metric, conversion_metric, and entity
# TODO: validate that cumulative metrics have all required inputs here
# TODO: validate that derived metrics have all required inputs here
# TODO: validate that ratio metrics have all required inputs here
# TODO: validate that simple metrics have all required inputs here


@dataclass
@@ -748,6 +785,11 @@ class UnparsedNonAdditiveDimension(dbtClassMixin):
window_groupings: List[str] = field(default_factory=list)


class PercentileType(str, Enum):
DISCRETE = "discrete"
CONTINUOUS = "continuous"


@dataclass
class UnparsedMeasure(dbtClassMixin):
name: str
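
The validate overrides added above fail fast at parse time instead of deferring everything to dbt-semantic-interfaces. A standalone sketch of the conversion-input rule (a hypothetical helper mirroring the check, not dbt's API) shows which input combinations are accepted:

def check_conversion_inputs(data: dict) -> None:
    # A conversion metric must name each side of the conversion either as an
    # old-style measure or as a v2 metric input.
    if data.get("base_measure") is None and data.get("base_metric") is None:
        raise ValueError(
            "Conversion metrics must define a base_measure or base_metric parameter."
        )
    if data.get("conversion_measure") is None and data.get("conversion_metric") is None:
        raise ValueError(
            "Conversion metrics must define a conversion_measure or conversion_metric parameter."
        )


check_conversion_inputs({"base_metric": "visits", "conversion_metric": "purchases"})    # ok: v2 inputs
check_conversion_inputs({"base_measure": "visits", "conversion_measure": "purchases"})  # ok: old-style inputs
# check_conversion_inputs({"base_metric": "visits"})  # would raise: no conversion input named
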
120 changes: 82 additions & 38 deletions core/dbt/parser/manifest.py
@@ -28,6 +28,7 @@
from dbt.artifacts.resources import (
CatalogWriteIntegrationConfig,
FileHash,
MetricInput,
NodeRelation,
NodeVersion,
)
@@ -1860,14 +1861,14 @@ def _process_refs(
node.depends_on.add_node(target_model_id)


def _process_metric_depends_on(
def _process_metric_depends_on_semantic_models_for_measures(
manifest: Manifest,
current_project: str,
metric: Metric,
) -> None:
"""For a given metric, set the `depends_on` property"""

assert len(metric.type_params.input_measures) > 0
# assert len(metric.type_params.input_measures) > 0 or metric.type_params.metric_aggregation_params is not None, f"{metric} should have a measure or agg type defined, but it does not."
for input_measure in metric.type_params.input_measures:
target_semantic_model = manifest.resolve_semantic_model_for_measure(
target_measure_name=input_measure.name,
@@ -1888,6 +1889,39 @@ def _process_metric_depends_on(
metric.depends_on.add_node(target_semantic_model.unique_id)


def _process_multiple_metric_inputs(
manifest: Manifest,
current_project: str,
metric: Metric,
metric_inputs: List[MetricInput],
) -> None:
for input_metric in metric_inputs:
target_metric = manifest.resolve_metric(
target_metric_name=input_metric.name,
target_metric_package=None,
current_project=current_project,
node_package=metric.package_name,
)

if target_metric is None:
raise dbt.exceptions.ParsingError(
f"The metric `{input_metric.name}` does not exist but was referenced.",
node=metric,
)
elif isinstance(target_metric, Disabled):
raise dbt.exceptions.ParsingError(
f"The metric `{input_metric.name}` is disabled and thus cannot be referenced.",
node=metric,
)

_process_metric_node(
manifest=manifest, current_project=current_project, metric=target_metric
)
for input_measure in target_metric.type_params.input_measures:
metric.add_input_measure(input_measure)
metric.depends_on.add_node(target_metric.unique_id)


def _process_metric_node(
manifest: Manifest,
current_project: str,
@@ -1903,22 +1937,51 @@ def _process_metric_node(
return

if metric.type is MetricType.SIMPLE or metric.type is MetricType.CUMULATIVE:
assert (
metric.type_params.measure is not None
), f"{metric} should have a measure defined, but it does not."
metric.add_input_measure(metric.type_params.measure)
_process_metric_depends_on(
manifest=manifest, current_project=current_project, metric=metric
)
if (
metric.type_params.measure is None
and metric.type_params.metric_aggregation_params is None
):
# This should be caught earlier, but just in case, we raise here to avoid
# any unexpected behavior.
raise dbt.exceptions.ParsingError(
f"Metric {metric} should have a measure or agg type defined, but it does not.",
node=metric,
)
if metric.type_params.measure is not None:
metric.add_input_measure(metric.type_params.measure)
_process_metric_depends_on_semantic_models_for_measures(
manifest=manifest, current_project=current_project, metric=metric
)
# TODO: Once we can process a simple metric merged into a model directly,
# we need to add a 'depends_on' entry here.
elif metric.type is MetricType.CONVERSION:
conversion_type_params = metric.type_params.conversion_type_params
assert (
conversion_type_params
), f"{metric.name} is a conversion metric and must have conversion_type_params defined."
metric.add_input_measure(conversion_type_params.base_measure)
metric.add_input_measure(conversion_type_params.conversion_measure)
_process_metric_depends_on(
manifest=manifest, current_project=current_project, metric=metric
# Handle old-style YAML measure inputs
if conversion_type_params.base_measure is not None:
# TODO: add test for base_measure
metric.add_input_measure(conversion_type_params.base_measure)
if conversion_type_params.conversion_measure is not None:
# TODO: add test for conversion_measure
metric.add_input_measure(conversion_type_params.conversion_measure)
_process_metric_depends_on_semantic_models_for_measures(
manifest=manifest,
current_project=current_project,
metric=metric,
)
# Recursively process input metrics' input measures for blended v2 & old-style YAML inputs
metric_inputs = []
if conversion_type_params.base_metric is not None:
metric_inputs.append(conversion_type_params.base_metric)
if conversion_type_params.conversion_metric is not None:
metric_inputs.append(conversion_type_params.conversion_metric)
_process_multiple_metric_inputs(
manifest=manifest,
current_project=current_project,
metric=metric,
metric_inputs=metric_inputs,
)
elif metric.type is MetricType.DERIVED or metric.type is MetricType.RATIO:
input_metrics = metric.input_metrics
@@ -1930,31 +1993,12 @@
)
input_metrics = [metric.type_params.numerator, metric.type_params.denominator]

for input_metric in input_metrics:
target_metric = manifest.resolve_metric(
target_metric_name=input_metric.name,
target_metric_package=None,
current_project=current_project,
node_package=metric.package_name,
)

if target_metric is None:
raise dbt.exceptions.ParsingError(
f"The metric `{input_metric.name}` does not exist but was referenced.",
node=metric,
)
elif isinstance(target_metric, Disabled):
raise dbt.exceptions.ParsingError(
f"The metric `{input_metric.name}` is disabled and thus cannot be referenced.",
node=metric,
)

_process_metric_node(
manifest=manifest, current_project=current_project, metric=target_metric
)
for input_measure in target_metric.type_params.input_measures:
metric.add_input_measure(input_measure)
metric.depends_on.add_node(target_metric.unique_id)
_process_multiple_metric_inputs(
manifest=manifest,
current_project=current_project,
metric=metric,
metric_inputs=input_metrics,
)
else:
assert_values_exhausted(metric.type)

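
The manifest changes above route derived, ratio, and v2-style conversion metrics through _process_multiple_metric_inputs, which resolves each referenced metric, recursively processes it, and folds its input measures and dependency edges back into the parent metric. A self-contained sketch of that traversal, with simplified names that are not dbt's API:

from typing import Dict, List


def collect_input_measures(name: str, metrics: Dict[str, dict]) -> List[str]:
    # Each entry maps a metric name to its own measures plus the metrics it is built on.
    node = metrics[name]
    measures = list(node.get("measures", []))
    for input_name in node.get("inputs", []):
        measures.extend(collect_input_measures(input_name, metrics))
    return measures


registry = {
    "visits": {"measures": ["visit_count"], "inputs": []},
    "purchases": {"measures": ["purchase_count"], "inputs": []},
    "visit_to_purchase": {"measures": [], "inputs": ["visits", "purchases"]},
}
assert collect_input_measures("visit_to_purchase", registry) == ["visit_count", "purchase_count"]
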