Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 608312d

Browse files
committed
Merge branch 'dlawin-dbt'
2 parents 13f9beb + 1420e9c commit 608312d

File tree

5 files changed

+71
-30
lines changed

5 files changed

+71
-30
lines changed

data_diff/dbt.py

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import logging
33
import os
4+
import time
45
import rich
56
import yaml
67
from dataclasses import dataclass
@@ -11,6 +12,14 @@
1112
from dbt_artifacts_parser.parser import parse_run_results, parse_manifest
1213
from dbt.config.renderer import ProfileRenderer
1314

15+
from .tracking import (
16+
set_entrypoint_name,
17+
create_end_event_json,
18+
create_start_event_json,
19+
send_event_json,
20+
is_tracking_enabled,
21+
)
22+
from .utils import run_as_daemon, truncate_error
1423
from . import connect_to_table, diff_tables, Algorithm
1524

1625
RUN_RESULTS_PATH = "/target/run_results.json"
@@ -33,6 +42,7 @@ class DiffVars:
3342
def dbt_diff(
3443
profiles_dir_override: Optional[str] = None, project_dir_override: Optional[str] = None, is_cloud: bool = False
3544
) -> None:
45+
set_entrypoint_name("CLI-dbt")
3646
dbt_parser = DbtParser(profiles_dir_override, project_dir_override, is_cloud)
3747
models = dbt_parser.get_models()
3848
dbt_parser.set_project_dict()
@@ -190,22 +200,53 @@ def _cloud_diff(diff_vars: DiffVars) -> None:
190200
"Authorization": f"Key {api_key}",
191201
"Content-Type": "application/json",
192202
}
203+
if is_tracking_enabled():
204+
event_json = create_start_event_json({"is_cloud": True, "datasource_id": diff_vars.datasource_id})
205+
run_as_daemon(send_event_json, event_json)
193206

194-
response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
195-
response.raise_for_status()
196-
data = response.json()
197-
diff_id = data["id"]
198-
# TODO in future we should support self hosted datafold
199-
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
200-
rich.print(
201-
"[red]"
202-
+ ".".join(diff_vars.dev_path)
203-
+ " <> "
204-
+ ".".join(diff_vars.prod_path)
205-
+ "[/] \n Diff in progress: \n "
206-
+ diff_url
207-
+ "\n"
208-
)
207+
start = time.monotonic()
208+
error = None
209+
diff_id = None
210+
try:
211+
response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
212+
response.raise_for_status()
213+
data = response.json()
214+
diff_id = data["id"]
215+
# TODO in future we should support self hosted datafold
216+
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
217+
rich.print(
218+
"[red]"
219+
+ ".".join(diff_vars.dev_path)
220+
+ " <> "
221+
+ ".".join(diff_vars.prod_path)
222+
+ "[/] \n Diff in progress: \n "
223+
+ diff_url
224+
+ "\n"
225+
)
226+
except BaseException as ex: # Catch KeyboardInterrupt too
227+
error = ex
228+
finally:
229+
# we don't currently have much of this information
230+
# but I imagine a future iteration of this _cloud method
231+
# will poll for results
232+
if is_tracking_enabled():
233+
err_message = truncate_error(repr(error))
234+
event_json = create_end_event_json(
235+
is_success=error is None,
236+
runtime_seconds=time.monotonic() - start,
237+
data_source_1_type="",
238+
data_source_2_type="",
239+
table1_count=0,
240+
table2_count=0,
241+
diff_count=0,
242+
error=err_message,
243+
diff_id=diff_id,
244+
is_cloud=True,
245+
)
246+
send_event_json(event_json)
247+
248+
if error:
249+
raise error
209250

210251

211252
class DbtParser:
@@ -230,7 +271,6 @@ def get_models(self):
230271

231272
dbt_version = parse_version(run_results_obj.metadata.dbt_version)
232273

233-
# TODO 1.4 support
234274
if dbt_version < parse_version(LOWER_DBT_V) or dbt_version >= parse_version(UPPER_DBT_V):
235275
raise Exception(
236276
f"Found dbt: v{dbt_version} Expected the dbt project's version to be >= {LOWER_DBT_V} and < {UPPER_DBT_V}"

data_diff/diff_tables.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from data_diff.info_tree import InfoTree, SegmentInfo
1616

17-
from .utils import run_as_daemon, safezip, getLogger
17+
from .utils import run_as_daemon, safezip, getLogger, truncate_error
1818
from .thread_utils import ThreadedYielder
1919
from .table_segment import TableSegment
2020
from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
@@ -32,11 +32,6 @@ class Algorithm(Enum):
3232
DiffResult = Iterator[Tuple[str, tuple]] # Iterator[Tuple[Literal["+", "-"], tuple]]
3333

3434

35-
def truncate_error(error: str):
36-
first_line = error.split("\n", 1)[0]
37-
return re.sub("'(.*?)'", "'***'", first_line)
38-
39-
4035
@dataclass
4136
class ThreadBase:
4237
"Provides utility methods for optional threading"
@@ -124,7 +119,6 @@ def _get_stats(self) -> DiffStats:
124119
return DiffStats(diff_by_sign, table1_count, table2_count, unchanged, diff_percent)
125120

126121
def get_stats_string(self):
127-
128122
diff_stats = self._get_stats()
129123
string_output = ""
130124
string_output += f"{diff_stats.table1_count} rows in table A\n"
@@ -143,7 +137,6 @@ def get_stats_string(self):
143137
return string_output
144138

145139
def get_stats_dict(self):
146-
147140
diff_stats = self._get_stats()
148141
json_output = {
149142
"rows_A": diff_stats.table1_count,
@@ -190,7 +183,6 @@ def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_
190183
start = time.monotonic()
191184
error = None
192185
try:
193-
194186
# Query and validate schema
195187
table1, table2 = self._threaded_call("with_schema", [table1, table2])
196188
self._validate_and_adjust_columns(table1, table2)

data_diff/tracking.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ def create_start_event_json(diff_options: Dict[str, Any]):
8585
def create_end_event_json(
8686
is_success: bool,
8787
runtime_seconds: float,
88-
db1: str,
89-
db2: str,
88+
data_source_1_type: str,
89+
data_source_2_type: str,
9090
table1_count: int,
9191
table2_count: int,
9292
diff_count: int,
9393
error: Optional[str],
94+
diff_id: Optional[int] = None,
95+
is_cloud: bool = False,
9496
):
9597
return {
9698
"event": "os_diff_run_end",
@@ -100,14 +102,16 @@ def create_end_event_json(
100102
"time": time(),
101103
"is_success": is_success,
102104
"runtime_seconds": runtime_seconds,
103-
"data_source_1_type": db1,
104-
"data_source_2_type": db2,
105+
"data_source_1_type": data_source_1_type,
106+
"data_source_2_type": data_source_2_type,
105107
"table_1_rows_cnt": table1_count,
106108
"table_2_rows_cnt": table2_count,
107109
"diff_rows_cnt": diff_count,
108110
"error_message": error,
109111
"data_diff_version:": __version__,
110112
"entrypoint_name": entrypoint_name,
113+
"is_cloud": is_cloud,
114+
"diff_id": diff_id,
111115
},
112116
}
113117

data_diff/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,8 @@ def get_timestamp(_match):
7272
return datetime.now().isoformat("_", "seconds").replace(":", "_")
7373

7474
return re.sub("%t", get_timestamp, name)
75+
76+
77+
def truncate_error(error: str) -> str:
    """Return a shortened, redacted form of an error string.

    Only the first line of *error* is kept, and the contents of every quoted
    literal on that line are replaced with ``***`` so that values embedded in
    the message (table names, credentials, paths, ...) are not passed on.

    Both single- and double-quoted literals are masked. ``repr()`` of a
    string switches to double quotes when the text contains an apostrophe,
    so masking only single quotes would leave such messages partly exposed.

    Args:
        error: The error text, typically ``repr(exception)``.

    Returns:
        The first line of *error* with quoted content replaced by ``***``.
    """
    first_line = error.split("\n", 1)[0]
    # The backreference \1 makes a literal end at the same kind of quote that
    # opened it, so an apostrophe inside "..." does not end the match early.
    return re.sub(r"""(['"]).*?\1""", r"\1***\1", first_line)

tests/test_dbt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_get_models_bad_lower_dbt_version(self, mock_manifest_parser, mock_run_p
9494
mock_run_parser.assert_called_once_with(run_results={})
9595
mock_manifest_parser.assert_not_called()
9696
self.assertIn("version to be", ex.exception.args[0])
97-
97+
9898
@patch("builtins.open", new_callable=mock_open, read_data="{}")
9999
@patch("data_diff.dbt.parse_run_results")
100100
@patch("data_diff.dbt.parse_manifest")

0 commit comments

Comments
 (0)