Commit 570b325

Szehon Ho authored and committed
[SPARK-52689][SQL] Send DML Metrics to V2Write
### What changes were proposed in this pull request?
Send some DML execution metrics (i.e. from MergeRowsExec) to the write of the V2 data source, so the source can persist them for debugging purposes.

### Why are the changes needed?
DML row-level operations (MERGE, UPDATE, DELETE) are critical functionality of V2 data sources such as Iceberg. It is useful to pass some DML metrics to the commit of these data sources, so they can persist them in commit metadata for debugging.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51377 from szehon-ho/metric_to_write.

Lead-authored-by: Szehon Ho <[email protected]>
Co-authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 3ff28ae commit 570b325
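
To make the connector-side use concrete, here is a minimal sketch of a data source opting into the new overload and persisting the received metrics in its commit metadata (for example as snapshot summary properties in an Iceberg-like format). Only the BatchWrite API itself is from this commit; the class name MetricsRecordingWrite and the persistCommit helper are hypothetical illustrations.

    import java.lang.{Long => JLong}
    import java.util.{Map => JMap}

    import scala.jdk.CollectionConverters._

    import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage}

    // Hypothetical adapter a connector might add on top of its existing BatchWrite.
    abstract class MetricsRecordingWrite extends BatchWrite {

      // Connector-specific commit that can attach extra string properties to commit metadata.
      def persistCommit(messages: Array[WriterCommitMessage], summary: Map[String, String]): Unit

      override def commit(messages: Array[WriterCommitMessage]): Unit =
        persistCommit(messages, Map.empty)

      // New in SPARK-52689: DML metrics such as "merge.numTargetRowsUpdated" arrive here.
      override def commit(messages: Array[WriterCommitMessage], metrics: JMap[String, JLong]): Unit =
        persistCommit(messages, metrics.asScala.map { case (k, v) => k -> v.toString }.toMap)
    }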

File tree

5 files changed: +205 -8 lines changed


sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 45 additions & 0 deletions
@@ -19,6 +19,8 @@
 
 import org.apache.spark.annotation.Evolving;
 
+import java.util.Map;
+
 /**
  * An interface that defines how to write the data to data source for batch processing.
  * <p>
@@ -88,6 +90,49 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    */
   void commit(WriterCommitMessage[] messages);
 
+  /**
+   * Commits this writing job with a list of commit messages and operation metrics.
+   * <p>
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the
+   * destination is undefined and {@link #abort(WriterCommitMessage[])} may not be able to
+   * deal with it.
+   * <p>
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * tasks may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
+   * <p>
+   * @param messages a list of commit messages from successful data writers, produced by
+   *                 {@link DataWriter#commit()}.
+   * @param metrics a map of operation metrics collected from the query producing the write.
+   *                The keys will be prefixed by the operation type, e.g. `merge`.
+   *                <p>
+   *                Currently supported metrics are:
+   *                <ul>
+   *                  <li>Operation Type = `merge`
+   *                    <ul>
+   *                      <li>`numTargetRowsCopied`: number of target rows copied unmodified
+   *                          because they did not match any action</li>
+   *                      <li>`numTargetRowsDeleted`: number of target rows deleted</li>
+   *                      <li>`numTargetRowsUpdated`: number of target rows updated</li>
+   *                      <li>`numTargetRowsInserted`: number of target rows inserted</li>
+   *                      <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
+   *                          matched clause</li>
+   *                      <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
+   *                          matched clause</li>
+   *                      <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
+   *                          updated by a not matched by source clause</li>
+   *                      <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
+   *                          deleted by a not matched by source clause</li>
+   *                    </ul>
+   *                  </li>
+   *                </ul>
+   */
+  default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+    commit(messages);
+  }
+
   /**
    * Aborts this writing job because some data writers are failed and keep failing when retry,
    * or the Spark job fails with some unknown reasons,
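
Because the new method is a default that simply delegates to commit(messages), implementations written before this change compile and behave exactly as before; they just never observe the metrics. A minimal sketch of such a legacy implementation (LegacyWrite is a hypothetical name, not part of this commit):

    import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

    // Only the single-argument commit is overridden; Spark's new
    // commit(messages, metrics) default falls back to it and drops the metrics.
    class LegacyWrite extends BatchWrite {
      override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
        throw new UnsupportedOperationException("sketch only")

      override def commit(messages: Array[WriterCommitMessage]): Unit = { /* existing commit logic */ }

      override def abort(messages: Array[WriterCommitMessage]): Unit = {}
    }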

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 15 additions & 0 deletions
@@ -23,6 +23,7 @@ import java.util
 import java.util.OptionalLong
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import com.google.common.base.Objects
@@ -152,6 +153,8 @@ abstract class InMemoryBaseTable(
   // The key `Seq[Any]` is the partition values, value is a set of splits, each with a set of rows.
   val dataMap: mutable.Map[Seq[Any], Seq[BufferedRows]] = mutable.Map.empty
 
+  val commits: ListBuffer[Commit] = ListBuffer[Commit]()
+
   def data: Array[BufferedRows] = dataMap.values.flatten.toArray
 
   def rows: Seq[InternalRow] = dataMap.values.flatten.flatMap(_.rows).toSeq
@@ -616,6 +619,9 @@ abstract class InMemoryBaseTable(
   }
 
   protected abstract class TestBatchWrite extends BatchWrite {
+
+    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
+
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       BufferedRowsWriterFactory
     }
@@ -624,8 +630,11 @@ abstract class InMemoryBaseTable(
   }
 
   class Append(val info: LogicalWriteInfo) extends TestBatchWrite {
+
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -634,13 +643,17 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
   class TruncateAndAppend(val info: LogicalWriteInfo) extends TestBatchWrite {
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -882,6 +895,8 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
+case class Commit(id: Long, properties: Map[String, String])
+
 sealed trait Operation
 case object Write extends Operation
 case object Delete extends Operation

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala

Lines changed: 20 additions & 3 deletions
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.util
+import java.{lang, util}
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -111,7 +114,21 @@ class InMemoryRowLevelOperationTable(
     override def description(): String = "InMemoryPartitionReplaceOperation"
   }
 
-  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) extends TestBatchWrite {
+  abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
+
+    override def commit(messages: Array[WriterCommitMessage],
+        metrics: util.Map[String, lang.Long]): Unit = {
+      metrics.asScala.map {
+        case (key, value) => commitProperties += key -> String.valueOf(value)
+      }
+      commit(messages)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
+    }
+  }
+
+  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
+    extends RowLevelOperationBatchWrite {
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
@@ -165,7 +182,7 @@ class InMemoryRowLevelOperationTable(
     }
   }
 
-  private object TestDeltaBatchWrite extends DeltaBatchWrite {
+  private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with DeltaBatchWrite{
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = {
       DeltaBufferedRowsWriterFactory
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 13 additions & 2 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.lang
+import java.util
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,6 +37,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -398,7 +402,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
 /**
  * The base physical plan for writing data into data source v2.
  */
-trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
+trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper {
   def query: SparkPlan
   def writingTask: WritingSparkTask[_] = DataWritingSparkTask
 
@@ -451,8 +455,9 @@
         }
       )
 
+      val operationMetrics = getOperationMetrics(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
-      batchWrite.commit(messages)
+      batchWrite.commit(messages, operationMetrics)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -474,6 +479,12 @@
 
     Nil
   }
+
+  private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
+      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
+    }.getOrElse(Map.empty[String, lang.Long]).asJava
+  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
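
getOperationMetrics above uses AdaptiveSparkPlanHelper.collectFirst, so it finds the MergeRowsExec node even when it is nested inside an adaptive (AQE) plan, then prefixes each of that node's SQLMetric names with the operation type before handing the map to BatchWrite.commit. A minimal standalone sketch of just that key/value conversion, using a plain Scala Map in place of the real SQLMetric objects:

    import java.lang.{Long => JLong}
    import java.util.{Map => JMap}

    import scala.jdk.CollectionConverters._

    object MetricConversionSketch {
      // Simplified stand-in for MergeRowsExec.metrics: metric name -> current value.
      def toOperationMetrics(mergeMetricValues: Map[String, Long]): JMap[String, JLong] =
        mergeMetricValues
          .map { case (name, value) => s"merge.$name" -> JLong.valueOf(value) }
          .asJava
    }

    // MetricConversionSketch.toOperationMetrics(Map("numTargetRowsUpdated" -> 2L))
    // yields {merge.numTargetRowsUpdated=2}, which is what the connector's
    // commit(messages, metrics) receives.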

sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala

Lines changed: 112 additions & 3 deletions
@@ -21,7 +21,7 @@ import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1811,6 +1811,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(1, 1000, "hr"), // updated
           Row(2, 200, "software"),
           Row(3, 300, "hr")))
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1856,6 +1867,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(2, 200, "software"),
           Row(3, 300, "hr"),
           Row(5, 400, "executive"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === "0")
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1883,7 +1905,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         |""".stripMargin
     }
 
-
     assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
     assertMetric(mergeExec, "numTargetRowsInserted", 0)
    assertMetric(mergeExec, "numTargetRowsUpdated", 2)
@@ -1901,6 +1922,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(3, 300, "hr"),
           Row(4, 400, "marketing"),
           Row(5, -1, "executive"))) // updated
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1947,6 +1979,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"))
           // Row(5, 500, "executive") deleted
       )
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "2")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
     }
   }
 
@@ -1994,6 +2037,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"),
           Row(5, -1, "executive"), // updated
           Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -2032,7 +2086,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
     assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
 
-
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
       Seq(
@@ -2042,6 +2095,62 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"),
          // Row(5, 500, "executive") deleted
           Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "2")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+    }
+  }
+
+  test("SPARK-52689: V2 write metrics for merge") {
+    Seq("true", "false").foreach { aqeEnabled: String =>
+      withTempView("source") {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
+          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+            """{ "pk": 1, "salary": 100, "dep": "hr" }
+              |{ "pk": 2, "salary": 200, "dep": "software" }
+              |{ "pk": 3, "salary": 300, "dep": "hr" }
+              |{ "pk": 4, "salary": 400, "dep": "marketing" }
+              |{ "pk": 5, "salary": 500, "dep": "executive" }
+              |""".stripMargin)
+
+          val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+          sourceDF.createOrReplaceTempView("source")
+
+          sql(
+            s"""MERGE INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED AND salary < 200 THEN
+               | DELETE
+               |WHEN NOT MATCHED AND s.pk < 10 THEN
+               | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+               | DELETE
+               |""".stripMargin
+          )
+
+          val table = catalog.loadTable(ident)
+          val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+          assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+          assert(commitProps("merge.numTargetRowsInserted") === "1")
+          assert(commitProps("merge.numTargetRowsUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsDeleted") === "2")
+          assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+          assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+
+          sql(s"DROP TABLE $tableNameAsString")
+        }
+      }
     }
   }
 

0 commit comments
