Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class LinkableElementProperty(Enum):
METRIC = "metric"
# A time dimension with a DatePart.
DATE_PART = "date_part"
# A linkable element that is itself part of an SCD model, or a linkable element that gets joined through another SCD model.
SCD_HOP = "scd_hop"

@staticmethod
def all_properties() -> FrozenSet[LinkableElementProperty]: # noqa: D102
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ class LinkableSpecIndex:
semantic_model_reference_to_joined_elements: Dict[SemanticModelReference, LinkableElementSet]
semantic_model_reference_to_local_elements: Dict[SemanticModelReference, LinkableElementSet]
measure_to_metric_time_elements: Dict[Optional[MeasureReference], LinkableElementSet]
metric_time_elements: LinkableElementSet
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ def __init__(
def build_index(self) -> LinkableSpecIndex: # noqa: D102
start_time = time.perf_counter()
for metric in self._semantic_manifest.metrics:
# self._metric_references_to_metrics[MetricReference(metric.name)] = metric
linkable_sets_for_measure = []
linkable_sets_for_measure: List[LinkableElementSet] = []
for measure in metric.measure_references:
if metric.type is MetricType.CUMULATIVE:
linkable_sets_for_measure.append(
Expand Down Expand Up @@ -185,9 +184,9 @@ def build_index(self) -> LinkableSpecIndex: # noqa: D102
for semantic_model in self._semantic_manifest.semantic_models:
linkable_element_sets_for_no_metrics_queries.append(self._get_elements_in_semantic_model(semantic_model))

metric_time_elements_for_no_metrics = self._get_metric_time_elements(measure_reference=None)
self._linkable_spec_index.metric_time_elements = self._get_metric_time_elements()
self._linkable_spec_index.no_metric_linkable_element_set = LinkableElementSet.merge_by_path_key(
linkable_element_sets_for_no_metrics_queries + [metric_time_elements_for_no_metrics]
linkable_element_sets_for_no_metrics_queries + [self._linkable_spec_index.metric_time_elements]
)

logger.debug(
Expand All @@ -196,10 +195,6 @@ def build_index(self) -> LinkableSpecIndex: # noqa: D102

return self._linkable_spec_index

def _is_semantic_model_scd(self, semantic_model: SemanticModel) -> bool:
"""Whether the semantic model's underlying table is an SCD."""
return any(dim.validity_params is not None for dim in semantic_model.dimensions)

def _generate_linkable_time_dimensions(
self,
semantic_model_origin: SemanticModelReference,
Expand Down Expand Up @@ -306,8 +301,6 @@ def get_joinable_metrics_for_semantic_model(
necessary.
"""
properties = frozenset({LinkableElementProperty.METRIC, LinkableElementProperty.JOINED})
if self._is_semantic_model_scd(semantic_model):
properties = properties.union({LinkableElementProperty.SCD_HOP})

join_path_has_path_links = len(using_join_path.path_elements) > 0
if join_path_has_path_links:
Expand Down Expand Up @@ -352,14 +345,10 @@ def _get_elements_in_semantic_model(self, semantic_model: SemanticModel) -> Link
if result is not None:
return result

semantic_model_is_scd = self._is_semantic_model_scd(semantic_model)

linkable_dimensions = []
linkable_entities = []

entity_properties = frozenset({LinkableElementProperty.LOCAL, LinkableElementProperty.ENTITY})
if semantic_model_is_scd:
entity_properties = entity_properties.union({LinkableElementProperty.SCD_HOP})

for entity in semantic_model.entities:
linkable_entities.append(
Expand Down Expand Up @@ -390,8 +379,6 @@ def _get_elements_in_semantic_model(self, semantic_model: SemanticModel) -> Link
)

dimension_properties = frozenset({LinkableElementProperty.LOCAL})
if semantic_model_is_scd:
dimension_properties = dimension_properties.union({LinkableElementProperty.SCD_HOP})

for entity_link in SemanticModelHelper.entity_links_for_local_elements(semantic_model):
for dimension in semantic_model.dimensions:
Expand Down Expand Up @@ -487,7 +474,9 @@ def _get_time_granularity_for_dimension(

return type_params.time_granularity

def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference] = None) -> LinkableElementSet:
def _get_metric_time_elements_for_measure(
self, measure_reference: Optional[MeasureReference] = None
) -> LinkableElementSet:
"""Create elements for metric_time for a given measure in a semantic model.

metric_time is a virtual dimension that is the same as aggregation time dimension for a measure, but with a
Expand All @@ -499,26 +488,34 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
return result

measure_semantic_model: Optional[SemanticModel] = None
defined_granularity: Optional[ExpandedTimeGranularity] = None
defined_granularity: Optional[TimeGranularity] = None
if measure_reference:
# measure_semantic_model = self._get_semantic_model_for_measure(measure_reference)
measure_semantic_model = self._manifest_object_lookup.get_semantic_model_containing_measure(
measure_reference
)
semantic_model_is_scd = self._is_semantic_model_scd(measure_semantic_model)
measure_agg_time_dimension_reference = measure_semantic_model.checked_agg_time_dimension_for_measure(
measure_reference=measure_reference
)
min_granularity = self._get_time_granularity_for_dimension(
defined_granularity = self._get_time_granularity_for_dimension(
semantic_model=measure_semantic_model,
time_dimension_reference=measure_agg_time_dimension_reference,
)
defined_granularity = ExpandedTimeGranularity.from_time_granularity(min_granularity)
else:
# If querying metric_time without metrics, will query from time spines.
# Defaults to DAY granularity if available in time spines, else smallest available granularity.
min_granularity = min(self._time_spine_sources.keys())
semantic_model_is_scd = False

metric_time_elements = self._get_metric_time_elements()
for path_key in metric_time_elements.path_key_to_linkable_dimensions:
pass # TODO: filter out any where granularity / date part is not compatible with defined_granularity

self._linkable_spec_index.measure_to_metric_time_elements[measure_reference] = metric_time_elements
return metric_time_elements

def _get_metric_time_elements(self) -> LinkableElementSet:
result = self._linkable_spec_index.metric_time_elements
if result is not None:
return result

# If querying metric_time without metrics, will query from time spines.
# Defaults to DAY granularity if available in time spines, else smallest available granularity.
min_granularity = min(self._time_spine_sources.keys())
possible_metric_time_granularities = tuple(
ExpandedTimeGranularity.from_time_granularity(time_granularity)
for time_granularity in TimeGranularity
Expand All @@ -536,21 +533,15 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
path_key_to_linkable_dimensions: Dict[ElementPathKey, List[LinkableDimension]] = defaultdict(list)
for time_granularity in possible_metric_time_granularities:
properties = {LinkableElementProperty.METRIC_TIME}
if time_granularity != defined_granularity:
if time_granularity != min_granularity:
properties.add(LinkableElementProperty.DERIVED_TIME_GRANULARITY)
if semantic_model_is_scd:
properties.add(LinkableElementProperty.SCD_HOP)
linkable_dimension = LinkableDimension.create(
defined_in_semantic_model=measure_semantic_model.reference if measure_semantic_model else None,
defined_in_semantic_model=None,
element_name=MetricFlowReservedKeywords.METRIC_TIME.value,
dimension_type=DimensionType.TIME,
entity_links=(),
join_path=SemanticModelJoinPath(
left_semantic_model_reference=(
measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE
),
left_semantic_model_reference=SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE,
),
properties=frozenset(properties),
time_granularity=time_granularity,
Expand All @@ -561,19 +552,13 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
if date_part.to_int() < min_granularity.to_int():
continue
properties = {LinkableElementProperty.METRIC_TIME, LinkableElementProperty.DATE_PART}
if semantic_model_is_scd:
properties.add(LinkableElementProperty.SCD_HOP)
linkable_dimension = LinkableDimension.create(
defined_in_semantic_model=measure_semantic_model.reference if measure_semantic_model else None,
defined_in_semantic_model=None,
element_name=MetricFlowReservedKeywords.METRIC_TIME.value,
dimension_type=DimensionType.TIME,
entity_links=(),
join_path=SemanticModelJoinPath(
left_semantic_model_reference=(
measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE
),
left_semantic_model_reference=SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE,
),
properties=frozenset(properties),
date_part=date_part,
Expand All @@ -586,7 +571,7 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
for path_key, linkable_dimensions in path_key_to_linkable_dimensions.items()
}
)
self._linkable_spec_index.measure_to_metric_time_elements[measure_reference] = result
self._linkable_spec_index.metric_time_elements = result
return result

def _get_joined_elements(self, measure_semantic_model_reference: SemanticModelReference) -> LinkableElementSet:
Expand Down Expand Up @@ -656,36 +641,34 @@ def _get_joined_elements_without_cache(

return LinkableElementSet.merge_by_path_key((single_hop_elements, multi_hop_elements))

def _get_linkable_elements_accessible_from_semantic_model(
self, semantic_model: SemanticModel
) -> LinkableElementSet:
elements_in_semantic_model = self._get_elements_in_semantic_model(semantic_model)
joined_elements = self._get_joined_elements(semantic_model.reference)
return LinkableElementSet.merge_by_path_key((elements_in_semantic_model, joined_elements))

def _get_linkable_element_set_for_measure(
self,
measure_reference: MeasureReference,
element_filter: LinkableElementFilter = LinkableElementFilter(),
) -> LinkableElementSet:
"""See get_linkable_element_set_for_measure()."""
# measure_semantic_model = self._get_semantic_model_for_measure(measure_reference)
measure_semantic_model = self._manifest_object_lookup.get_semantic_model_containing_measure(measure_reference)

elements_in_semantic_model = self._get_elements_in_semantic_model(measure_semantic_model)
semantic_model = self._manifest_object_lookup.get_semantic_model_containing_measure(measure_reference)
linkable_elements = self._get_linkable_elements_accessible_from_semantic_model(semantic_model)

# Filter out group-by metrics if not specified by the property as there can be a large number of them.
metrics_linked_to_semantic_model = LinkableElementSet()
if LinkableElementProperty.METRIC not in element_filter.without_any_of:
metrics_linked_to_semantic_model = self.get_joinable_metrics_for_semantic_model(
semantic_model=measure_semantic_model,
using_join_path=SemanticModelJoinPath(left_semantic_model_reference=measure_semantic_model.reference),
semantic_model=semantic_model,
using_join_path=SemanticModelJoinPath(left_semantic_model_reference=semantic_model.reference),
)
else:
metrics_linked_to_semantic_model = LinkableElementSet()

metric_time_elements = self._get_metric_time_elements(measure_reference)
joined_elements = self._get_joined_elements(measure_semantic_model.reference)
metric_time_elements = self._get_metric_time_elements_for_measure(measure_reference)

return LinkableElementSet.merge_by_path_key(
(
elements_in_semantic_model,
metrics_linked_to_semantic_model,
metric_time_elements,
joined_elements,
)
(linkable_elements, metrics_linked_to_semantic_model, metric_time_elements)
).filter(element_filter)

def get_linkable_element_set_for_measure(
Expand Down Expand Up @@ -798,17 +781,6 @@ def create_linkable_element_set_from_join_path(
if len(join_path.path_elements) > 1:
properties = properties.union({LinkableElementProperty.MULTI_HOP})

# If any of the semantic models in the join path is an SCD, add SCD_HOP
for reference_to_derived_model in join_path.derived_from_semantic_models:
derived_model = self._semantic_model_lookup.get_by_reference(reference_to_derived_model)
assert (
derived_model
), f"Semantic model {reference_to_derived_model.semantic_model_name} is in join path but does not exist in SemanticModelLookup"

if self._is_semantic_model_scd(derived_model):
properties = properties.union({LinkableElementProperty.SCD_HOP})
break

linkable_dimensions: List[LinkableDimension] = []
linkable_entities: List[LinkableEntity] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ def __init__( # noqa: D107
self._linkable_spec_index = linkable_spec_index
self._manifest_object_lookup = manifest_object_lookup

def _is_semantic_model_scd(self, semantic_model: SemanticModel) -> bool:
"""Whether the semantic model's underlying table is an SCD."""
return any(dim.validity_params is not None for dim in semantic_model.dimensions)

def _generate_linkable_time_dimensions(
self,
semantic_model_origin: SemanticModelReference,
Expand Down Expand Up @@ -157,8 +153,6 @@ def get_joinable_metrics_for_semantic_model(
necessary.
"""
properties = frozenset({LinkableElementProperty.METRIC, LinkableElementProperty.JOINED})
if self._is_semantic_model_scd(semantic_model):
properties = properties.union({LinkableElementProperty.SCD_HOP})

join_path_has_path_links = len(using_join_path.path_elements) > 0
if join_path_has_path_links:
Expand Down Expand Up @@ -322,17 +316,6 @@ def create_linkable_element_set_from_join_path(
if len(join_path.path_elements) > 1:
properties = properties.union({LinkableElementProperty.MULTI_HOP})

# If any of the semantic models in the join path is an SCD, add SCD_HOP
for reference_to_derived_model in join_path.derived_from_semantic_models:
derived_model = self._semantic_model_lookup.get_by_reference(reference_to_derived_model)
assert (
derived_model
), f"Semantic model {reference_to_derived_model.semantic_model_name} is in join path but does not exist in SemanticModelLookup"

if self._is_semantic_model_scd(derived_model):
properties = properties.union({LinkableElementProperty.SCD_HOP})
break

linkable_dimensions: List[LinkableDimension] = []
linkable_entities: List[LinkableEntity] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from metricflow_semantics.model.semantics.manifest_object_lookup import SemanticManifestObjectLookup
from metricflow_semantics.model.semantics.semantic_model_join_evaluator import MAX_JOIN_HOPS
from metricflow_semantics.model.semantics.semantic_model_lookup import SemanticModelLookup
from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

Expand Down Expand Up @@ -323,15 +322,3 @@ def _get_min_queryable_time_granularity(self, metric_reference: MetricReference)
agg_time_dimension_grains.add(measure_properties.agg_time_granularity)

return max(agg_time_dimension_grains, key=lambda time_granularity: time_granularity.to_int())

def get_joinable_scd_specs_for_metric(self, metric_reference: MetricReference) -> Sequence[LinkableInstanceSpec]:
"""Get the SCDs that can be joined to a metric."""
filter = LinkableElementFilter(
with_any_of=frozenset([LinkableElementProperty.SCD_HOP]),
)
scd_elems = self.linkable_elements_for_metrics(
metric_references=(metric_reference,),
element_set_filter=filter,
)

return scd_elems.specs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ def __init__( # noqa: D107
> 0
)

self._scd_linkable_element_set = self._resolve_group_by_item_result.linkable_element_set.filter(
LinkableElementFilter(with_any_of=frozenset({LinkableElementProperty.SCD_HOP}))
)

def _query_includes_agg_time_dimension_of_metric(self, metric_reference: MetricReference) -> bool:
valid_agg_time_dimensions = self._manifest_lookup.metric_lookup.get_valid_agg_time_dimensions_for_metric(
metric_reference
Expand Down
Loading