Skip to content

Commit de9d47d

Browse files
committed
Fixes
1 parent c17c6b9 commit de9d47d

File tree

10 files changed

+360
-134
lines changed

10 files changed

+360
-134
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.connector.metric;
18+
19+
import org.apache.spark.annotation.Evolving;
20+
21+
/**
22+
* Data structure carrying the execution metrics of a merge operation to the write connectors
23+
* that request them.
24+
*
25+
* @since 4.1.0
26+
*/
27+
@Evolving
28+
public interface MergeMetrics {
29+
30+
class Builder {
31+
private long numTargetRowsCopied = -1;
32+
private long numTargetRowsInserted = -1;
33+
private long numTargetRowsDeleted = -1;
34+
private long numTargetRowsUpdated = -1;
35+
private long numTargetRowsMatchedUpdated = -1;
36+
private long numTargetRowsMatchedDeleted = -1;
37+
private long numTargetRowsNotMatchedBySourceUpdated = -1;
38+
private long numTargetRowsNotMatchedBySourceDeleted = -1;
39+
40+
public Builder numTargetRowsCopied(long numTargetRowsCopied) {
41+
this.numTargetRowsCopied = numTargetRowsCopied;
42+
return this;
43+
}
44+
45+
public Builder numTargetRowsInserted(long numTargetRowsInserted) {
46+
this.numTargetRowsInserted = numTargetRowsInserted;
47+
return this;
48+
}
49+
50+
public Builder numTargetRowsDeleted(long numTargetRowsDeleted) {
51+
this.numTargetRowsDeleted = numTargetRowsDeleted;
52+
return this;
53+
}
54+
55+
public Builder numTargetRowsUpdated(long numTargetRowsUpdated) {
56+
this.numTargetRowsUpdated = numTargetRowsUpdated;
57+
return this;
58+
}
59+
60+
public Builder numTargetRowsMatchedUpdated(long numTargetRowsMatchedUpdated) {
61+
this.numTargetRowsMatchedUpdated = numTargetRowsMatchedUpdated;
62+
return this;
63+
}
64+
65+
public Builder numTargetRowsMatchedDeleted(long numTargetRowsMatchedDeleted) {
66+
this.numTargetRowsMatchedDeleted = numTargetRowsMatchedDeleted;
67+
return this;
68+
}
69+
70+
public Builder numTargetRowsNotMatchedBySourceUpdated(
71+
long numTargetRowsNotMatchedBySourceUpdated) {
72+
this.numTargetRowsNotMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated;
73+
return this;
74+
}
75+
76+
public Builder numTargetRowsNotMatchedBySourceDeleted(
77+
long numTargetRowsNotMatchedBySourceDeleted) {
78+
this.numTargetRowsNotMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted;
79+
return this;
80+
}
81+
82+
public MergeMetrics build() {
83+
return new MergeMetrics() {
84+
@Override
85+
public long numTargetRowsCopied() {
86+
return numTargetRowsCopied;
87+
}
88+
89+
@Override
90+
public long numTargetRowsInserted() {
91+
return numTargetRowsInserted;
92+
}
93+
94+
@Override
95+
public long numTargetRowsDeleted() {
96+
return numTargetRowsDeleted;
97+
}
98+
99+
@Override
100+
public long numTargetRowsUpdated() {
101+
return numTargetRowsUpdated;
102+
}
103+
104+
@Override
105+
public long numTargetRowsMatchedUpdated() {
106+
return numTargetRowsMatchedUpdated;
107+
}
108+
109+
@Override
110+
public long numTargetRowsMatchedDeleted() {
111+
return numTargetRowsMatchedDeleted;
112+
}
113+
114+
@Override
115+
public long numTargetRowsNotMatchedBySourceUpdated() {
116+
return numTargetRowsNotMatchedBySourceUpdated;
117+
}
118+
119+
@Override
120+
public long numTargetRowsNotMatchedBySourceDeleted() {
121+
return numTargetRowsNotMatchedBySourceDeleted;
122+
}
123+
};
124+
}
125+
}
126+
127+
/**
128+
* Returns a new builder for MergeMetrics.
129+
*/
130+
static Builder builder() {
131+
return new MergeMetrics.Builder();
132+
}
133+
134+
/**
135+
* Returns the number of target rows copied unmodified because they did not match any action.
136+
*/
137+
long numTargetRowsCopied();
138+
139+
/**
140+
* Returns the number of target rows inserted.
141+
*/
142+
long numTargetRowsInserted();
143+
144+
/**
145+
* Returns the number of target rows deleted.
146+
*/
147+
long numTargetRowsDeleted();
148+
149+
/**
150+
* Returns the number of target rows updated.
151+
*/
152+
long numTargetRowsUpdated();
153+
154+
/**
155+
* Returns the number of target rows matched and updated by a matched clause.
156+
*/
157+
long numTargetRowsMatchedUpdated();
158+
159+
/**
160+
* Returns the number of target rows matched and deleted by a matched clause.
161+
*/
162+
long numTargetRowsMatchedDeleted();
163+
164+
/**
165+
* Returns the number of target rows updated by a not matched by source clause.
166+
*/
167+
long numTargetRowsNotMatchedBySourceUpdated();
168+
169+
/**
170+
* Returns the number of target rows deleted by a not matched by source clause.
171+
*/
172+
long numTargetRowsNotMatchedBySourceDeleted();
173+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.connector.write;
1919

2020
import org.apache.spark.annotation.Evolving;
21-
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
21+
import org.apache.spark.sql.connector.metric.MergeMetrics;
2222

2323
/**
2424
* An interface that defines how to write the data to data source for batch processing.
@@ -106,18 +106,18 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
106106
*/
107107
void abort(WriterCommitMessage[] messages);
108108

109-
110109
/**
111-
* Whether this batch write requests execution metrics. Returns a row level operation command this batch write
112-
* is part of, if requested. Return null if not requested.
110+
* Whether this batch write requests merge execution metrics.
113111
*/
114-
default RowLevelOperation.Command requestExecMetrics() {
115-
return null;
112+
default boolean requestMergeMetrics() {
113+
return false;
116114
}
117115

118116
/**
119-
* Provides an array of query execution metrics to the batch write prior to commit.
120-
* @param metrics an array of execution metrics
117+
* Similar to {@link #commit(WriterCommitMessage[])}, but also provides {@link MergeMetrics} to
118+
* this batch write after a merge operation.
121119
*/
122-
default void execMetrics(CustomTaskMetric[] metrics) {}
120+
default void commitWithMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
121+
commit(messages);
122+
}
123123
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,5 @@ default CustomMetric[] supportedCustomMetrics() {
8686
default CustomTaskMetric[] reportDriverMetrics() {
8787
return new CustomTaskMetric[]{};
8888
}
89+
8990
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -501,32 +501,13 @@ abstract class InMemoryBaseTable(
501501
options: CaseInsensitiveStringMap)
502502
extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
503503

504-
var setFilters = Array.empty[Filter]
505-
506-
override def reportDriverMetrics(): Array[CustomTaskMetric] =
507-
Array(new CustomTaskMetric{
508-
override def name(): String = "numSplits"
509-
override def value(): Long = 1L
510-
})
511-
512-
override def supportedCustomMetrics(): Array[CustomMetric] = {
513-
Array(new CustomMetric {
514-
override def name(): String = "numSplits"
515-
override def description(): String = "number of splits in the scan"
516-
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
517-
taskMetrics.sum.toString
518-
}
519-
})
520-
}
521-
522504
override def filterAttributes(): Array[NamedReference] = {
523505
val scanFields = readSchema.fields.map(_.name).toSet
524506
partitioning.flatMap(_.references)
525507
.filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
526508
}
527509

528510
override def filter(filters: Array[Filter]): Unit = {
529-
this.setFilters = filters
530511
if (partitioning.length == 1 && partitioning.head.references().length == 1) {
531512
val ref = partitioning.head.references().head
532513
filters.foreach {
@@ -779,14 +760,6 @@ private class BufferedRowsReader(
779760

780761
override def close(): Unit = {}
781762

782-
override def currentMetricsValues(): Array[CustomTaskMetric] =
783-
Array[CustomTaskMetric](
784-
new CustomTaskMetric {
785-
override def name(): String = "numSplits"
786-
override def value(): Long = 1
787-
}
788-
)
789-
790763
private def extractFieldValue(
791764
field: StructField,
792765
schema: StructType,

0 commit comments

Comments
 (0)