Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 2263a3c

Browse files
committed
add events for dbt
1 parent 270e31f commit 2263a3c

File tree

4 files changed

+61
-24
lines changed

4 files changed

+61
-24
lines changed

data_diff/dbt.py

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import logging
33
import os
4+
import time
45
import rich
56
import yaml
67
from dataclasses import dataclass
@@ -11,6 +12,8 @@
1112
from dbt_artifacts_parser.parser import parse_run_results, parse_manifest
1213
from dbt.config.renderer import ProfileRenderer
1314

15+
from .tracking import set_entrypoint_name, create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
16+
from .utils import run_as_daemon, truncate_error
1417
from . import connect_to_table, diff_tables, Algorithm
1518

1619
RUN_RESULTS_PATH = "/target/run_results.json"
@@ -20,6 +23,7 @@
2023
LOWER_DBT_V = "1.0.0"
2124
UPPER_DBT_V = "1.5.0"
2225

26+
set_entrypoint_name("CLI-dbt")
2327

2428
@dataclass
2529
class DiffVars:
@@ -190,22 +194,52 @@ def _cloud_diff(diff_vars: DiffVars) -> None:
190194
"Authorization": f"Key {api_key}",
191195
"Content-Type": "application/json",
192196
}
197+
if is_tracking_enabled():
198+
event_json = create_start_event_json({"is_cloud": True, "datasource_id": diff_vars.datasource_id})
199+
run_as_daemon(send_event_json, event_json)
193200

194-
response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
195-
response.raise_for_status()
196-
data = response.json()
197-
diff_id = data["id"]
198-
# TODO in future we should support self hosted datafold
199-
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
200-
rich.print(
201-
"[red]"
202-
+ ".".join(diff_vars.dev_path)
203-
+ " <> "
204-
+ ".".join(diff_vars.prod_path)
205-
+ "[/] \n Diff in progress: \n "
206-
+ diff_url
207-
+ "\n"
208-
)
201+
start = time.monotonic()
202+
error = None
203+
try:
204+
response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
205+
response.raise_for_status()
206+
data = response.json()
207+
diff_id = data["id"]
208+
# TODO in future we should support self hosted datafold
209+
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
210+
rich.print(
211+
"[red]"
212+
+ ".".join(diff_vars.dev_path)
213+
+ " <> "
214+
+ ".".join(diff_vars.prod_path)
215+
+ "[/] \n Diff in progress: \n "
216+
+ diff_url
217+
+ "\n"
218+
)
219+
except BaseException as ex: # Catch KeyboardInterrupt too
220+
error = ex
221+
finally:
222+
# we don't currently have much of this information
223+
# but I imagine a future iteration of this _cloud method
224+
# will poll for results
225+
if is_tracking_enabled():
226+
err_message = truncate_error(repr(error))
227+
event_json = create_end_event_json(
228+
is_success = error is None,
229+
runtime_seconds = time.monotonic() - start,
230+
data_source_1_type = "",
231+
data_source_2_type = "",
232+
table1_count = 0,
233+
table2_count = 0,
234+
diff_count = 0,
235+
error = err_message,
236+
diff_id = diff_id,
237+
is_cloud = True
238+
)
239+
send_event_json(event_json)
240+
241+
if error:
242+
raise error
209243

210244

211245
class DbtParser:
@@ -230,7 +264,6 @@ def get_models(self):
230264

231265
dbt_version = parse_version(run_results_obj.metadata.dbt_version)
232266

233-
# TODO 1.4 support
234267
if dbt_version < parse_version(LOWER_DBT_V) or dbt_version >= parse_version(UPPER_DBT_V):
235268
raise Exception(
236269
f"Found dbt: v{dbt_version} Expected the dbt project's version to be >= {LOWER_DBT_V} and < {UPPER_DBT_V}"

data_diff/diff_tables.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
from data_diff.info_tree import InfoTree, SegmentInfo
1616

17-
from .utils import run_as_daemon, safezip, getLogger
1817
from .thread_utils import ThreadedYielder
1918
from .table_segment import TableSegment
2019
from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
@@ -32,9 +31,6 @@ class Algorithm(Enum):
3231
DiffResult = Iterator[Tuple[str, tuple]] # Iterator[Tuple[Literal["+", "-"], tuple]]
3332

3433

35-
def truncate_error(error: str):
36-
first_line = error.split("\n", 1)[0]
37-
return re.sub("'(.*?)'", "'***'", first_line)
3834

3935

4036
@dataclass

data_diff/tracking.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ def create_start_event_json(diff_options: Dict[str, Any]):
8585
def create_end_event_json(
8686
is_success: bool,
8787
runtime_seconds: float,
88-
db1: str,
89-
db2: str,
88+
data_source_1_type: str,
89+
data_source_2_type: str,
9090
table1_count: int,
9191
table2_count: int,
9292
diff_count: int,
9393
error: Optional[str],
94+
diff_id: Optional[int] = None,
95+
is_cloud: bool = False
9496
):
9597
return {
9698
"event": "os_diff_run_end",
@@ -100,14 +102,16 @@ def create_end_event_json(
100102
"time": time(),
101103
"is_success": is_success,
102104
"runtime_seconds": runtime_seconds,
103-
"data_source_1_type": db1,
104-
"data_source_2_type": db2,
105+
"data_source_1_type": data_source_1_type,
106+
"data_source_2_type": data_source_2_type,
105107
"table_1_rows_cnt": table1_count,
106108
"table_2_rows_cnt": table2_count,
107109
"diff_rows_cnt": diff_count,
108110
"error_message": error,
109111
"data_diff_version:": __version__,
110112
"entrypoint_name": entrypoint_name,
113+
"is_cloud": is_cloud,
114+
"diff_id": diff_id
111115
},
112116
}
113117

data_diff/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,7 @@ def get_timestamp(_match):
7272
return datetime.now().isoformat("_", "seconds").replace(":", "_")
7373

7474
return re.sub("%t", get_timestamp, name)
75+
76+
def truncate_error(error: str):
77+
first_line = error.split("\n", 1)[0]
78+
return re.sub("'(.*?)'", "'***'", first_line)

0 commit comments

Comments
 (0)