[SPARK-52683][SQL] Support ExternalCatalog alterTableSchema #51373

Closed · wants to merge 3 commits
@@ -120,9 +120,21 @@ trait ExternalCatalog {
* @param db Database that the table to alter exists in
* @param table Name of the table to alter the schema for
* @param newDataSchema Updated data schema to be used for the table.
* @deprecated since 4.1.0, use `alterTableSchema` instead.
*/
def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit

/**
* Alter the schema of a table identified by the provided database and table name.
*
* All partition columns must be preserved.
*
* @param db Database that the table to alter exists in
* @param table Name of the table to alter the schema for
* @param newSchema Updated full schema (including partition columns) to be used for the table.
*/
def alterTableSchema(db: String, table: String, newSchema: StructType): Unit

/** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

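For context, here is a minimal sketch of the calling convention the new method expects: the caller rebuilds the full schema and keeps the partition columns, in order, at the end. The catalog instance, column names, and types below are illustrative, not part of this diff.

    import org.apache.spark.sql.types._

    // Existing table: data columns (id BIGINT, name STRING), partitioned by (dt STRING).
    val newSchema = new StructType()
      .add("id", LongType)
      .add("name", StringType)
      .add("extra", IntegerType) // newly added data column
      .add("dt", StringType)     // partition column must remain the suffix
    externalCatalog.alterTableSchema("db", "tbl", newSchema) // hypothetical catalog instance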
@@ -125,6 +125,12 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
}

override def alterTableSchema(db: String, table: String, newSchema: StructType): Unit = {
postToAll(AlterTablePreEvent(db, table, AlterTableKind.SCHEMA))
delegate.alterTableSchema(db, table, newSchema)
postToAll(AlterTableEvent(db, table, AlterTableKind.SCHEMA))
}

override def alterTableStats(
db: String,
table: String,
@@ -331,6 +331,21 @@ class InMemoryCatalog(
catalog(db).tables(table).table = origTable.copy(schema = newSchema)
}

override def alterTableSchema(
db: String,
table: String,
newSchema: StructType): Unit = synchronized {
requireTableExists(db, table)
val origTable = catalog(db).tables(table).table

val partCols = origTable.partitionColumnNames
assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
s"the new schema ${newSchema.catalogString} for now.")

catalog(db).tables(table).table = origTable.copy(schema = newSchema)
}

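The guard above only compares the trailing field names against the partition columns, so data columns may be reordered or changed as long as the partition columns stay a suffix in their original order. A small sketch with illustrative values:

    import org.apache.spark.sql.types._

    val partCols = Seq("c")
    val ok = new StructType()
      .add("b", LongType).add("a", IntegerType).add("c", StringType)
    assert(ok.map(_.name).takeRight(partCols.length) == partCols) // passes

    val bad = new StructType()
      .add("c", StringType).add("a", IntegerType).add("b", LongType)
    // bad.map(_.name).takeRight(1) == Seq("b"), not Seq("c"): the assert would fire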
override def alterTableStats(
db: String,
table: String,
@@ -484,6 +484,7 @@ class SessionCatalog(
*
* @param identifier TableIdentifier
* @param newDataSchema Updated data schema to be used for the table
* @deprecated since 4.1.0, use `alterTableSchema` instead.
*/
def alterTableDataSchema(
identifier: TableIdentifier,
@@ -507,6 +508,25 @@
externalCatalog.alterTableDataSchema(db, table, newDataSchema)
}

/**
* Alter the schema of a table identified by the provided table identifier. All partition columns
* must be preserved.
*
* @param identifier TableIdentifier
* @param newSchema Updated schema to be used for the table
*/
def alterTableSchema(
identifier: TableIdentifier,
newSchema: StructType): Unit = {
val qualifiedIdent = qualifyIdentifier(identifier)
val db = qualifiedIdent.database.get
val table = qualifiedIdent.table
requireDbExists(db)
requireTableExists(qualifiedIdent)

externalCatalog.alterTableSchema(db, table, newSchema)
}

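A hedged usage sketch against SessionCatalog; the identifier is qualified internally, so the existence checks run against the resolved database. The table and schema below are illustrative only:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.types._

    val newSchema = new StructType()
      .add("id", LongType)
      .add("payload", StringType) // new data column
      .add("dt", StringType)      // existing partition column, kept last
    sessionCatalog.alterTableSchema(TableIdentifier("tbl", Some("db")), newSchema) // hypothetical instance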
private def columnNameResolved(
resolver: Resolver,
schema: StructType,
@@ -126,6 +126,7 @@ case class RenameTableEvent(
object AlterTableKind extends Enumeration {
val TABLE = "table"
val DATASCHEMA = "dataSchema"
val SCHEMA = "schema"
val STATS = "stats"
}

@@ -128,9 +128,9 @@ class ExternalCatalogEventSuite extends SparkFunSuite {

// ALTER schema
val newSchema = new StructType().add("id", "long", nullable = false)
catalog.alterTableDataSchema("db5", "tbl1", newSchema)
checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
catalog.alterTableSchema("db5", "tbl1", newSchema)
checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA) ::
AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA) :: Nil)

// ALTER stats
catalog.alterTableStats("db5", "tbl1", None)
@@ -245,12 +245,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {

test("alter table schema") {
val catalog = newBasicCatalog()
val newDataSchema = StructType(Seq(
val newSchema = StructType(Seq(
StructField("col1", IntegerType),
StructField("new_field_2", StringType)))
catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
StructField("new_field_2", StringType),
StructField("a", IntegerType),
StructField("b", StringType)))
catalog.alterTableSchema("db2", "tbl1", newSchema)
val newTbl1 = catalog.getTable("db2", "tbl1")
assert(newTbl1.dataSchema == newDataSchema)
assert(newTbl1.dataSchema == StructType(newSchema.take(2)))
assert(newTbl1.schema == newSchema)
}

test("alter table stats") {
@@ -983,6 +986,32 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
"db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(fs.exists(partPath))
}

test("SPARK-52683: support alterTableSchema partitioned columns") {
val catalog = newBasicCatalog()

val schema = new StructType()
.add("a", IntegerType)
.add("b", IntegerType)
.add("c", StringType)
val table = CatalogTable(
identifier = TableIdentifier("t", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = storageFormat,
schema = schema,
partitionColumnNames = Seq("c"),
provider = Some("hive"))
catalog.createTable(table, ignoreIfExists = false)

val newSchema = new StructType()
.add("b", LongType)
.add("a", IntegerType)
.add("c", StringType)
catalog.alterTableSchema("db1", "t", newSchema)

assert(catalog.getTable("db1", "t").schema == newSchema)
assert(catalog.getTable("db1", "t").partitionColumnNames == Seq("c"))
}
}


@@ -564,7 +564,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}
}

test("alter table add columns") {
test("alter data schema add columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
@@ -580,6 +580,22 @@
}
}

test("alter schema add columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
val newSchema = StructType(oldTab.dataSchema.fields ++
Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)

sessionCatalog.alterTableSchema(
TableIdentifier("t1", Some("default")),
newSchema)

val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
assert(newTab.schema == newSchema)
}
}

test("alter table drop columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
@@ -247,8 +247,9 @@ case class AlterTableAddColumnsCommand(
}
DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))

val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
val existingDataSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
catalog.alterTableSchema(table,
StructType(existingDataSchema ++ colsWithProcessedDefaults ++ catalogTable.partitionSchema))
Seq.empty[Row]
}

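The net effect of the new call is that added columns land between the existing data columns and the partition columns, so the partition columns remain the schema suffix. A sketch with illustrative columns:

    import org.apache.spark.sql.types._

    // Table (a INT, b STRING) partitioned by (dt STRING), adding c INT:
    val existingDataSchema = new StructType().add("a", IntegerType).add("b", StringType)
    val newColumns = Seq(StructField("c", IntegerType))
    val partitionSchema = new StructType().add("dt", StringType)
    val altered = StructType(existingDataSchema ++ newColumns ++ partitionSchema)
    // altered.fieldNames: a, b, c, dt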
@@ -309,7 +309,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
collation = collation, storage = storage))
}
if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
catalog.alterTableSchema(ident.asTableIdentifier, schema)
Contributor: ah, so the schema here is already the full schema

@szehon-ho (Member, Author), Jul 4, 2025: yea, I noticed and had a PR to fix that (#51372), but we can fix both if we do this one.
}
} catch {
case _: NoSuchTableException =>
@@ -735,6 +735,49 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
}
}

/**
* Alter the schema of a table identified by the provided database and table name.
*/
override def alterTableSchema(
db: String,
table: String,
newSchema: StructType): Unit = withClient {
requireTableExists(db, table)
val oldTable = getTable(db, table)
val schemaProps = {
tableMetaToTableProps(oldTable, StructType(newSchema)).toMap
}

val partCols = oldTable.partitionColumnNames
assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
s"the new schema ${newSchema.catalogString} for now.")

val newDataSchema = StructType(newSchema.filter(
f => !oldTable.partitionColumnNames.contains(f.name)))
val hiveSchema = removeCollation(newDataSchema)

if (isDatasourceTable(oldTable)) {
// For data source tables, first try to write it with the schema set; if that does not work,
// try again with updated properties and the partition schema. This is a simplified version of
// what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
// (for example, the schema does not match the data source schema, or does not match the
// storage descriptor).
try {
client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
} catch {
case NonFatal(e) =>
val warningMessage = log"Could not alter schema of table " +
log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
log"Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
}
} else {
client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
}
}

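For reference, the data/partition split performed above, in isolation: only the data columns are handed to the Hive client, while the full Spark schema travels in the table properties. Column names here are illustrative:

    import org.apache.spark.sql.types._

    val newSchema = new StructType()
      .add("b", LongType).add("a", IntegerType).add("dt", StringType)
    val partitionColumnNames = Seq("dt")
    val newDataSchema = StructType(
      newSchema.filter(f => !partitionColumnNames.contains(f.name)))
    // newDataSchema.fieldNames: b, a  (what client.alterTableDataSchema receives)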
private def removeCollation(schema: StructType): StructType = {
// Since collated strings do not exist in Hive as a type, we need to replace them with
// the regular string type. However, as we save the original schema in the table
@@ -103,7 +103,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
""".stripMargin)

val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string")
catalog.alterTableDataSchema("db1", "t", newSchema)
catalog.alterTableSchema("db1", "t", newSchema)

assert(catalog.getTable("db1", "t").schema == newSchema)
val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t")
@@ -234,7 +234,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
val newSchema = StructType(Seq(
StructField("col1", StringType("UTF8_LCASE"))
))
catalog.alterTableDataSchema("db1", tableName, newSchema)
catalog.alterTableSchema("db1", tableName, newSchema)

val alteredRawTable = externalCatalog.getRawTable("db1", tableName)
assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema))
@@ -3418,7 +3418,7 @@ class HiveDDLSuite
assert(loaded.properties().get("foo") == "bar")

verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
verify(spyCatalog, times(0)).alterTableDataSchema(
verify(spyCatalog, times(0)).alterTableSchema(
any[String], any[String], any[StructType])

v2SessionCatalog.alterTable(identifier,
@@ -3428,7 +3428,7 @@
assert(loaded2.columns.head.comment() == "comment2")

verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
verify(spyCatalog, times(1)).alterTableDataSchema(
verify(spyCatalog, times(1)).alterTableSchema(
any[String], any[String], any[StructType])
}
}
@@ -99,7 +99,7 @@ class HiveIncompatibleColTypeChangeSuite extends SparkFunSuite with TestHiveSingleton {
spark.sql(createTableStmt)
val oldTable = catalog.getTable("default", tableName)
catalog.createTable(oldTable, true)
catalog.alterTableDataSchema("default", tableName, updatedSchema)
catalog.alterTableSchema("default", tableName, updatedSchema)

val updatedTable = catalog.getTable("default", tableName)
assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)