
Commit b5baf5c

sryza authored and haoyangeng-db committed
Revert "[SPARK-52576][SDP] Drop/recreate on full refresh and MV update"
This reverts commit 8b43757.

### What changes were proposed in this pull request?

Reverts SPARK-52576, i.e. truncates + alters instead of drops + recreates, for materialized views and full refreshes.

### Why are the changes needed?

Some pipeline runs result in wiping out and replacing all the data for a table:
- Every run of a materialized view
- Runs of streaming tables that have the "full refresh" flag

Prior to SPARK-52576, this "wipe out and replace" was implemented by:
- Truncating the table
- Altering the table to drop/update/add columns that don't match the columns in the DataFrame for the current run

However, we discovered that this didn't work on Hive, so we moved to drop + recreate, which did work on Hive. Compared to truncate + alter, though, drop + recreate has some undesirable effects: e.g. it interrupts readers of the table and wipes away things like ACLs. This Hive behavior was fixed in apache#51007, so now we can switch back to truncate + alter.

### Does this PR introduce _any_ user-facing change?

Yes, described above.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#51497 from sryza/revert-drop-recreate.

Authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
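The post-revert control flow can be summarized as a small decision function. This is a hedged, illustrative sketch only: `TableAction`, `plannedActions`, and the parameter names are hypothetical and are not part of `DatasetManager`.

```scala
// Illustrative sketch of the post-revert logic (all names are hypothetical).
sealed trait TableAction
case object Truncate extends TableAction // wipe data but keep the table: preserves ACLs and readers
case object Alter extends TableAction    // reconcile columns with the current run's DataFrame
case object Create extends TableAction   // first run: the table does not exist yet

def plannedActions(
    isFullRefresh: Boolean,
    isStreamingTable: Boolean,
    tableExists: Boolean): Seq[TableAction] = {
  // Wipe on every materialized-view run, and on full refreshes of streaming tables.
  val wipe = (isFullRefresh || !isStreamingTable) && tableExists
  Seq(
    Option.when(wipe)(Truncate),
    // After the revert, an existing table is altered even when it was just truncated.
    Option.when(tableExists)(Alter),
    Option.when(!tableExists)(Create)
  ).flatten
}
```

Before the revert, the wipe step dropped the table, the alter step was skipped when a drop happened, and the create step ran after every drop; the sketch above reflects the restored truncate + alter behavior.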
1 parent: 108b022

File tree

1 file changed: +4 −5 lines changed


sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala

Lines changed: 4 additions & 5 deletions
```diff
@@ -179,13 +179,12 @@ object DatasetManager extends Logging {
     }

     // Wipe the data if we need to
-    val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined
-    if (dropTable) {
-      catalog.dropTable(identifier)
+    if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) {
+      context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
     }

     // Alter the table if we need to
-    if (existingTableOpt.isDefined && !dropTable) {
+    if (existingTableOpt.isDefined) {
       val existingSchema = v2ColumnsToStructType(existingTableOpt.get.columns())

       val targetSchema = if (table.isStreamingTable && !isFullRefresh) {
@@ -200,7 +199,7 @@
     }

     // Create the table if we need to
-    if (dropTable || existingTableOpt.isEmpty) {
+    if (existingTableOpt.isEmpty) {
       catalog.createTable(
         identifier,
         new TableInfo.Builder()
```
