Skip to content

Commit 667829e

Browse files
committed
review comments
1 parent 7b09f6d commit 667829e

File tree

3 files changed

+52
-21
lines changed

3 files changed

+52
-21
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,53 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
8787
* disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
8888
* tasks may have committed successfully and one successful commit message per task will be
8989
* passed to this commit method. The remaining commit messages are ignored by Spark.
90+
*
9091
*/
9192
void commit(WriterCommitMessage[] messages);
9293

94+
/**
95+
* Commits this writing job with a list of commit messages and operation metrics.
96+
* <p>
97+
* If this method fails (by throwing an exception), this writing job is considered to to have been
98+
* failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
99+
* is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
100+
* <p>
101+
* Note that speculative execution may cause multiple tasks to run for a partition. By default,
102+
* Spark uses the commit coordinator to allow at most one task to commit. Implementations can
103+
* disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
104+
* tasks may have committed successfully and one successful commit message per task will be
105+
* passed to this commit method. The remaining commit messages are ignored by Spark.
106+
* <p>
107+
* @param messages a list of commit messages from successful data writers, produced by
108+
* {@link DataWriter#commit()}.
109+
* @param metrics a map of operation metrics collected from the query producing write.
110+
* The keys will be prefixed by operation type, eg `merge`.
111+
* <p>
112+
* Currently supported metrics are:
113+
* <ul>
114+
* <li>Operation Type = `merge`
115+
* <ul>
116+
* <li>`numTargetRowsCopied`: number of target rows copied unmodified because
117+
* they did not match any action</li>
118+
* <li>`numTargetRowsDeleted`: number of target rows deleted</li>
119+
* <li>`numTargetRowsUpdated`: number of target rows updated</li>
120+
* <li>`numTargetRowsInserted`: number of target rows inserted</li>
121+
* <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
122+
* matched clause</li>
123+
* <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
124+
* matched clause</li>
125+
* <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
126+
* updated by a not matched by source clause</li>
127+
* <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
128+
* deleted by a not matched by source clause</li>
129+
* </ul>
130+
* </li>
131+
* </ul>
132+
*/
133+
default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
134+
commit(messages);
135+
}
136+
93137
/**
94138
* Aborts this writing job because some data writers are failed and keep failing when retry,
95139
* or the Spark job fails with some unknown reasons,
@@ -106,14 +150,4 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
106150
* clean up the data left by data writers.
107151
*/
108152
void abort(WriterCommitMessage[] messages);
109-
110-
/**
111-
* Similar to {@link #commit(WriterCommitMessage[])}, but providing operation metrics to
112-
* this batch write.
113-
* @param metrics operation metrics. The keys will be prefixed by operation type, eg `merge`
114-
*/
115-
default void commitWithOperationMetrics(
116-
WriterCommitMessage[] messages, Map<String, Long> metrics) {
117-
commit(messages);
118-
}
119153
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class InMemoryRowLevelOperationTable(
116116

117117
abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
118118

119-
override def commitWithOperationMetrics(messages: Array[WriterCommitMessage],
119+
override def commit(messages: Array[WriterCommitMessage],
120120
metrics: util.Map[String, lang.Long]): Unit = {
121121
metrics.asScala.map {
122122
case (key, value) => commitProperties += key -> String.valueOf(value)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20-
import java.lang.{Long => JLong}
20+
import java.lang
21+
import java.util
2122

2223
import scala.jdk.CollectionConverters._
2324

@@ -454,13 +455,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
454455
}
455456
)
456457

457-
val operationMetricOpt = getOperationMetrics(query)
458+
val operationMetrics = getOperationMetrics(query)
458459
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
459-
operationMetricOpt match {
460-
case Some(metrics) => batchWrite.commitWithOperationMetrics(messages,
461-
metrics.map{ case (name, value) => name -> JLong.valueOf(value) }.asJava)
462-
case None => batchWrite.commit(messages)
463-
}
460+
batchWrite.commit(messages, operationMetrics)
464461
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
465462
commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
466463
} catch {
@@ -483,10 +480,10 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
483480
Nil
484481
}
485482

486-
private def getOperationMetrics(query: SparkPlan): Option[Map[String, Long]] = {
483+
private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
487484
collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
488-
n.metrics.map { case (name, metric) => s"merge.$name" -> metric.value }
489-
}
485+
n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
486+
}.getOrElse(Map.empty[String, lang.Long]).asJava
490487
}
491488
}
492489

0 commit comments

Comments
 (0)