Commit 9193df4

make metrics smaller when there is a lot of data
1 parent 02c4242 commit 9193df4

39 files changed: +845 -32 lines changed

spark-plugin/build.sbt

Lines changed: 93 additions & 9 deletions
@@ -1,28 +1,32 @@
 import xerial.sbt.Sonatype._
+import sbtassembly.AssemblyPlugin.autoImport._
 
 lazy val versionNum: String = "0.5.1"
-lazy val scala212 = "2.12.18"
-lazy val scala213 = "2.13.12"
+lazy val scala212 = "2.12.20"
+lazy val scala213 = "2.13.16"
 lazy val supportedScalaVersions = List(scala212, scala213)
 
 lazy val dataflint = project
   .in(file("."))
   .aggregate(
     plugin,
+    pluginspark3,
+    pluginspark4,
     example_3_1_3,
     example_3_2_4,
     example_3_3_3,
     example_3_4_1,
     example_3_5_1,
-    example_3_4_1_remote
+    example_3_4_1_remote,
+    example_4_0_1
   ).settings(
     crossScalaVersions := Nil, // Aggregate project version must be Nil, see docs: https://www.scala-sbt.org/1.x/docs/Cross-Build.html
     publish / skip := true
   )
 
 lazy val plugin = (project in file("plugin"))
   .settings(
-    name := "spark",
+    name := "dataflint-common",
     organization := "io.dataflint",
     crossScalaVersions := supportedScalaVersions,
     version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
@@ -34,7 +38,76 @@ lazy val plugin = (project in file("plugin"))
     libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1" % "provided",
     libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470" % "provided",
     libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0" % "provided",
+    publishTo := Some(Resolver.file("local-common", file("target/local-repo"))), // Publish locally only, not to remote repositories
+  )
+
+lazy val pluginspark3 = (project in file("pluginspark3"))
+  .enablePlugins(AssemblyPlugin)
+  .settings(
+    name := "spark", // Keep the name as "spark" for compatibility with existing versions
+    organization := "io.dataflint",
+    crossScalaVersions := supportedScalaVersions,
+    version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
+      versionNum
+    } else {
+      versionNum + "-SNAPSHOT"
+    }),
+    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1" % "provided",
+    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1" % "provided",
+    libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470" % "provided",
+    libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0" % "provided",
+
+    // Assembly configuration to create fat JAR with common code
+    assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
+      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+      case "application.conf" => MergeStrategy.concat
+      case "reference.conf" => MergeStrategy.concat
+      case _ => MergeStrategy.first
+    },
+
+    // Publish the assembled JAR instead of the regular JAR
+    Compile / packageBin := assembly.value,
     publishTo := sonatypePublishToBundle.value
+  ).dependsOn(plugin % "compile->compile;test->test")
+
+lazy val pluginspark4 = (project in file("pluginspark4"))
+  .enablePlugins(AssemblyPlugin)
+  .settings(
+    name := "dataflint-spark4",
+    organization := "io.dataflint",
+    scalaVersion := scala213,
+    crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x
+    version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
+      versionNum
+    } else {
+      versionNum + "-SNAPSHOT"
+    }),
+    libraryDependencies += "org.apache.spark" %% "spark-core" % "4.0.1" % "provided",
+    libraryDependencies += "org.apache.spark" %% "spark-sql" % "4.0.1" % "provided",
+    libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470" % "provided",
+    libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0" % "provided",
+
+    // Assembly configuration to create fat JAR with common code
+    assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
+      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+      case "application.conf" => MergeStrategy.concat
+      case "reference.conf" => MergeStrategy.concat
+      case _ => MergeStrategy.first
+    },
+
+    // Publish the assembled JAR instead of the regular JAR
+    Compile / packageBin := assembly.value,
+    publishTo := sonatypePublishToBundle.value,
+
+    // Include source from plugin directory for self-contained build
+    Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",
+
+    // Include resources from plugin directory for static UI files
+    Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value
   )
 
 lazy val example_3_1_3 = (project in file("example_3_1_3"))
@@ -45,7 +118,7 @@ lazy val example_3_1_3 = (project in file("example_3_1_3"))
     libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.3",
     libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.3",
     publish / skip := true
-  ).dependsOn(plugin)
+  ).dependsOn(pluginspark3)
 
 lazy val example_3_2_4 = (project in file("example_3_2_4"))
   .settings(
@@ -56,7 +129,7 @@ lazy val example_3_2_4 = (project in file("example_3_2_4"))
     libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.4",
     libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470",
     publish / skip := true
-  ).dependsOn(plugin)
+  ).dependsOn(pluginspark3)
 
 lazy val example_3_3_3 = (project in file("example_3_3_3"))
   .settings(
@@ -69,7 +142,7 @@ lazy val example_3_3_3 = (project in file("example_3_3_3"))
     libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.5.0",
     libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
     publish / skip := true
-  ).dependsOn(plugin)
+  ).dependsOn(pluginspark3)
 
 lazy val example_3_4_1 = (project in file("example_3_4_1"))
   .settings(
@@ -81,7 +154,7 @@ lazy val example_3_4_1 = (project in file("example_3_4_1"))
     libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470",
     libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.4" % "1.5.0",
     publish / skip := true
-  ).dependsOn(plugin)
+  ).dependsOn(pluginspark3)
 
 lazy val example_3_5_1 = (project in file("example_3_5_1"))
   .settings(
@@ -99,7 +172,7 @@ lazy val example_3_5_1 = (project in file("example_3_5_1"))
     libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.17",
     libraryDependencies += "org.apache.datafusion" % "comet-spark-spark3.5_2.12" % "0.4.0",
     publish / skip := true
-  ).dependsOn(plugin)
+  ).dependsOn(pluginspark3)
 
 lazy val example_3_4_1_remote = (project in file("example_3_4_1_remote"))
   .settings(
@@ -112,3 +185,14 @@ lazy val example_3_4_1_remote = (project in file("example_3_4_1_remote"))
     publish / skip := true
   ).dependsOn()
 
+lazy val example_4_0_1 = (project in file("example_4_0_1"))
+  .settings(
+    name := "DataflintSparkExample401",
+    organization := "io.dataflint",
+    scalaVersion := scala213,
+    crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x
+    // there is no scala 2.12 version so we need to force 2.13 to make it compile
+    libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.0.1",
+    libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.0.1",
+    publish / skip := true
+  ).dependsOn(pluginspark4)
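
For orientation, a minimal sketch of how a downstream application build might consume these modules, assuming the published coordinates simply follow the organization and module names set above (io.dataflint, artifact "spark" for Spark 3.x and "dataflint-spark4" for Spark 4.x, version 0.5.1); these coordinates are inferred from this build file and are not confirmed release instructions.

// Hypothetical consumer build.sbt fragment — coordinates inferred from the module names above.
// Spark 3.x application (Scala 2.12 or 2.13):
libraryDependencies += "io.dataflint" %% "spark" % "0.5.1"

// Spark 4.x application (Scala 2.13 only):
libraryDependencies += "io.dataflint" %% "dataflint-spark4" % "0.5.1"

// At runtime the plugin is enabled the same way as in the example apps in this commit:
// --conf spark.plugins=io.dataflint.spark.SparkDataflintPlugin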

spark-plugin/clean-and-setup.sh

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+#!/bin/bash
+
+# Clean and setup script for DataFlint Spark plugin development
+# This resolves cross-version dependency conflicts in IntelliJ IDEA
+
+echo "🧹 Cleaning local ivy cache and build artifacts..."
+
+# Remove all local dataflint artifacts
+rm -rf ~/.ivy2/local/io.dataflint/
+
+# Clean all target directories
+find . -name "target" -type d -exec rm -rf {} + 2>/dev/null || true
+
+echo "📦 Publishing required versions..."
+
+# Publish Scala 2.12 version for Spark 3.x projects
+echo "Publishing Scala 2.12 version..."
+sbt plugin/publishLocal
+
+# Publish Scala 2.13 version for Spark 4.x projects
+echo "Publishing Scala 2.13 version..."
+sbt ++2.13.16 plugin/publishLocal
+
+echo "🔨 Building fat JARs..."
+
+# Build Spark 3 fat JAR
+echo "Building Spark 3 fat JAR..."
+sbt pluginspark3/assembly
+
+# Build Spark 4 fat JAR
+echo "Building Spark 4 fat JAR..."
+sbt ++2.13.16 pluginspark4/assembly
+
+echo "✅ Setup complete!"
+echo ""
+echo "📋 Next steps:"
+echo "1. Refresh your IntelliJ IDEA project (File -> Reload Gradle Project or similar)"
+echo "2. If you still get conflicts, try: File -> Invalidate Caches and Restart"
+echo ""
+echo "📦 Fat JARs created:"
+echo "- Spark 3.x: pluginspark3/target/scala-2.12/dataflint-spark3_2.12-0.5.1-SNAPSHOT.jar"
+echo "- Spark 4.x: pluginspark4/target/scala-2.13/dataflint-spark4_2.13-0.5.1-SNAPSHOT.jar"
+
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+package io.dataflint.example
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+object ShakespeareSpark401 extends App {
+  def df(spark: SparkSession): DataFrame = spark.read
+    .format("csv")
+    .option("sep", ";")
+    .option("inferSchema", true)
+    .load("./test_data/will_play_text.csv")
+    .toDF("line_id", "play_name", "speech_number", "line_number", "speaker", "text_entry")
+    .repartition(1000)
+
+  val spark = SparkSession
+    .builder()
+    .appName("Shakespeare Statistics")
+    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
+    .config("spark.dataflint.telemetry.enabled", false)
+    .config("spark.ui.port", "10000")
+    .master("local[*]")
+    .getOrCreate()
+
+  import spark.implicits._
+
+  val shakespeareText = df(spark)
+
+  shakespeareText.printSchema()
+
+  val count = shakespeareText.count()
+  println(s"number of records : $count")
+
+  val uniqueSpeakers = shakespeareText.select($"speaker").distinct().count()
+  println(s"number of unique speakers : $uniqueSpeakers")
+
+  val uniqueWords = shakespeareText.select(explode(split($"text_entry", " "))).distinct().count()
+
+  println(s"number of unique words : $uniqueWords")
+
+  scala.io.StdIn.readLine()
+  spark.stop()
+}

spark-plugin/plugin/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin

Lines changed: 0 additions & 1 deletion
This file was deleted.

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUILoader.scala renamed to spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala

Lines changed: 30 additions & 19 deletions
@@ -1,7 +1,7 @@
 package org.apache.spark.dataflint
 
 import org.apache.spark.SparkContext
-import org.apache.spark.dataflint.api.{DataFlintTab, DataflintApplicationInfoPage, DataflintCachedStoragePage, DataflintIcebergPage, DataflintJettyUtils, DataflintSQLMetricsPage, DataflintSQLPlanPage, DataflintSQLStagesRddPage}
+import org.apache.spark.dataflint.api.DataflintPageFactory
 import org.apache.spark.dataflint.listener.{DataflintEnvironmentInfo, DataflintEnvironmentInfoEvent}
 import org.apache.spark.dataflint.iceberg.ClassLoaderChecker
 import org.apache.spark.dataflint.iceberg.ClassLoaderChecker.isMetricLoaderInRightClassLoader
@@ -14,13 +14,13 @@ import org.apache.spark.sql.execution.ui.SQLAppStatusListener
 import org.apache.spark.status.{ElementTrackingStore, LiveRDDsListener}
 import org.apache.spark.ui.SparkUI
 
-class DataflintSparkUIInstaller extends Logging {
-  def install(context: SparkContext): String = {
+class DataflintSparkUICommonInstaller extends Logging {
+  def install(context: SparkContext, pageFactory: DataflintPageFactory): String = {
     if(context.ui.isEmpty) {
       logWarning("No UI detected, skipping installation...")
       return ""
     }
-    val isDataFlintAlreadyInstalled = context.ui.get.getTabs.exists(_.name == "DataFlint")
+    val isDataFlintAlreadyInstalled = pageFactory.getTabs(context.ui.get).exists(_.name == "DataFlint")
     if(isDataFlintAlreadyInstalled){
       logInfo("DataFlint UI is already installed, skipping installation...")
       return context.ui.get.webUrl
@@ -95,35 +95,46 @@
       logWarning("Could not add DataFlint Listeners to listener bus", e)
     }
 
-    loadUI(context.ui.get, sqlListener)
+    loadUI(context.ui.get, pageFactory, sqlListener)
   }
 
-  def loadUI(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener] = () => None): String = {
-    val isDataFlintAlreadyInstalled = ui.getTabs.exists(_.name == "DataFlint")
+  def loadUI(ui: SparkUI, pageFactory: DataflintPageFactory, sqlListener: () => Option[SQLAppStatusListener] = () => None): String = {
+    val isDataFlintAlreadyInstalled = pageFactory.getTabs(ui).exists(_.name == "DataFlint")
     if (isDataFlintAlreadyInstalled) {
       logInfo("DataFlint UI is already installed, skipping installation...")
       return ui.webUrl
     }
-    DataflintJettyUtils.addStaticHandler(ui, "io/dataflint/spark/static/ui", ui.basePath + "/dataflint")
+    pageFactory.addStaticHandler(ui, "io/dataflint/spark/static/ui", ui.basePath + "/dataflint")
     val dataflintStore = new DataflintStore(store = ui.store.store)
-    val tab = new DataFlintTab(ui)
-    tab.attachPage(new DataflintSQLPlanPage(ui, dataflintStore, sqlListener))
-    tab.attachPage(new DataflintSQLMetricsPage(ui, sqlListener))
-    tab.attachPage(new DataflintSQLStagesRddPage(ui))
-    tab.attachPage(new DataflintApplicationInfoPage(ui, dataflintStore))
-    tab.attachPage(new DataflintIcebergPage(ui, dataflintStore))
-    tab.attachPage(new DataflintCachedStoragePage(ui, dataflintStore))
+    val tab = pageFactory.createDataFlintTab(ui)
+    tab.attachPage(pageFactory.createSQLPlanPage(ui, dataflintStore, sqlListener))
+    tab.attachPage(pageFactory.createSQLMetricsPage(ui, sqlListener))
+    tab.attachPage(pageFactory.createSQLStagesRddPage(ui))
+    tab.attachPage(pageFactory.createApplicationInfoPage(ui, dataflintStore))
+    tab.attachPage(pageFactory.createIcebergPage(ui, dataflintStore))
+    tab.attachPage(pageFactory.createCachedStoragePage(ui, dataflintStore))
     ui.attachTab(tab)
     ui.webUrl
   }
 
 }
-object DataflintSparkUILoader {
+
+object DataflintSparkUICommonLoader {
+
+  def install(context: SparkContext, pageFactory: DataflintPageFactory): String = {
+    new DataflintSparkUICommonInstaller().install(context, pageFactory)
+  }
+
+  def loadUI(ui: SparkUI, pageFactory: DataflintPageFactory): String = {
+    new DataflintSparkUICommonInstaller().loadUI(ui, pageFactory)
+  }
+
+  // Backward compatibility methods - these will be overridden in version-specific implementations
   def install(context: SparkContext): String = {
-    new DataflintSparkUIInstaller().install(context)
+    throw new UnsupportedOperationException("This method requires a version-specific implementation. Use pluginspark3 or pluginspark4.")
   }
 
   def loadUI(ui: SparkUI): String = {
-    new DataflintSparkUIInstaller().loadUI(ui)
+    throw new UnsupportedOperationException("This method requires a version-specific implementation. Use pluginspark3 or pluginspark4.")
   }
-}
+}
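
The backward-compatibility comment above notes that install and loadUI are meant to be overridden per Spark version. As a minimal sketch, a version-specific loader in pluginspark3 could delegate to DataflintSparkUICommonLoader with its own factory; the class name Spark3PageFactory is a hypothetical placeholder (a matching factory sketch follows the DataflintPageFactory definition below), not code from this commit.

// Hypothetical version-specific loader for pluginspark3 — a sketch, not the actual implementation.
package org.apache.spark.dataflint

import org.apache.spark.SparkContext
import org.apache.spark.dataflint.api.Spark3PageFactory // hypothetical factory, sketched below
import org.apache.spark.ui.SparkUI

object DataflintSparkUILoader {
  def install(context: SparkContext): String =
    DataflintSparkUICommonLoader.install(context, new Spark3PageFactory())

  def loadUI(ui: SparkUI): String =
    DataflintSparkUICommonLoader.loadUI(ui, new Spark3PageFactory())
}
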
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+package org.apache.spark.dataflint.api
+
+import org.apache.spark.dataflint.listener.DataflintStore
+import org.apache.spark.sql.execution.ui.SQLAppStatusListener
+import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab}
+
+/**
+ * Abstract factory for creating Dataflint UI components.
+ * This allows version-specific implementations for different Spark versions.
+ */
+abstract class DataflintPageFactory {
+
+  def createDataFlintTab(ui: SparkUI): WebUITab
+
+  def createApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage
+
+  def createCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage
+
+  def createIcebergPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage
+
+  def createSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage
+
+  def createSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage
+
+  def createSQLStagesRddPage(ui: SparkUI): WebUIPage
+
+  def addStaticHandler(ui: SparkUI, resourceBase: String, contextPath: String): Unit
+
+  def getTabs(ui: SparkUI): Seq[WebUITab]
+}
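
For illustration, a hypothetical concrete factory that simply delegates to the page classes the common installer previously constructed directly (DataFlintTab, DataflintSQLPlanPage, and friends, as seen in the import removed from DataflintSparkUICommonLoader above); the class name and its placement in pluginspark3 are assumptions, not code from this commit.

// Hypothetical Spark 3 implementation of DataflintPageFactory — a sketch based on the
// constructor calls removed from DataflintSparkUICommonLoader, not the actual pluginspark3 code.
package org.apache.spark.dataflint.api

import org.apache.spark.dataflint.listener.DataflintStore
import org.apache.spark.sql.execution.ui.SQLAppStatusListener
import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab}

class Spark3PageFactory extends DataflintPageFactory {
  override def createDataFlintTab(ui: SparkUI): WebUITab = new DataFlintTab(ui)

  override def createApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage =
    new DataflintApplicationInfoPage(ui, dataflintStore)

  override def createCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage =
    new DataflintCachedStoragePage(ui, dataflintStore)

  override def createIcebergPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage =
    new DataflintIcebergPage(ui, dataflintStore)

  override def createSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage =
    new DataflintSQLMetricsPage(ui, sqlListener)

  override def createSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage =
    new DataflintSQLPlanPage(ui, dataflintStore, sqlListener)

  override def createSQLStagesRddPage(ui: SparkUI): WebUIPage = new DataflintSQLStagesRddPage(ui)

  override def addStaticHandler(ui: SparkUI, resourceBase: String, contextPath: String): Unit =
    DataflintJettyUtils.addStaticHandler(ui, resourceBase, contextPath)

  override def getTabs(ui: SparkUI): Seq[WebUITab] = ui.getTabs
}
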
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+org.apache.spark.deploy.history.DataFlintHistoryServerPlugin

spark-plugin/plugin/src/main/scala/io/dataflint/spark/SparkDataflint.scala renamed to spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflint.scala

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 package io.dataflint.spark
 
 import org.apache.spark.SparkContext
-import org.apache.spark.dataflint.DataflintSparkUILoader
+import org.apache.spark.dataflint.{DataflintSparkUICommonLoader, DataflintSparkUILoader}
 
 object SparkDataflint {
   def install(context: SparkContext): Unit = {
