[SDP] [SPARK-52576] Drop/recreate on full refresh and MV update #51280
Conversation
Makes sense; we can change this if we add support for dropping columns for HMS in the V2SessionCatalog.
```scala
val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
if (dropTable) {
  catalog.dropTable(identifier)
  // context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
```
nit: remove? Optionally add comment about why not truncate/alter?
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
```diff
@@ -446,8 +446,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
     val table2 = catalog.loadTable(identifier)
     assert(
-      table2.columns() sameElements CatalogV2Util
-        .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
+      table2.columns().toSet == CatalogV2Util
```
why do we need this change?
The ordering of columns does not appear to be deterministic (at least across different catalog implementations). Is that unexpected?
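(For context, a minimal standalone illustration, not taken from the PR, of what the looser assertion does: `Array.sameElements` compares element by element in order, while comparing `toSet`s ignores ordering.)

```scala
// Illustrative values only: hypothetical column-name arrays.
val fromCatalog = Array("y", "x")  // order returned by the catalog
val fromSchema  = Array("x", "y")  // order declared in the schema

fromCatalog.sameElements(fromSchema)   // false: order-sensitive comparison
fromCatalog.toSet == fromSchema.toSet  // true: ordering (and duplicates) are ignored
```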
For a table, the column order matters. I think we should keep the test as it is and fix the issues we found.
Are we able to get some help with fixing this? What I'm observing is that, with Hive, when I create a table using the following:
```scala
catalog.createTable(
  identifier,
  new TableInfo.Builder()
    .withProperties(mergedProperties.asJava)
    .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
    .withPartitions(partitioning.toArray)
    .build()
)
```
and then later fetch the columns using `catalog.loadTable(identifier).columns()`, the columns come back in a different order than they appear in `outputSchema`.
This happens only with Hive, not the default catalog.
Strange, I can take a look. I ran the test case in HiveDDLSuite a few times and can't reproduce it:
```scala
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
withTable("t1") {
  val identifier = Identifier.of(Array("default"), "t1")
  val outputSchema = new StructType()
    .add("a", IntegerType, true, "comment1")
    .add("b", IntegerType, true, "comment2")
    .add("c", IntegerType, true, "comment3")
    .add("d", IntegerType, true, "comment4")
  catalog.createTable(
    identifier,
    new TableInfo.Builder()
      .withProperties(Map.empty.asJava)
      .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
      .withPartitions(Array.empty)
      .build()
  )
  val cols = catalog.loadTable(identifier).columns()
  assert(cols.length == 4)
  assert(cols(0).name() == "a")
  assert(cols(1).name() == "b")
  assert(cols(2).name() == "c")
  assert(cols(3).name() == "d")
}
```
Is it reproducible with this PR itself?
I think we should fix the Hive catalog to respect the user-specified column order. For now, I'm fine with temporarily ignoring some test cases in the upcoming Hive suite by overriding `def excluded` from `SparkFunSuite`, or adding column-order methods and overriding them in the Hive suite.
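A minimal sketch of the `excluded` approach, assuming a hypothetical Hive-backed suite and an illustrative test name (neither is part of this PR):

```scala
// Hypothetical suite/test names, only to show overriding `excluded` from SparkFunSuite.
class HiveMaterializeTablesSuite extends MaterializeTablesSuite {
  // Tests listed here are skipped until the Hive catalog preserves the
  // user-specified column order.
  override def excluded: Seq[String] = Seq(
    "materialize table with an updated schema"
  )
}
```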
This is going to be a breaking change for HiveCatalog; should we make it off by default and enable it via a flag? Looks like there's some history: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L820
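If we did go the flag route, a rough sketch of how such a conf might be declared inside SQLConf.scala; the key name, doc text, and version here are hypothetical, not an existing flag:

```scala
// Hypothetical flag, off by default, gating the column-order behavior change in the Hive catalog.
val HIVE_PRESERVE_USER_COLUMN_ORDER =
  buildConf("spark.sql.hive.preserveUserSpecifiedColumnOrder")
    .doc("When true, the Hive catalog keeps columns in the order the user specified " +
      "instead of reordering them when the table is stored.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(false)
```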
@cloud-fan @sryza @gengliangwang I made a patch here, does it look like what we want? #51342
OK I've updated this PR to bring back the asserts on the ordering, and we can deal with the Hive tests in the PR that introduces them. @cloud-fan mind taking another look?
Hmm for some reason the original order is working now. Switched it back and now the tests are passing.
@sryza so we may not need #51342 after all? :) We can close it if it's not needed, let me know. At least, as part of it, we managed to make the underlying HiveExternalCatalog API better in #51373, which fixes a small issue (SPARK-52681). By the way, it also doesn't prevent you from dropping the columns and replacing them now; can you give it a try as well after this PR?
there is a test failure in
thanks, merging to master!
### What changes were proposed in this pull request?

Some pipeline runs result in wiping out and replacing all the data for a table:
- Every run of a materialized view
- Runs of streaming tables that have the "full refresh" flag

In the current implementation, this "wipe out and replace" is implemented by:
- Truncating the table
- Altering the table to drop/update/add columns that don't match the columns in the DataFrame for the current run

The reason we originally wanted to truncate + alter instead of drop/recreate is that dropping has some undesirable effects: e.g. it interrupts readers of the table and wipes away things like ACLs.

However, we discovered that not all catalogs support dropping columns (e.g. Hive does not), and there's no way to tell whether a catalog supports dropping columns or not. So this PR changes the implementation to drop/recreate the table instead of truncate/alter.
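A minimal sketch of the drop/recreate flow described above (illustrative only; the names and the exact control flow in DatasetManager differ):

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog, TableInfo}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Illustrative helper: wipe and replace a table for a full refresh or a materialized view update.
def wipeAndReplace(
    catalog: TableCatalog,
    identifier: Identifier,
    outputSchema: StructType,
    partitioning: Array[Transform],
    properties: Map[String, String],
    isFullRefresh: Boolean,
    isStreamingTable: Boolean): Unit = {
  val tableExists = catalog.tableExists(identifier)
  // Materialized view updates and full refreshes replace the table entirely.
  val dropTable = (isFullRefresh || !isStreamingTable) && tableExists
  if (dropTable) {
    // Drop rather than truncate + ALTER: not all catalogs (e.g. Hive) support dropping
    // columns, and there is no capability check to detect that ahead of time.
    catalog.dropTable(identifier)
  }
  if (dropTable || !tableExists) {
    catalog.createTable(
      identifier,
      new TableInfo.Builder()
        .withProperties(properties.asJava)
        .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
        .withPartitions(partitioning)
        .build())
  }
}
```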
### Why are the changes needed?
See section above.
### Does this PR introduce _any_ user-facing change?
Yes, see section above. No releases contained the old behavior.
### How was this patch tested?

- Tests in MaterializeTablesSuite
- Ran the tests in MaterializeTablesSuite with Hive instead of the default catalog
### Was this patch authored or co-authored using generative AI tooling?
No