Skip to content

Commit d527ee7

Browse files
committed
drop on full refresh
1 parent a1e6285 commit d527ee7

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,14 @@ object DatasetManager extends Logging {
178178
}
179179

180180
// Wipe the data if we need to
181-
if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) {
182-
context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
181+
val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined
182+
if (dropTable) {
183+
catalog.dropTable(identifier)
184+
// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
183185
}
184186

185187
// Alter the table if we need to
186-
if (existingTableOpt.isDefined) {
188+
if (existingTableOpt.isDefined && !dropTable) {
187189
val existingSchema = existingTableOpt.get.schema()
188190

189191
val targetSchema = if (table.isStreamingTable && !isFullRefresh) {
@@ -198,7 +200,7 @@ object DatasetManager extends Logging {
198200
}
199201

200202
// Create the table if we need to
201-
if (existingTableOpt.isEmpty) {
203+
if (dropTable || existingTableOpt.isEmpty) {
202204
catalog.createTable(
203205
identifier,
204206
new TableInfo.Builder()

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
434434

435435
val table2 = catalog.loadTable(identifier)
436436
assert(
437-
table2.columns() sameElements CatalogV2Util
438-
.structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
437+
table2.columns().toSet == CatalogV2Util
438+
.structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType))
439+
.toSet
439440
)
440441
assert(table2.partitioning().toSeq == Seq(Expressions.identity("x")))
441442

@@ -456,8 +457,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
456457

457458
val table3 = catalog.loadTable(identifier)
458459
assert(
459-
table3.columns() sameElements CatalogV2Util
460-
.structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
460+
table3.columns().toSet == CatalogV2Util
461+
.structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType))
462+
.toSet
461463
)
462464
assert(table3.partitioning().toSeq == Seq(Expressions.identity("x")))
463465
}

0 commit comments

Comments
 (0)