Skip to content

Commit efa5fd2

Browse files
Merge pull request #258 from preset-io/dbt_consolidate_logic
chore(dbt): move logic to a function
2 parents 3f19017 + 5458513 commit efa5fd2

File tree

1 file changed

+39
-28
lines changed

1 file changed

+39
-28
lines changed

src/preset_cli/cli/superset/sync/dbt/command.py

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import sys
77
import warnings
88
from pathlib import Path
9-
from typing import List, Optional, Tuple
9+
from typing import Dict, List, Optional, Tuple
1010

1111
import click
1212
import yaml
@@ -335,6 +335,43 @@ def get_job(
335335
raise ValueError(f"Job {job_id} not available")
336336

337337

338+
def process_sl_metrics(
    dbt_client: DBTClient,
    environment_id: int,
    model_map: Dict[ModelKey, ModelSchema],
) -> List[MFMetricWithSQLSchema]:
    """
    Fetch metrics from the semantic layer and return the ones we can map to models.

    A metric is skipped when no SQL can be generated for it, or when its SQL
    references more than one model (``MultipleModelsError``), since it then
    cannot be attributed to a single model.

    :param dbt_client: client used to talk to the dbt semantic layer
    :param environment_id: dbt environment to query metrics from
    :param model_map: lookup from ``(schema, name)`` key to model schema
    :return: the metrics that could be mapped to exactly one model
    """
    dialect = dbt_client.get_sl_dialect(environment_id)
    mf_metric_schema = MFMetricWithSQLSchema()
    sl_metrics: List[MFMetricWithSQLSchema] = []
    for metric in dbt_client.get_sl_metrics(environment_id):
        sql = dbt_client.get_sl_metric_sql(metric["name"], environment_id)
        if sql is None:
            # no SQL could be generated for this metric; nothing to map
            continue

        try:
            model = get_model_from_sql(sql, dialect, model_map)
        except MultipleModelsError:
            # metric spans multiple models; we can't pick one, so skip it
            continue

        sl_metrics.append(
            mf_metric_schema.load(
                {
                    "name": metric["name"],
                    "type": metric["type"],
                    "description": metric["description"],
                    "sql": sql,
                    "dialect": dialect.value,
                    "model": model["unique_id"],
                },
            ),
        )

    return sl_metrics
373+
374+
338375
@click.command()
339376
@click.argument("token")
340377
@click.argument("job_id", type=click.INT, required=False, default=None)
@@ -448,34 +485,8 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
448485
models = apply_select(models, select, exclude)
449486
model_map = {ModelKey(model["schema"], model["name"]): model for model in models}
450487

451-
# original dbt <= 1.6 metrics
452488
og_metrics = dbt_client.get_og_metrics(job["id"])
453-
454-
# MetricFlow metrics
455-
dialect = dbt_client.get_sl_dialect(job["environment_id"])
456-
mf_metric_schema = MFMetricWithSQLSchema()
457-
sl_metrics: List[MFMetricWithSQLSchema] = []
458-
for metric in dbt_client.get_sl_metrics(job["environment_id"]):
459-
sql = dbt_client.get_sl_metric_sql(metric["name"], job["environment_id"])
460-
if sql is not None:
461-
try:
462-
model = get_model_from_sql(sql, dialect, model_map)
463-
except MultipleModelsError:
464-
continue
465-
466-
sl_metrics.append(
467-
mf_metric_schema.load(
468-
{
469-
"name": metric["name"],
470-
"type": metric["type"],
471-
"description": metric["description"],
472-
"sql": sql,
473-
"dialect": dialect.value,
474-
"model": model["unique_id"],
475-
},
476-
),
477-
)
478-
489+
sl_metrics = process_sl_metrics(dbt_client, job["environment_id"], model_map)
479490
superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)
480491

481492
if exposures_only:

0 commit comments

Comments (0)