Skip to content

Commit 929adf2

Browse files
committed
[SPARK-52683][SQL] Support ExternalCatalog alterTableSchema
1 parent 2b3eae0 commit 929adf2

File tree

13 files changed

+175
-14
lines changed

13 files changed

+175
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,23 @@ trait ExternalCatalog {
120120
* @param db Database that table to alter schema for exists in
121121
* @param table Name of table to alter schema for
122122
* @param newDataSchema Updated data schema to be used for the table.
123+
* @deprecated since 4.1.0 use `alterTableSchema` instead.
123124
*/
124125
def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
125126

127+
/**
128+
* Alter the schema of a table identified by the provided database and table name.
129+
*
130+
* All partition columns must be preserved.
131+
*
132+
* @param db Database that table to alter schema for exists in
133+
* @param table Name of table to alter schema for
134+
* @param newSchema Updated data schema to be used for the table.
135+
*/
136+
def alterTableSchema(db: String, table: String, newSchema: StructType): Unit =
137+
throw new UnsupportedOperationException(
138+
"alterTableSchema is not supported by the current external catalog implementation")
139+
126140
/** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
127141
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
128142

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
125125
postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
126126
}
127127

128+
override def alterTableSchema(db: String, table: String, newSchema: StructType): Unit = {
129+
postToAll(AlterTablePreEvent(db, table, AlterTableKind.SCHEMA))
130+
delegate.alterTableSchema(db, table, newSchema)
131+
postToAll(AlterTableEvent(db, table, AlterTableKind.SCHEMA))
132+
}
133+
128134
override def alterTableStats(
129135
db: String,
130136
table: String,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,21 @@ class InMemoryCatalog(
331331
catalog(db).tables(table).table = origTable.copy(schema = newSchema)
332332
}
333333

334+
override def alterTableSchema(
335+
db: String,
336+
table: String,
337+
newSchema: StructType): Unit = synchronized {
338+
requireTableExists(db, table)
339+
val origTable = catalog(db).tables(table).table
340+
341+
val partCols = origTable.partitionColumnNames
342+
assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
343+
s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
344+
s"the new schema ${newSchema.catalogString} for now.")
345+
346+
catalog(db).tables(table).table = origTable.copy(schema = newSchema)
347+
}
348+
334349
override def alterTableStats(
335350
db: String,
336351
table: String,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,25 @@ class SessionCatalog(
507507
externalCatalog.alterTableDataSchema(db, table, newDataSchema)
508508
}
509509

510+
/**
511+
* Alter the schema of a table identified by the provided table identifier. All partition columns
512+
* must be preserved.
513+
*
514+
* @param identifier TableIdentifier
515+
* @param newSchema Updated schema to be used for the table
516+
*/
517+
def alterTableSchema(
518+
identifier: TableIdentifier,
519+
newSchema: StructType): Unit = {
520+
val qualifiedIdent = qualifyIdentifier(identifier)
521+
val db = qualifiedIdent.database.get
522+
val table = qualifiedIdent.table
523+
requireDbExists(db)
524+
requireTableExists(qualifiedIdent)
525+
526+
externalCatalog.alterTableSchema(db, table, newSchema)
527+
}
528+
510529
private def columnNameResolved(
511530
resolver: Resolver,
512531
schema: StructType,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ case class RenameTableEvent(
126126
object AlterTableKind extends Enumeration {
127127
val TABLE = "table"
128128
val DATASCHEMA = "dataSchema"
129+
val SCHEMA = "schema"
129130
val STATS = "stats"
130131
}
131132

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
128128

129129
// ALTER schema
130130
val newSchema = new StructType().add("id", "long", nullable = false)
131-
catalog.alterTableDataSchema("db5", "tbl1", newSchema)
132-
checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
133-
AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
131+
catalog.alterTableSchema("db5", "tbl1", newSchema)
132+
checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA) ::
133+
AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA) :: Nil)
134134

135135
// ALTER stats
136136
catalog.alterTableStats("db5", "tbl1", None)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
245245

246246
test("alter table schema") {
247247
val catalog = newBasicCatalog()
248-
val newDataSchema = StructType(Seq(
248+
val newSchema = StructType(Seq(
249249
StructField("col1", IntegerType),
250-
StructField("new_field_2", StringType)))
251-
catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
250+
StructField("new_field_2", StringType),
251+
StructField("a", IntegerType),
252+
StructField("b", StringType)))
253+
catalog.alterTableSchema("db2", "tbl1", newSchema)
252254
val newTbl1 = catalog.getTable("db2", "tbl1")
253-
assert(newTbl1.dataSchema == newDataSchema)
255+
assert(newTbl1.dataSchema == StructType(newSchema.take(2)))
256+
assert(newTbl1.schema == newSchema)
254257
}
255258

256259
test("alter table stats") {
@@ -983,6 +986,32 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
983986
"db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
984987
assert(fs.exists(partPath))
985988
}
989+
990+
test("SPARK-52683: support alterTableSchema partitioned columns") {
991+
val catalog = newBasicCatalog()
992+
993+
val schema = new StructType()
994+
.add("a", IntegerType)
995+
.add("b", IntegerType)
996+
.add("c", StringType)
997+
val table = CatalogTable(
998+
identifier = TableIdentifier("t", Some("db1")),
999+
tableType = CatalogTableType.MANAGED,
1000+
storage = storageFormat,
1001+
schema = schema,
1002+
partitionColumnNames = Seq("c"),
1003+
provider = Some("hive"))
1004+
catalog.createTable(table, ignoreIfExists = false)
1005+
1006+
val newSchema = new StructType()
1007+
.add("b", LongType)
1008+
.add("a", IntegerType)
1009+
.add("c", StringType)
1010+
catalog.alterTableSchema("db1", "t", newSchema)
1011+
1012+
assert(catalog.getTable("db1", "t").schema == newSchema)
1013+
assert(catalog.getTable("db1", "t").partitionColumnNames == Seq("c"))
1014+
}
9861015
}
9871016

9881017

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
564564
}
565565
}
566566

567-
test("alter table add columns") {
567+
test("alter data schema add columns") {
568568
withBasicCatalog { sessionCatalog =>
569569
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
570570
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
@@ -580,6 +580,22 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
580580
}
581581
}
582582

583+
test("alter schema add columns") {
584+
withBasicCatalog { sessionCatalog =>
585+
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
586+
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
587+
val newSchema = StructType(oldTab.dataSchema.fields ++
588+
Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
589+
590+
sessionCatalog.alterTableSchema(
591+
TableIdentifier("t1", Some("default")),
592+
newSchema)
593+
594+
val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
595+
assert(newTab.schema == newSchema)
596+
}
597+
}
598+
583599
test("alter table drop columns") {
584600
withBasicCatalog { sessionCatalog =>
585601
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ case class AlterTableAddColumnsCommand(
247247
}
248248
DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
249249

250-
val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
251-
catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
250+
val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.schema)
251+
catalog.alterTableSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
252252
Seq.empty[Row]
253253
}
254254

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
309309
collation = collation, storage = storage))
310310
}
311311
if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
312-
catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
312+
catalog.alterTableSchema(ident.asTableIdentifier, schema)
313313
}
314314
} catch {
315315
case _: NoSuchTableException =>

0 commit comments

Comments
 (0)