
Commit ee543d4

szehon-ho authored and huangxiaopingRD committed
[SPARK-53891][SQL] Model DSV2 Commit Write Summary API
### What changes were proposed in this pull request?

apache#51377 added a DataSourceV2 API that sends operation metrics along with the commit, via a map of string to long. Change this to a proper model, as suggested by aokolnychyi.

### Why are the changes needed?

It is cleaner to model this as a proper object, so that it is clear what information Spark sends, and so that future cases where metrics are not long values can be handled.

### Does this PR introduce _any_ user-facing change?

No, this is an unreleased DSV2 API.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52595 from szehon-ho/SPARK-53891.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent fbbf3f7 commit ee543d4
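
At the API level, the change swaps the untyped metrics map in the BatchWrite commit hook for a typed summary object. Side by side, as the signatures appear in the diff below:

    // Before (apache#51377): operation metrics passed as an untyped map.
    default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
      commit(messages);
    }

    // After (this commit): metrics modeled as a WriteSummary object.
    default void commit(WriterCommitMessage[] messages, WriteSummary summary) {
      commit(messages);
    }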

File tree

8 files changed: +234 -127 lines changed


sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 5 additions & 27 deletions
@@ -19,8 +19,6 @@
 
 import org.apache.spark.annotation.Evolving;
 
-import java.util.Map;
-
 /**
  * An interface that defines how to write the data to data source for batch processing.
  * <p>
@@ -91,7 +89,7 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
   void commit(WriterCommitMessage[] messages);
 
   /**
-   * Commits this writing job with a list of commit messages and operation metrics.
+   * Commits this writing job with a list of commit messages and write summary.
    * <p>
    * If this method fails (by throwing an exception), this writing job is considered to have been
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
@@ -105,31 +103,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * <p>
    * @param messages a list of commit messages from successful data writers, produced by
    *                 {@link DataWriter#commit()}.
-   * @param metrics a map of operation metrics collected from the query producing write.
-   *                The keys will be prefixed by operation type, eg `merge`.
-   *                <p>
-   *                Currently supported metrics are:
-   *                <ul>
-   *                  <li>Operation Type = `merge`
-   *                    <ul>
-   *                      <li>`numTargetRowsCopied`: number of target rows copied unmodified because
-   *                      they did not match any action</li>
-   *                      <li>`numTargetRowsDeleted`: number of target rows deleted</li>
-   *                      <li>`numTargetRowsUpdated`: number of target rows updated</li>
-   *                      <li>`numTargetRowsInserted`: number of target rows inserted</li>
-   *                      <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
-   *                      matched clause</li>
-   *                      <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
-   *                      matched clause</li>
-   *                      <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
-   *                      updated by a not matched by source clause</li>
-   *                      <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
-   *                      deleted by a not matched by source clause</li>
-   *                    </ul>
-   *                  </li>
-   *                </ul>
+   * @param summary an informational summary collected in a best-effort from the operation
+   *                producing write. Currently supported summary fields are provided through
+   *                implementations of {@link WriteSummary}.
    */
-  default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+  default void commit(WriterCommitMessage[] messages, WriteSummary summary) {
     commit(messages);
   }
 
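As a rough sketch of how a connector might consume the new overload (the class name ExampleBatchWrite and the printf reporting are illustrative, not part of this commit):

    import org.apache.spark.sql.connector.write.BatchWrite;
    import org.apache.spark.sql.connector.write.MergeSummary;
    import org.apache.spark.sql.connector.write.WriteSummary;
    import org.apache.spark.sql.connector.write.WriterCommitMessage;

    // Hypothetical connector-side BatchWrite; only the summary-aware commit is shown.
    public abstract class ExampleBatchWrite implements BatchWrite {

      @Override
      public void commit(WriterCommitMessage[] messages, WriteSummary summary) {
        // The summary is informational and operation-specific, so check the subtype.
        if (summary instanceof MergeSummary) {
          MergeSummary merge = (MergeSummary) summary;
          System.out.printf("MERGE committed: %d inserted, %d updated, %d deleted%n",
              merge.numTargetRowsInserted(),
              merge.numTargetRowsUpdated(),
              merge.numTargetRowsDeleted());
        }
        commit(messages); // reuse the single-argument commit path
      }
    }
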
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Provides an informational summary of the MERGE operation producing write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface MergeSummary extends WriteSummary {
+
+  /**
+   * Returns the number of target rows copied unmodified because they did not match any action.
+   */
+  long numTargetRowsCopied();
+
+  /**
+   * Returns the number of target rows deleted.
+   */
+  long numTargetRowsDeleted();
+
+  /**
+   * Returns the number of target rows updated.
+   */
+  long numTargetRowsUpdated();
+
+  /**
+   * Returns the number of target rows inserted.
+   */
+  long numTargetRowsInserted();
+
+  /**
+   * Returns the number of target rows updated by a matched clause.
+   */
+  long numTargetRowsMatchedUpdated();
+
+  /**
+   * Returns the number of target rows deleted by a matched clause.
+   */
+  long numTargetRowsMatchedDeleted();
+
+  /**
+   * Returns the number of target rows updated by a not matched by source clause.
+   */
+  long numTargetRowsNotMatchedBySourceUpdated();
+
+  /**
+   * Returns the number of target rows deleted by a not matched by source clause.
+   */
+  long numTargetRowsNotMatchedBySourceDeleted();
+}
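
For connectors migrating from the map-based API, a MergeSummary can be flattened back into the old key/value form; a minimal sketch, assuming the "merge." key prefix documented in the removed BatchWrite javadoc above (the helper name toMetricsMap is illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.spark.sql.connector.write.MergeSummary;

    final class MergeSummaryCompat {
      // Rebuilds the Map<String, Long> shape that the previous API passed to commit().
      static Map<String, Long> toMetricsMap(MergeSummary s) {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put("merge.numTargetRowsCopied", s.numTargetRowsCopied());
        m.put("merge.numTargetRowsDeleted", s.numTargetRowsDeleted());
        m.put("merge.numTargetRowsUpdated", s.numTargetRowsUpdated());
        m.put("merge.numTargetRowsInserted", s.numTargetRowsInserted());
        m.put("merge.numTargetRowsMatchedUpdated", s.numTargetRowsMatchedUpdated());
        m.put("merge.numTargetRowsMatchedDeleted", s.numTargetRowsMatchedDeleted());
        m.put("merge.numTargetRowsNotMatchedBySourceUpdated",
            s.numTargetRowsNotMatchedBySourceUpdated());
        m.put("merge.numTargetRowsNotMatchedBySourceDeleted",
            s.numTargetRowsNotMatchedBySourceDeleted());
        return m;
      }
    }
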
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An informational summary of the operation producing write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface WriteSummary {
+}
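
WriteSummary itself is a marker interface; per the commit message, modeling summaries as objects leaves room for metrics that are not long values. A purely hypothetical illustration (DeleteSummary is not part of this commit):

    import java.time.Instant;
    import org.apache.spark.sql.connector.write.WriteSummary;

    // Hypothetical future summary type: mixes a long counter with a non-long
    // field, which the old Map<String, Long> API could not have carried.
    interface DeleteSummary extends WriteSummary {
      long numTargetRowsDeleted();
      Instant deleteTimestamp();
    }
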
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+/**
+ * Implementation of [[MergeSummary]] that provides MERGE operation summary.
+ */
+private[sql] case class MergeSummaryImpl(
+    numTargetRowsCopied: Long,
+    numTargetRowsDeleted: Long,
+    numTargetRowsUpdated: Long,
+    numTargetRowsInserted: Long,
+    numTargetRowsMatchedUpdated: Long,
+    numTargetRowsMatchedDeleted: Long,
+    numTargetRowsNotMatchedBySourceUpdated: Long,
+    numTargetRowsNotMatchedBySourceDeleted: Long)
+  extends MergeSummary {
+}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 4 additions & 9 deletions
@@ -655,8 +655,6 @@ abstract class InMemoryBaseTable(
 
   protected abstract class TestBatchWrite extends BatchWrite {
 
-    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
-
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       new BufferedRowsWriterFactory(CatalogV2Util.v2ColumnsToStructType(columns()))
     }
@@ -668,8 +666,7 @@ abstract class InMemoryBaseTable(
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
@@ -678,17 +675,15 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
   class TruncateAndAppend(val info: LogicalWriteInfo) extends TestBatchWrite {
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
@@ -1045,7 +1040,7 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
-case class Commit(id: Long, properties: Map[String, String])
+case class Commit(id: Long, writeSummary: Option[WriteSummary] = None)
 
 sealed trait Operation
 case object Write extends Operation
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala

Lines changed: 4 additions & 11 deletions
@@ -17,18 +17,16 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.{lang, util}
 import java.time.Instant
-
-import scala.jdk.CollectionConverters._
+import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -116,14 +114,9 @@ class InMemoryRowLevelOperationTable(
 
   abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
 
-    override def commit(messages: Array[WriterCommitMessage],
-        metrics: util.Map[String, lang.Long]): Unit = {
-      metrics.asScala.map {
-        case (key, value) => commitProperties += key -> String.valueOf(value)
-      }
+    override def commit(messages: Array[WriterCommitMessage], metrics: WriteSummary): Unit = {
      commit(messages)
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli, Some(metrics))
    }
  }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 20 additions & 10 deletions
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.lang
-import java.util
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -455,9 +452,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
        }
      )
 
-      val operationMetrics = getOperationMetrics(query)
+      val writeSummary = getWriteSummary(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
-      batchWrite.commit(messages, operationMetrics)
+      writeSummary match {
+        case Some(summary) => batchWrite.commit(messages, summary)
+        case None => batchWrite.commit(messages)
+      }
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -480,10 +480,20 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
     Nil
   }
 
-  private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
-    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
-      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
-    }.getOrElse(Map.empty[String, lang.Long]).asJava
+  private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+      val metrics = n.metrics
+      MergeSummaryImpl(
+        metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsInserted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsMatchedUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsMatchedDeleted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsNotMatchedBySourceUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsNotMatchedBySourceDeleted").map(_.value).getOrElse(-1L)
+      )
+    }
   }
 }
 
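Note the fallback behavior above: when the plan contains no MergeRowsExec, getWriteSummary returns None and Spark calls the original single-argument commit, so sinks that never override the summary-aware overload are unaffected; any individual merge metric missing from MergeRowsExec is reported as -1L.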