File tree Expand file tree Collapse file tree 2 files changed +7
-2
lines changed Expand file tree Collapse file tree 2 files changed +7
-2
lines changed Original file line number Diff line number Diff line change @@ -41,7 +41,7 @@ def find_common_branches(dataflow_plan: DataflowPlan) -> Sequence[DataflowPlanNo
41
41
42
42
@staticmethod
43
43
def group_nodes_by_type (dataflow_plan : DataflowPlan ) -> DataflowPlanNodeSet :
44
- """Grouops dataflow plan nodes by type."""
44
+ """Groups dataflow plan nodes by type."""
45
45
grouping_visitor = _GroupNodesByTypeVisitor ()
46
46
return dataflow_plan .sink_node .accept (grouping_visitor )
47
47
@@ -98,12 +98,14 @@ class DataflowPlanNodeSet(Mergeable):
98
98
99
99
compute_metric_nodes : FrozenSet [ComputeMetricsNode ]
100
100
101
+ @override
101
102
def merge (self , other : DataflowPlanNodeSet ) -> DataflowPlanNodeSet :
102
103
return DataflowPlanNodeSet (
103
104
compute_metric_nodes = self .compute_metric_nodes .union (other .compute_metric_nodes ),
104
105
)
105
106
106
107
@classmethod
108
+ @override
107
109
def empty_instance (cls ) -> DataflowPlanNodeSet :
108
110
return DataflowPlanNodeSet (
109
111
compute_metric_nodes = frozenset (),
Original file line number Diff line number Diff line change @@ -273,7 +273,10 @@ def _get_nodes_to_convert_to_cte(
273
273
"""Handles logic for selecting which nodes to convert to CTEs based on the request."""
274
274
dataflow_plan = dataflow_plan_node .as_plan ()
275
275
nodes_to_convert_to_cte : Set [DataflowPlanNode ] = set (DataflowPlanAnalyzer .find_common_branches (dataflow_plan ))
276
- # Additional nodes will be added later.
276
+
277
+ compute_metric_nodes = DataflowPlanAnalyzer .group_nodes_by_type (dataflow_plan ).compute_metric_nodes
278
+ if len (compute_metric_nodes ) > 1 :
279
+ nodes_to_convert_to_cte .update (compute_metric_nodes )
277
280
278
281
return frozenset (nodes_to_convert_to_cte )
279
282
You can’t perform that action at this time.
0 commit comments