From 810c5cd9fb1ef6ce8d4a99512df89f541949a64e Mon Sep 17 00:00:00 2001
From: alekjarmov
Date: Thu, 17 Jul 2025 14:50:23 +0200
Subject: [PATCH 1/5] Add withTiming utility to SQLMetrics

---
 .../execution/datasources/jdbc/JDBCRDD.scala  | 23 ++++++++-----------
 .../sql/execution/metric/SQLMetrics.scala     | 15 ++++++++++++
 .../execution/metric/SQLMetricsSuite.scala    | 12 ++++++++++
 3 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index fa3391f7e1cc8..0d0722ad540f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -301,20 +301,17 @@ class JDBCRDD(
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
 
-    val startTime = System.nanoTime
-    rs = try {
-      stmt.executeQuery()
-    } catch {
-      case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
-        throw new SparkException(
-          errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION",
-          messageParameters = Map("jdbcQuery" -> sqlText),
-          cause = e)
+    rs = SQLMetrics.withTimingNs(queryExecutionTimeMetric) {
+      try {
+        stmt.executeQuery()
+      } catch {
+        case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
+          throw new SparkException(
+            errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION",
+            messageParameters = Map("jdbcQuery" -> sqlText),
+            cause = e)
+      }
     }
-    val endTime = System.nanoTime
-
-    val executionTime = endTime - startTime
-    queryExecutionTimeMetric.add(executionTime)
 
     val rowsIterator =
       JdbcUtils.resultSetToSparkInternalRows(rs, dialect, schema, inputMetrics)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 065c8db7ac6f9..13f4d7926bea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -221,4 +221,19 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
+
+  /**
+   * Measures the time taken by the function `f` in nanoseconds and adds it to the provided metric.
+   *
+   * @param metric SQLMetric to record the time taken.
+   * @param f Function/Codeblock to execute and measure.
+   * @return The result of the function `f`.
+   */
+  def withTimingNs[T](metric: SQLMetric)(f: => T): T = {
+    val startTime = System.nanoTime()
+    val result = f
+    val endTime = System.nanoTime()
+    metric.add(endTime - startTime)
+    result
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 29481599362a4..36604adbd48e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -987,6 +987,18 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
     assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
   }
+
+  test("withTimingNs should time and return same result") {
+    val metric = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+
+    // Use a simple block that returns a value
+    val result = SQLMetrics.withTimingNs(metric) {
+      42
+    }
+
+    assert(result === 42)
+    assert(!metric.isZero, "Metric was not increased")
+  }
 }
 
 case class CustomFileCommitProtocol(

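Note on PATCH 1/5: SQLMetrics.withTimingNs takes the timed block as a by-name parameter, adds the elapsed nanoseconds to the supplied metric, and returns the block's own result, which is why the executeQuery() call site keeps its original shape. A minimal usage sketch, assuming only the withTimingNs helper added here and the pre-existing SQLMetrics.createNanoTimingMetric factory that later patches in this series also use (the metric name and expensiveCall() are illustrative placeholders, not part of the change):

    // Create a nanosecond timing metric and time an arbitrary block with it.
    val metric = SQLMetrics.createNanoTimingMetric(sparkContext, name = "example timing")
    val result = SQLMetrics.withTimingNs(metric) {
      expensiveCall() // placeholder for real work, e.g. stmt.executeQuery()
    }
    // `result` is the block's return value; `metric` has accumulated the elapsed nanoseconds.
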
From e8b560ae86e00fdd5b2e95844340093aa984ec37 Mon Sep 17 00:00:00 2001
From: alekjarmov
Date: Thu, 17 Jul 2025 16:04:58 +0200
Subject: [PATCH 2/5] add metric

---
 .../execution/datasources/jdbc/JDBCRDD.scala  | 19 ++++++++++++++++++-
 .../datasources/jdbc/JdbcUtils.scala          | 17 +++++++++++++----
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0d0722ad540f0..1c0fbea8915f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -189,6 +189,17 @@ class JDBCRDD(
     sparkContext,
     name = "JDBC query execution time")
 
+  /**
+   * Time needed to fetch the data and transform it into Spark's InternalRow format.
+   *
+   * Usually this is spent in network transfer time, but it can be spent in transformation time
+   * as well if we are transforming some more complex datatype such as structs.
+   */
+  val fetchAndTransformToInternalRowsMetric: SQLMetric = SQLMetrics.createNanoTimingMetric(
+    sparkContext,
+    // Message that user sees does not have to leak details about conversion
+    name = "Remote data fetch time over JDBC connection")
+
   private lazy val dialect = JdbcDialects.get(url)
 
   def generateJdbcQuery(partition: Option[JDBCPartition]): String = {
@@ -314,7 +325,12 @@ class JDBCRDD(
     }
 
     val rowsIterator =
-      JdbcUtils.resultSetToSparkInternalRows(rs, dialect, schema, inputMetrics)
+      JdbcUtils.resultSetToSparkInternalRows(
+        rs,
+        dialect,
+        schema,
+        inputMetrics,
+        Some(fetchAndTransformToInternalRowsMetric))
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
@@ -322,6 +338,7 @@ class JDBCRDD(
 
   override def getMetrics: Seq[(String, SQLMetric)] = {
     Seq(
+      "fetchAndTransformToInternalRowsNs" -> fetchAndTransformToInternalRowsMetric,
       "queryExecutionTime" -> queryExecutionTimeMetric
     )
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 0077012e2b0e4..c8bc96f7c81b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -22,13 +22,11 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Time, Timestamp}
 import java.time.{Instant, LocalDate}
 import java.util
-
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
-
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.{Logging, MDC}
@@ -46,6 +44,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType, NoopDialect}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -357,7 +356,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       resultSet: ResultSet,
       dialect: JdbcDialect,
       schema: StructType,
-      inputMetrics: InputMetrics): Iterator[InternalRow] = {
+      inputMetrics: InputMetrics,
+      fetchAndTransformToInternalRowsMetric: Option[SQLMetric] = None): Iterator[InternalRow] = {
     new NextIterator[InternalRow] {
       private[this] val rs = resultSet
       private[this] val getters: Array[JDBCValueGetter] = makeGetters(dialect, schema)
@@ -372,7 +372,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-      override protected def getNext(): InternalRow = {
+      private def getNextWithoutTiming: InternalRow = {
         if (rs.next()) {
           inputMetrics.incRecordsRead(1)
           var i = 0
@@ -387,6 +387,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
+
+      override protected def getNext(): InternalRow = {
+        if (fetchAndTransformToInternalRowsMetric.isDefined) {
+        SQLMetrics.withTimingNs(fetchAndTransformToInternalRowsMetric.get) {
+          getNextWithoutTiming
+        }
+      } else {
+        getNextWithoutTiming
+      }
     }
   }

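Note on PATCH 2/5: resultSetToSparkInternalRows gains an Option[SQLMetric] parameter that defaults to None, so existing callers compile unchanged while JDBCRDD opts in with Some(fetchAndTransformToInternalRowsMetric). Because the wrapper sits inside getNext(), time is accumulated once per fetched row and covers both the JDBC fetch and the conversion to InternalRow. A simplified sketch of the same opt-in pattern (timedNext and produce are illustrative names, not Spark APIs):

    // Time each produced element only when a metric has been supplied.
    def timedNext[T](timing: Option[SQLMetric] = None)(produce: => T): T = timing match {
      case Some(metric) => SQLMetrics.withTimingNs(metric) { produce }
      case None => produce
    }
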
From 6618a871fb79d124bdcfaba9be59ca3526f814a4 Mon Sep 17 00:00:00 2001
From: alekjarmov
Date: Thu, 17 Jul 2025 16:06:26 +0200
Subject: [PATCH 3/5] revert line removal

---
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index c8bc96f7c81b9..fd0efc98f4efa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -22,11 +22,13 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Time, Timestamp}
 import java.time.{Instant, LocalDate}
 import java.util
+
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
+
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.{Logging, MDC}

From 064698cc29353ad4799e711cbf9312011060dd0f Mon Sep 17 00:00:00 2001
From: alekjarmov
Date: Thu, 17 Jul 2025 16:27:24 +0200
Subject: [PATCH 4/5] add missing brace

---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fd0efc98f4efa..f34299b08726c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -390,13 +390,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-      override protected def getNext(): InternalRow = {
-        if (fetchAndTransformToInternalRowsMetric.isDefined) {
-        SQLMetrics.withTimingNs(fetchAndTransformToInternalRowsMetric.get) {
+      override protected def getNext(): InternalRow = {
+        if (fetchAndTransformToInternalRowsMetric.isDefined) {
+          SQLMetrics.withTimingNs(fetchAndTransformToInternalRowsMetric.get) {
+            getNextWithoutTiming
+          }
+        } else {
           getNextWithoutTiming
         }
-      } else {
-        getNextWithoutTiming
       }
     }
   }

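Note on PATCH 4/5: with the indentation fixed and the missing closing brace added, the timed and untimed paths are properly nested inside getNext(). Reconstructed from the diff, the method now reads:

      override protected def getNext(): InternalRow = {
        if (fetchAndTransformToInternalRowsMetric.isDefined) {
          SQLMetrics.withTimingNs(fetchAndTransformToInternalRowsMetric.get) {
            getNextWithoutTiming
          }
        } else {
          getNextWithoutTiming
        }
      }
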
From e923c279a9150496099d1dd609d31a306e3865cf Mon Sep 17 00:00:00 2001
From: Alek Jarmov
Date: Fri, 18 Jul 2025 20:15:13 +0200
Subject: [PATCH 5/5] Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Co-authored-by: Wenchen Fan
---
 .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 1c0fbea8915f6..8aea4919ff5a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -198,7 +198,7 @@ class JDBCRDD(
   val fetchAndTransformToInternalRowsMetric: SQLMetric = SQLMetrics.createNanoTimingMetric(
     sparkContext,
     // Message that user sees does not have to leak details about conversion
-    name = "Remote data fetch time over JDBC connection")
+    name = "JDBC remote data fetch and translation time")
 
   private lazy val dialect = JdbcDialects.get(url)
 
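Note on PATCH 5/5: only the user-visible label of the metric changes here; the key it is reported under in getMetrics, fetchAndTransformToInternalRowsNs, stays the same. After the whole series, both timings are exposed through JDBCRDD.getMetrics, so a rough way to inspect them once a scan has run would be (jdbcRDD is an illustrative handle to the RDD instance, not an API added by this patch):

    // Hypothetical check: both JDBC timings should be populated after a scan.
    val metrics = jdbcRDD.getMetrics.toMap
    assert(metrics("queryExecutionTime").value > 0L)
    assert(metrics("fetchAndTransformToInternalRowsNs").value > 0L)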