Commit f65f969

feat(spark): support Hive DDL / Insert operations
Add support for DdlRel and WriteRel for Hive in Spark.

Signed-off-by: Andrew Coleman <[email protected]>
1 parent abdd39a · commit f65f969

5 files changed: +264 additions, −37 deletions

spark/.gitignore

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+metastore_db
+spark-warehouse
+/src/test/resources/write-a.csv
+derby.log
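
These four entries are the working-directory by-products of an embedded Hive metastore: Derby creates metastore_db/ and derby.log, and managed tables land under spark-warehouse/. A minimal sketch of the kind of Hive-enabled session that produces them (the object and table names are illustrative, not from this commit):

import org.apache.spark.sql.SparkSession

object HiveSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("hive-session-sketch") // illustrative
      // Sets spark.sql.catalogImplementation to "hive"; with no external
      // metastore configured, Derby creates metastore_db/ and derby.log in
      // the working directory, and tables go under spark-warehouse/.
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS demo (id INT)")
    spark.sql("DROP TABLE IF EXISTS demo")
    spark.stop()
  }
}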

spark/build.gradle.kts

Lines changed: 5 additions & 1 deletion
@@ -116,6 +116,7 @@ dependencies {
   implementation(libs.scala.library)
   api(libs.spark.core)
   api(libs.spark.sql)
+  implementation(libs.spark.hive)
   implementation(libs.spark.catalyst)
   implementation(libs.slf4j.api)
 
@@ -148,6 +149,9 @@ tasks {
   test {
     dependsOn(":core:shadowJar")
     useJUnitPlatform { includeEngines("scalatest") }
-    jvmArgs("--add-exports=java.base/sun.nio.ch=ALL-UNNAMED")
+    jvmArgs(
+      "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
+      "--add-opens=java.base/java.net=ALL-UNNAMED",
+    )
   }
 }
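
A note on the widened jvmArgs: the existing --add-exports flag covers Spark's use of sun.nio.ch, while the new --add-opens=java.base/java.net=ALL-UNNAMED is presumably needed because the Hive metastore client pulled in by libs.spark.hive reflectively accesses java.net internals, which the JDK module system blocks by default on Java 17+.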

spark/src/main/scala/io/substrait/spark/logical/ToLogicalPlan.scala

Lines changed: 79 additions & 30 deletions
@@ -29,27 +29,31 @@
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, LeafRunnableCommand}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateTableCommand, DataWritingCommand, DropTableCommand, LeafRunnableCommand}
 import org.apache.spark.sql.execution.datasources.{FileFormat => SparkFileFormat, HadoopFsRelation, InMemoryFileIndex, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1Writes}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
 
 import io.substrait.`type`.{NamedStruct, StringTypeVisitor, Type}
 import io.substrait.{expression => exp}
 import io.substrait.expression.{Expression => SExpression}
 import io.substrait.plan.Plan
 import io.substrait.relation
-import io.substrait.relation.{ExtensionWrite, LocalFiles, NamedWrite}
+import io.substrait.relation.{ExtensionWrite, LocalFiles, NamedDdl, NamedWrite}
+import io.substrait.relation.AbstractDdlRel.{DdlObject, DdlOp}
 import io.substrait.relation.AbstractWriteRel.{CreateMode, WriteOp}
 import io.substrait.relation.Expand.{ConsistentField, SwitchingField}
 import io.substrait.relation.Set.SetOp
 import io.substrait.relation.files.FileFormat
 import io.substrait.util.EmptyVisitationContext
 import org.apache.hadoop.fs.Path
 
+import java.net.URI
+
 import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.mutable.ArrayBuffer
 

@@ -437,35 +441,44 @@ class ToLogicalPlan(spark: SparkSession = SparkSession.builder().getOrCreate())
 
   override def visit(write: NamedWrite, context: EmptyVisitationContext): LogicalPlan = {
     val child = write.getInput.accept(this, context)
-
-    val (table, database, catalog) = write.getNames.asScala match {
-      case Seq(table) => (table, None, None)
-      case Seq(database, table) => (table, Some(database), None)
-      case Seq(catalog, database, table) => (table, Some(database), Some(catalog))
-      case names =>
-        throw new UnsupportedOperationException(
-          s"NamedWrite requires up to three names ([[catalog,] database,] table): $names")
+    val table = catalogTable(write.getNames.asScala)
+    val isHive = spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) match {
+      case "hive" => true
+      case _ => false
     }
-    val id = TableIdentifier(table, database, catalog)
-    val catalogTable = CatalogTable(
-      id,
-      CatalogTableType.MANAGED,
-      CatalogStorageFormat.empty,
-      new StructType(),
-      Some("parquet")
-    )
     write.getOperation match {
       case WriteOp.CTAS =>
         withChild(child) {
-          CreateDataSourceTableAsSelectCommand(
-            catalogTable,
-            saveMode(write.getCreateMode),
+          if (isHive) {
+            CreateHiveTableAsSelectCommand(
+              table,
+              child,
+              write.getTableSchema.names().asScala,
+              saveMode(write.getCreateMode)
+            )
+          } else {
+            CreateDataSourceTableAsSelectCommand(
+              table,
+              saveMode(write.getCreateMode),
+              child,
+              write.getTableSchema.names().asScala
+            )
+          }
+        }
+      case WriteOp.INSERT =>
+        withChild(child) {
+          InsertIntoHiveTable(
+            table,
+            Map.empty,
             child,
+            write.getCreateMode == CreateMode.REPLACE_IF_EXISTS,
+            false,
             write.getTableSchema.names().asScala
           )
         }
       case op => throw new UnsupportedOperationException(s"Write mode $op not supported")
     }
+
   }
 
   override def visit(write: ExtensionWrite, context: EmptyVisitationContext): LogicalPlan = {
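
Two details of this hunk are easy to miss: the CTAS path only uses CreateHiveTableAsSelectCommand when spark.sql.catalogImplementation is "hive" (falling back to CreateDataSourceTableAsSelectCommand otherwise), whereas WriteOp.INSERT always builds an InsertIntoHiveTable. And since NamedWrite carries no dedicated overwrite flag, INSERT OVERWRITE semantics ride on CreateMode. A minimal sketch of that correspondence (illustrative, not code from the commit):

import io.substrait.relation.AbstractWriteRel.CreateMode

// INSERT INTO t ...            <-> CreateMode.ERROR_IF_EXISTS   (overwrite = false)
// INSERT OVERWRITE TABLE t ... <-> CreateMode.REPLACE_IF_EXISTS (overwrite = true)
def isOverwrite(mode: CreateMode): Boolean =
  mode == CreateMode.REPLACE_IF_EXISTS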
@@ -491,14 +504,7 @@
     val (format, options) = convertFileFormat(file.getFileFormat.get)
 
     val name = file.getPath.get.split('/').reverse.head
-    val id = TableIdentifier(name)
-    val table = CatalogTable(
-      id,
-      CatalogTableType.MANAGED,
-      CatalogStorageFormat.empty,
-      new StructType(),
-      None
-    )
+    val table = catalogTable(Seq(name))
 
     withChild(child) {
       V1Writes.apply(
@@ -519,6 +525,49 @@
     }
   }
 
+  override def visit(ddl: NamedDdl, context: EmptyVisitationContext): LogicalPlan = {
+    val table = catalogTable(ddl.getNames.asScala, ToSparkType.toStructType(ddl.getTableSchema))
+
+    (ddl.getOperation, ddl.getObject) match {
+      case (DdlOp.CREATE, DdlObject.TABLE) => CreateTableCommand(table, false)
+      case (DdlOp.DROP, DdlObject.TABLE) => DropTableCommand(table.identifier, false, false, false)
+      case (DdlOp.DROP_IF_EXIST, DdlObject.TABLE) =>
+        DropTableCommand(table.identifier, true, false, false)
+      case op => throw new UnsupportedOperationException(s"Ddl operation $op not supported")
+    }
+  }
+
+  private def catalogTable(
+      names: Seq[String],
+      schema: StructType = new StructType()): CatalogTable = {
+    val (table, database, catalog) = names match {
+      case Seq(table) => (table, None, None)
+      case Seq(database, table) => (table, Some(database), None)
+      case Seq(catalog, database, table) => (table, Some(database), Some(catalog))
+      case names =>
+        throw new UnsupportedOperationException(
+          s"NamedWrite requires up to three names ([[catalog,] database,] table): $names")
+    }
+
+    val loc = spark.conf.get(StaticSQLConf.WAREHOUSE_PATH.key)
+    val storage = CatalogStorageFormat(
+      locationUri = Some(URI.create(f"$loc/$table")),
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      compressed = false,
+      properties = Map.empty
+    )
+    val id = TableIdentifier(table, database, catalog)
+    CatalogTable(
+      id,
+      CatalogTableType.MANAGED,
+      storage,
+      schema,
+      Some("parquet")
+    )
+  }
+
   private def saveMode(mode: CreateMode): SaveMode = mode match {
     case CreateMode.APPEND_IF_EXISTS => SaveMode.Append
     case CreateMode.REPLACE_IF_EXISTS => SaveMode.Overwrite
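
The new catalogTable helper centralises name resolution for writes and DDL alike; note that its error message still says NamedWrite even though NamedDdl now reaches it too. The name list maps onto Spark's TableIdentifier like this (a sketch; spark_catalog is only an example catalog name):

import org.apache.spark.sql.catalyst.TableIdentifier

val t1 = TableIdentifier("t")                                    // Seq("t")
val t2 = TableIdentifier("t", Some("db"))                        // Seq("db", "t")
val t3 = TableIdentifier("t", Some("db"), Some("spark_catalog")) // Seq("spark_catalog", "db", "t")
// Anything longer throws UnsupportedOperationException in catalogTable.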

spark/src/main/scala/io/substrait/spark/logical/ToSubstraitRel.scala

Lines changed: 63 additions & 6 deletions
@@ -22,27 +22,31 @@ import io.substrait.spark.expression._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Sum}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.{FileFormat => DSFileFormat, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand, WriteFiles}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, V2SessionCatalog}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.types.{NullType, StructType}
 
 import io.substrait.`type`.{NamedStruct, Type}
 import io.substrait.{proto, relation}
 import io.substrait.debug.TreePrinter
 import io.substrait.expression.{Expression => SExpression, ExpressionCreator}
+import io.substrait.expression.Expression.StructLiteral
 import io.substrait.extension.ExtensionCollector
 import io.substrait.hint.Hint
 import io.substrait.plan.Plan
+import io.substrait.relation.AbstractDdlRel.{DdlObject, DdlOp}
 import io.substrait.relation.AbstractWriteRel.{CreateMode, OutputMode, WriteOp}
 import io.substrait.relation.RelProtoConverter
 import io.substrait.relation.Set.SetOp
 
@@ -54,7 +58,7 @@ import io.substrait.utils.Util
 import java.util
 import java.util.{Collections, Optional}
 
-import scala.collection.JavaConverters.asJavaIterableConverter
+import scala.collection.JavaConverters.{asJavaIterableConverter, seqAsJavaList}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -75,9 +79,7 @@ class ToSubstraitRel extends AbstractLogicalPlanVisitor with Logging {
   override def default(p: LogicalPlan): relation.Rel = p match {
     case c: CommandResult => visit(c.commandLogicalPlan)
     case w: WriteFiles => visit(w.child)
-    case c: V1WriteCommand => convertDataWritingCommand(c)
-    case CreateDataSourceTableAsSelectCommand(table, mode, query, names) =>
-      convertCTAS(table, mode, query, names)
+    case c: Command => convertCommand(c)
     case p: LeafNode => convertReadOperator(p)
     case s: SubqueryAlias => visit(s.child)
     case v: View => visit(v.child)
@@ -566,6 +568,28 @@
     }
   }
 
+  private def convertCommand(command: Command): relation.Rel = command match {
+    case c: V1WriteCommand => convertDataWritingCommand(c)
+    case CreateDataSourceTableAsSelectCommand(table, mode, query, names) =>
+      convertCTAS(table, mode, query, names)
+    case CreateHiveTableAsSelectCommand(table, query, names, mode) =>
+      convertCTAS(table, mode, query, names)
+    case CreateTableCommand(table, _) =>
+      convertCreateTable(table.identifier.unquotedString.split("\\."), table.schema)
+    case DropTableCommand(tableName, ifExists, _, _) =>
+      convertDropTable(tableName.unquotedString.split("\\."), ifExists)
+    case CreateTable(ResolvedIdentifier(c: V2SessionCatalog, id), tableSchema, _, _, _)
+        if id.namespace().length > 0 =>
+      val names = Seq(c.name(), id.namespace()(0), id.name())
+      convertCreateTable(names, tableSchema)
+    case DropTable(ResolvedIdentifier(c: V2SessionCatalog, id), ifExists, _)
+        if id.namespace().length > 0 =>
+      val names = Seq(c.name(), id.namespace()(0), id.name())
+      convertDropTable(names, ifExists)
+    case _ =>
+      throw new UnsupportedOperationException(s"Unable to convert command: $command")
+  }
+
   private def convertDataWritingCommand(command: V1WriteCommand): relation.AbstractWriteRel =
     command match {
       case InsertIntoHadoopFsRelationCommand(
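
convertCommand has to recognise the same DDL in two shapes because, depending on catalog configuration and Spark version, the analyzer produces either a V1 runnable command or a V2 plan wrapping a ResolvedIdentifier. A hedged sketch of statements that should reach these cases (session setup and table names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()

// With a Hive-backed session catalog, plain DDL typically analyzes to the
// V1 commands matched above:
spark.sql("CREATE TABLE demo (id INT)")  // CreateTableCommand -> DdlOp.CREATE
spark.sql("DROP TABLE IF EXISTS demo")   // DropTableCommand   -> DdlOp.DROP_IF_EXIST

// Against the V2 session catalog the analyzer can instead yield CreateTable /
// DropTable plans carrying a ResolvedIdentifier, which the guarded cases
// unpack into Seq(catalog, namespace, table).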
@@ -600,6 +624,16 @@
           .tableSchema(outputSchema(child.output, outputColumnNames))
           .detail(FileHolder(file))
           .build()
+      case InsertIntoHiveTable(table, _, child, overwrite, _, outputColumnNames, _, _, _, _, _) =>
+        relation.NamedWrite
+          .builder()
+          .input(visit(child))
+          .operation(WriteOp.INSERT)
+          .outputMode(OutputMode.UNSPECIFIED)
+          .createMode(if (overwrite) CreateMode.REPLACE_IF_EXISTS else CreateMode.ERROR_IF_EXISTS)
+          .names(seqAsJavaList(table.identifier.unquotedString.split("\\.").toList))
+          .tableSchema(outputSchema(child.output, outputColumnNames))
+          .build()
       case _ =>
         throw new UnsupportedOperationException(s"Unable to convert command: ${command.getClass}")
     }
@@ -619,6 +653,29 @@
       .tableSchema(outputSchema(query.output, outputColumnNames))
       .build()
 
+  private def convertCreateTable(names: Seq[String], schema: StructType): relation.NamedDdl = {
+    relation.NamedDdl
+      .builder()
+      .operation(DdlOp.CREATE)
+      .`object`(DdlObject.TABLE)
+      .names(seqAsJavaList(names))
+      .tableSchema(ToSubstraitType.toNamedStruct(schema))
+      .tableDefaults(StructLiteral.builder.nullable(true).build())
+      .build()
+  }
+
+  private def convertDropTable(names: Seq[String], ifExists: Boolean): relation.NamedDdl = {
+    relation.NamedDdl
+      .builder()
+      .operation(if (ifExists) DdlOp.DROP_IF_EXIST else DdlOp.DROP)
+      .`object`(DdlObject.TABLE)
+      .names(seqAsJavaList(names))
+      .tableSchema(
+        NamedStruct.builder().struct(Type.Struct.builder().nullable(true).build()).build())
+      .tableDefaults(StructLiteral.builder.nullable(true).build())
+      .build()
+  }
+
   private def createMode(mode: SaveMode): CreateMode = mode match {
     case SaveMode.Append => CreateMode.APPEND_IF_EXISTS
     case SaveMode.Overwrite => CreateMode.REPLACE_IF_EXISTS
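
Taken together, the two visitors close the loop for DDL and insert plans. A hedged end-to-end sketch (the convert entry points on ToSubstraitRel and ToLogicalPlan are assumed from the surrounding module, not part of this diff):

import io.substrait.spark.logical.{ToLogicalPlan, ToSubstraitRel}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()

// Commands execute eagerly; the resulting CommandResult wrapper is handled by
// the `case c: CommandResult => visit(c.commandLogicalPlan)` case above.
val logical = spark.sql("CREATE TABLE t AS SELECT 1 AS id").queryExecution.optimizedPlan

val substraitPlan = new ToSubstraitRel().convert(logical)      // Spark -> Substrait (CTAS NamedWrite)
val restored = new ToLogicalPlan(spark).convert(substraitPlan) // Substrait -> Spark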
