Skip to content

Commit 269945c

Browse files
[!458] - Resolve "Parallel Writer plugin tests are very slow"
# New features and improvements - Reduced the runtime of `sbt wasp-plugin-parallel-write-spark/test`, which previously took 90s on an M1 Max MBP. # Breaking changes None. # Migration None. # Bug fixes None. # How this feature was tested Existing tests # Related issue Closes #574
1 parent 809f965 commit 269945c

File tree

2 files changed

+66
-13
lines changed

2 files changed

+66
-13
lines changed

plugin-parallel-write-spark/src/test/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/tools/ContinuousUpdateWriterSpec.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,21 @@ import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.tools.utils.Par
66
import org.apache.spark.sql.execution.streaming.MemoryStream
77
import org.apache.spark.sql.functions.col
88
import org.apache.spark.sql.streaming.StreamingQueryException
9-
import org.apache.spark.sql.types.{ DataType, StructField, StructType }
9+
import org.apache.spark.sql.types.{DataType, StructField, StructType}
1010
import org.scalatest.FunSuite
11-
import org.scalatest.Matchers.{ an, be }
11+
import org.scalatest.Matchers.{an, be}
1212

1313
case class Schema(ordering: Int, column1: String, column2: String)
1414
case class Schema2(ordering: Int, column1: String, column2: String, column3: String)
1515
case class NotSupportedSchema(ordering: Int, column1: String)
1616
case class Schema3(ordering1: Int, ordering2: Float, column1: String, column2: String)
17-
case class Schema4(ordering1: Int, ordering2: Float, column1: String, column2: String, colThatIsNotPartOfDeltaTableSchema: String)
17+
case class Schema4(
18+
ordering1: Int,
19+
ordering2: Float,
20+
column1: String,
21+
column2: String,
22+
colThatIsNotPartOfDeltaTableSchema: String
23+
)
1824
case class CaseSensitiveTest(orDeRing: Int, column1: String, column2: String)
1925

2026
class ContinuousUpdateWriterSpec extends FunSuite with DeltaTableTest {
@@ -316,7 +322,9 @@ class ContinuousUpdateWriterSpec extends FunSuite with DeltaTableTest {
316322
}
317323
}
318324

319-
test("Data deduplication when enforcing schema involves dropping columns and the result of the ordering expression is ambiguous") {
325+
test(
326+
"Data deduplication when enforcing schema involves dropping columns and the result of the ordering expression is ambiguous"
327+
) {
320328
withServer(dispatcher) { serverData =>
321329
val data: Seq[Schema4] = Seq(
322330
Schema4(1, 0.4f, "key1", "value1", "thisColWillBeDroppedBeforeWriting1"),
Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,69 @@
11
package it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.tools
22

33
import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.catalog.CatalogCoordinates
4-
import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.model.{ContinuousUpdate, ParallelWrite, ParallelWriteModel}
4+
import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.model.{
5+
ContinuousUpdate,
6+
ParallelWrite,
7+
ParallelWriteModel
8+
}
59

610
object TestModels {
711
val model1 = ParallelWriteModel(ParallelWrite("append"), entityDetails = CatalogCoordinates("default", "mock", "v1"))
8-
val model2 = ParallelWriteModel(ParallelWrite("overwrite"), entityDetails = CatalogCoordinates("default", "mock", "v1"))
12+
val model2 =
13+
ParallelWriteModel(ParallelWrite("overwrite"), entityDetails = CatalogCoordinates("default", "mock", "v1"))
914

10-
private val entityDetails: CatalogCoordinates = CatalogCoordinates("default", "mock", "v1")
11-
private val entityDetailsWithDB: CatalogCoordinates = CatalogCoordinates("default", "mock", "v1", Some("mock_db"))
12-
private val wrongDetails: CatalogCoordinates = CatalogCoordinates("default", "fake", "v1")
15+
private val entityDetails: CatalogCoordinates = CatalogCoordinates("default", "mock", "v1")
16+
private val entityDetailsWithDB: CatalogCoordinates = CatalogCoordinates("default", "mock", "v1", Some("mock_db"))
17+
private val wrongDetails: CatalogCoordinates = CatalogCoordinates("default", "fake", "v1")
1318
private val notExistingEntityDetails: CatalogCoordinates = CatalogCoordinates("default", "entity", "v1")
1419

1520
val modelWithDB = ParallelWriteModel(ParallelWrite("overwrite"), entityDetails = entityDetailsWithDB)
1621

17-
val continuousUpdateModel1 = ParallelWriteModel(ContinuousUpdate(keys = "column1" :: Nil, orderingExpression = "ordering", Some(100), Some(1), Some(168), Some(100)), entityDetails)
22+
val continuousUpdateModel1 = ParallelWriteModel(
23+
ContinuousUpdate(
24+
keys = "column1" :: Nil,
25+
orderingExpression = "ordering",
26+
compactFrequency = None,
27+
compactNumFile = None,
28+
retentionHours = None,
29+
vacuumFrequency = None
30+
),
31+
entityDetails
32+
)
1833

19-
val continuousUpdateModel2 = ParallelWriteModel(ContinuousUpdate(keys = "column1" :: Nil, orderingExpression = "-(ordering1 + ordering2)", Some(100), Some(1), Some(168), Some(100)), entityDetails)
34+
val continuousUpdateModel2 = ParallelWriteModel(
35+
ContinuousUpdate(
36+
keys = "column1" :: Nil,
37+
orderingExpression = "-(ordering1 + ordering2)",
38+
compactFrequency = Some(100),
39+
compactNumFile = Some(1),
40+
retentionHours = Some(168),
41+
vacuumFrequency = Some(100)
42+
),
43+
entityDetails
44+
)
2045

21-
val wrongModel = ParallelWriteModel(ContinuousUpdate(keys = "column1" :: Nil, orderingExpression = "-(ordering1 + ordering2)", Some(100), Some(1), Some(168), Some(100)), wrongDetails)
46+
val wrongModel = ParallelWriteModel(
47+
ContinuousUpdate(
48+
keys = "column1" :: Nil,
49+
orderingExpression = "-(ordering1 + ordering2)",
50+
compactFrequency = Some(100),
51+
compactNumFile = Some(1),
52+
retentionHours = Some(168),
53+
vacuumFrequency = Some(100)
54+
),
55+
wrongDetails
56+
)
2257

23-
val notExistingEntityModel = ParallelWriteModel(ContinuousUpdate(keys = "column1" :: Nil, orderingExpression = "-(ordering1 + ordering2)", Some(100), Some(1), Some(168), Some(100)), notExistingEntityDetails)
58+
val notExistingEntityModel = ParallelWriteModel(
59+
ContinuousUpdate(
60+
keys = "column1" :: Nil,
61+
orderingExpression = "-(ordering1 + ordering2)",
62+
compactFrequency = Some(100),
63+
compactNumFile = Some(1),
64+
retentionHours = Some(168),
65+
vacuumFrequency = Some(100)
66+
),
67+
notExistingEntityDetails
68+
)
2469
}

0 commit comments

Comments
 (0)