From 29f253bad017a9cec33b63ee1585bd6b24bdbb7f Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Thu, 3 Jul 2025 22:26:13 -0700
Subject: [PATCH 1/3] [SPARK-52683][SQL] Support ExternalCatalog alterTableSchema

---
 .../catalyst/catalog/ExternalCatalog.scala    | 14 ++++++
 .../catalog/ExternalCatalogWithListener.scala |  6 +++
 .../catalyst/catalog/InMemoryCatalog.scala    | 15 +++++++
 .../sql/catalyst/catalog/SessionCatalog.scala | 19 ++++++++
 .../spark/sql/catalyst/catalog/events.scala   |  1 +
 .../catalog/ExternalCatalogEventSuite.scala   |  6 +--
 .../catalog/ExternalCatalogSuite.scala        | 37 ++++++++++++++--
 .../catalog/SessionCatalogSuite.scala         | 18 +++++++-
 .../spark/sql/execution/command/tables.scala  |  4 +-
 .../datasources/v2/V2SessionCatalog.scala     |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala  | 43 +++++++++++++++++++
 .../sql/hive/HiveExternalCatalogSuite.scala   |  4 +-
 .../HiveIncompatibleColTypeChangeSuite.scala  |  2 +-
 13 files changed, 157 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 979613ae11266..52d1d7ca0039b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -120,9 +120,23 @@ trait ExternalCatalog {
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
    * @param newDataSchema Updated data schema to be used for the table.
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name.
+   *
+   * All partition columns must be preserved.
+   *
+   * @param db Database that table to alter schema for exists in
+   * @param table Name of table to alter schema for
+   * @param newSchema Updated schema to be used for the table.
+   */
+  def alterTableSchema(db: String, table: String, newSchema: StructType): Unit =
+    throw new UnsupportedOperationException(
+      "alterTableSchema is not supported by the current external catalog implementation")
+
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
index c2613ff74da4a..33f088079caa7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
@@ -125,6 +125,12 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
     postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
   }
 
+  override def alterTableSchema(db: String, table: String, newSchema: StructType): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.SCHEMA))
+    delegate.alterTableSchema(db, table, newSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.SCHEMA))
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 284ca63d820fe..5d0184579faac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -331,6 +331,21 @@ class InMemoryCatalog(
     catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      newSchema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+
+    val partCols = origTable.partitionColumnNames
+    assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
+      s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
+        s"the new schema ${newSchema.catalogString} for now.")
+
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 531b453213ed0..40b1809661640 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -507,6 +507,25 @@ class SessionCatalog(
     externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
+  /**
+   * Alter the schema of a table identified by the provided table identifier. All partition columns
+   * must be preserved.
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val qualifiedIdent = qualifyIdentifier(identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
+    requireDbExists(db)
+    requireTableExists(qualifiedIdent)
+
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
   private def columnNameResolved(
       resolver: Resolver,
       schema: StructType,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index e7d41644392d5..974c225afbae3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -126,6 +126,7 @@ case class RenameTableEvent(
 object AlterTableKind extends Enumeration {
   val TABLE = "table"
   val DATASCHEMA = "dataSchema"
+  val SCHEMA = "schema"
   val STATS = "stats"
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 366188c3327be..15858bf2cc691 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -128,9 +128,9 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
 
     // ALTER schema
     val newSchema = new StructType().add("id", "long", nullable = false)
-    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
-    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
-      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+    catalog.alterTableSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA) :: Nil)
 
     // ALTER stats
     catalog.alterTableStats("db5", "tbl1", None)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f4b0c232c25f6..5bb81873449cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,12 +245,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val newDataSchema = StructType(Seq(
-      StructField("col1", IntegerType),
-      StructField("new_field_2", StringType)))
-    catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
+    val newSchema = StructType(Seq(
+      StructField("col1", IntegerType),
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
     val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(newTbl1.dataSchema == newDataSchema)
+    assert(newTbl1.dataSchema == StructType(newSchema.take(2)))
+    assert(newTbl1.schema == newSchema)
   }
 
   test("alter table stats") {
@@ -983,6 +986,32 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
       "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(fs.exists(partPath))
   }
+
+  test("SPARK-52683: support alterTableSchema partitioned columns") {
+    val catalog = newBasicCatalog()
+
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", IntegerType)
+      .add("c", StringType)
+    val table = CatalogTable(
+      identifier = TableIdentifier("t", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = schema,
+      partitionColumnNames = Seq("c"),
+      provider = Some("hive"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newSchema = new StructType()
+      .add("b", LongType)
+      .add("a", IntegerType)
+      .add("c", StringType)
+    catalog.alterTableSchema("db1", "t", newSchema)
+
+    assert(catalog.getTable("db1", "t").schema == newSchema)
+    assert(catalog.getTable("db1", "t").partitionColumnNames == Seq("c"))
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index e532ebecc4422..f319a97d1133f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -564,7 +564,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     }
   }
 
-  test("alter table add columns") {
+  test("alter data schema add columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
@@ -580,6 +580,22 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     }
   }
 
+  test("alter schema add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val newSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        newSchema)
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      assert(newTab.schema == newSchema)
+    }
+  }
+
   test("alter table drop columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 092e6669338ee..b235c9a95b49f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -247,8 +247,8 @@ case class AlterTableAddColumnsCommand(
     }
     DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
-    val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
-    catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
+    val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.schema)
+    catalog.alterTableSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 1088e3f7a7206..891728e6f7e19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -309,7 +309,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
             collation = collation,
             storage = storage))
       }
       if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
-        catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+        catalog.alterTableSchema(ident.asTableIdentifier, schema)
       }
     } catch {
       case _: NoSuchTableException =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c7a60151c496..5d4cdd7c78e7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -735,6 +735,49 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name.
+   */
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      newSchema: StructType): Unit = withClient {
+    requireTableExists(db, table)
+    val oldTable = getTable(db, table)
+    val schemaProps = {
+      tableMetaToTableProps(oldTable, StructType(newSchema)).toMap
+    }
+
+    val partCols = oldTable.partitionColumnNames
+    assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
+      s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
+        s"the new schema ${newSchema.catalogString} for now.")
+
+    val newDataSchema = StructType(newSchema.filter(
+      f => !oldTable.partitionColumnNames.contains(f.name)))
+    val hiveSchema = removeCollation(newDataSchema)
+
+    if (isDatasourceTable(oldTable)) {
+      // For data source tables, first try to write it with the schema set; if that does not work,
+      // try again with updated properties and the partition schema. This is a simplified version of
+      // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
+      // (for example, the schema does not match the data source schema, or does not match the
+      // storage descriptor).
+      try {
+        client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+      } catch {
+        case NonFatal(e) =>
+          val warningMessage = log"Could not alter schema of table " +
+            log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
+            log"Updating Hive metastore in Spark SQL specific format."
+          logWarning(warningMessage, e)
+          client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
+      }
+    } else {
+      client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+    }
+  }
+
   private def removeCollation(schema: StructType): StructType = {
     // Since collated strings do not exist in Hive as a type we need to replace them with
     // the regular string type. However, as we save the original schema in the table
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 655491d245356..db522b72e4cca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -103,7 +103,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
        """.stripMargin)
 
     val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string")
-    catalog.alterTableDataSchema("db1", "t", newSchema)
+    catalog.alterTableSchema("db1", "t", newSchema)
 
     assert(catalog.getTable("db1", "t").schema == newSchema)
     val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t")
@@ -234,7 +234,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     val newSchema = StructType(Seq(
       StructField("col1", StringType("UTF8_LCASE"))
     ))
-    catalog.alterTableDataSchema("db1", tableName, newSchema)
+    catalog.alterTableSchema("db1", tableName, newSchema)
 
     val alteredRawTable = externalCatalog.getRawTable("db1", tableName)
     assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
index a362db1ba4ef8..bfbbc3bd4ff9c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
@@ -99,7 +99,7 @@ class HiveIncompatibleColTypeChangeSuite extends SparkFunSuite with TestHiveSing
     spark.sql(createTableStmt)
     val oldTable = catalog.getTable("default", tableName)
     catalog.createTable(oldTable, true)
-    catalog.alterTableDataSchema("default", tableName, updatedSchema)
+    catalog.alterTableSchema("default", tableName, updatedSchema)
 
     val updatedTable = catalog.getTable("default", tableName)
     assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)

From 336740896870bc0e58f4c1f8051d8f16fdc30fb0 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Fri, 4 Jul 2025 00:15:08 -0700
Subject: [PATCH 2/3] Remove unsupported exception

---
 .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 4 +---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala  | 1 +
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 52d1d7ca0039b..d1f37020f2111 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -133,9 +133,7 @@ trait ExternalCatalog {
    * @param table Name of table to alter schema for
    * @param newSchema Updated schema to be used for the table.
    */
-  def alterTableSchema(db: String, table: String, newSchema: StructType): Unit =
-    throw new UnsupportedOperationException(
-      "alterTableSchema is not supported by the current external catalog implementation")
+  def alterTableSchema(db: String, table: String, newSchema: StructType): Unit
 
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 40b1809661640..c0d04d9d06fa1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -484,6 +484,7 @@ class SessionCatalog(
    *
    * @param identifier TableIdentifier
    * @param newDataSchema Updated data schema to be used for the table
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(
       identifier: TableIdentifier,

From e2ee91385d8d6c948c22f62c0bec112572a9682f Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Fri, 4 Jul 2025 09:54:25 -0700
Subject: [PATCH 3/3] Review comment and fix test

---
 .../org/apache/spark/sql/execution/command/tables.scala    | 5 +++--
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index b235c9a95b49f..d5dd934af2be1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -247,8 +247,8 @@ case class AlterTableAddColumnsCommand(
     }
     DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
-    val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.schema)
-    catalog.alterTableSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
+    val existingDataSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
+    catalog.alterTableSchema(table,
+      StructType(existingDataSchema ++ colsWithProcessedDefaults ++ catalogTable.partitionSchema))
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 13e8d3721d81e..e43d1cdb19d4d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3418,7 +3418,7 @@ class HiveDDLSuite
       assert(loaded.properties().get("foo") == "bar")
 
       verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
-      verify(spyCatalog, times(0)).alterTableDataSchema(
+      verify(spyCatalog, times(0)).alterTableSchema(
         any[String], any[String], any[StructType])
 
       v2SessionCatalog.alterTable(identifier,
@@ -3428,7 +3428,7 @@ class HiveDDLSuite
       assert(loaded2.columns.head.comment() == "comment2")
 
      verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
-      verify(spyCatalog, times(1)).alterTableDataSchema(
+      verify(spyCatalog, times(1)).alterTableSchema(
        any[String], any[String], any[StructType])
    }
  }
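Editorial note (not part of the patch series above): the sketch below illustrates the calling
convention the new alterTableSchema API expects, namely that the caller passes the full table
schema and keeps every partition column at the tail, which is the invariant asserted by
InMemoryCatalog.alterTableSchema and HiveExternalCatalog.alterTableSchema in PATCH 1. The
sessionCatalog and externalCatalog values and the db1/t1/column names are placeholders for
illustration, not identifiers taken from the patch.

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.types._

  // Existing table: data columns (id, data), partitioned by (dt).
  val dataSchema = new StructType().add("id", IntegerType).add("data", StringType)
  val partitionSchema = new StructType().add("dt", StringType)

  // Add a data column: new data columns first, all partition columns preserved at the end.
  val newSchema = StructType(
    dataSchema.fields ++ Seq(StructField("extra", LongType)) ++ partitionSchema.fields)

  // SessionCatalog entry point added by this patch (sessionCatalog is a placeholder value):
  //   sessionCatalog.alterTableSchema(TableIdentifier("t1", Some("db1")), newSchema)
  // ExternalCatalog entry point (externalCatalog is a placeholder value):
  //   externalCatalog.alterTableSchema("db1", "t1", newSchema)

By contrast, the now-deprecated alterTableDataSchema takes only the data columns and leaves the
partition columns untouched, which is why AlterTableAddColumnsCommand in PATCH 3 re-appends
catalogTable.partitionSchema when building the argument passed to alterTableSchema.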