4
4
import sys
5
5
import tempfile
6
6
import copy
7
-
8
-
9
7
from pyspark .sql .functions import lit , expr
10
8
import pyspark .sql .types as T
11
9
from pyspark .sql import DataFrame
20
18
dlt = MagicMock ()
21
19
dlt .expect_all_or_drop = MagicMock ()
22
20
dlt .apply_changes_from_snapshot = MagicMock ()
23
-
24
-
25
-
26
-
27
-
28
21
raw_delta_table_stream = MagicMock ()
29
22
30
23
@@ -198,12 +191,6 @@ def test_invoke_dlt_pipeline_silver_positive(self, run_dlt):
198
191
f"{ database } .{ silver_dataflow_table } " ,
199
192
)
200
193
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
201
-
202
-
203
-
204
-
205
-
206
-
207
194
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
208
195
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
209
196
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -229,7 +216,6 @@ def test_run_dlt_pipeline_silver_positive(self, read):
229
216
silver_spec_map .update (source_details )
230
217
silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
231
218
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
232
-
233
219
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
234
220
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
235
221
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -291,9 +277,6 @@ def test_get_silver_schema_positive(self):
291
277
silver_spec_map .update (source_details )
292
278
silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
293
279
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
294
-
295
-
296
-
297
280
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
298
281
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
299
282
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -318,9 +301,6 @@ def test_get_silver_schema_where_clause(self):
318
301
silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
319
302
320
303
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
321
-
322
-
323
-
324
304
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
325
305
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
326
306
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -354,9 +334,6 @@ def test_read_silver_positive(self):
354
334
}
355
335
silver_spec_map .update (source_details )
356
336
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
357
-
358
-
359
-
360
337
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
361
338
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
362
339
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -400,9 +377,6 @@ def test_read_silver_with_where(self, get_silver_schema):
400
377
}
401
378
silver_spec_map .update (source_details )
402
379
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
403
-
404
-
405
-
406
380
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
407
381
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
408
382
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -464,7 +438,6 @@ def test_cdc_apply_changes_scd_type2(self, cdc_apply_changes):
464
438
silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
465
439
silver_dataflow_spec .cdcApplyChanges = json .dumps (self .silver_cdc_apply_changes_scd2 )
466
440
self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
467
-
468
441
options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
469
442
customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
470
443
(customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -1338,4 +1311,3 @@ def test_get_silver_schema_uc_disabled(self, mock_read_stream):
1338
1311
# format="delta"
1339
1312
# )
1340
1313
# mock_read_stream.load.return_value.selectExpr.assert_called_once_with(*silver_dataflow_spec.selectExp)
1341
-
0 commit comments