Commit 33df1b6
[SPARK-51874][SQL][FOLLOW-UP] Revert API changes of rebase methods in DataSourceUtils and AvroOptions
### What changes were proposed in this pull request?

A followup similar to #52065. This PR restores the rebase APIs in `DataSourceUtils` for compatibility with external Spark plugins, and also in `AvroOptions` to simplify the code.

### Why are the changes needed?

External plugins may use `DataSourceUtils` directly, see https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala#L194

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52074 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent ee619d3 · commit 33df1b6

5 files changed: +16 −20 lines
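To illustrate the compatibility point, here is a minimal sketch of how an external plugin (in the spirit of the Hudi file format linked above) can keep calling the restored String-based API. The helper name, footer-lookup shape, and the conf accessor are assumptions for illustration, not code from this commit; the `.toString` on the conf value mirrors the `AvroOptions` change below.

import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf

// Hypothetical plugin-side helper: resolves the rebase spec for one Parquet
// file, letting the file's footer metadata win over the session conf value.
def rebaseSpecFor(footerMeta: java.util.Map[String, String]): RebaseSpec = {
  DataSourceUtils.datetimeRebaseSpec(
    key => footerMeta.get(key),  // lookupFileMeta: String => String (null when absent)
    // modeByConfig is a plain String again after this revert, so the conf
    // value only needs a toString, not a LegacyBehaviorPolicy conversion.
    SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)
}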

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 2 additions & 2 deletions

@@ -58,15 +58,15 @@ private[sql] class AvroDeserializer(
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
     this(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,
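For callers, the secondary constructor now takes the mode as a plain string and does the enum conversion internally. A hedged usage sketch follows; the schema and type values are assumed to be in scope, the argument values are illustrative, and the constructor is `private[sql]`, so this only compiles inside an `org.apache.spark.sql` package.

// Hypothetical call site: the rebase mode is passed as one of the
// LegacyBehaviorPolicy names rather than an enum value.
val deserializer = new AvroDeserializer(
  avroSchema,                        // org.apache.avro.Schema, assumed in scope
  catalystType,                      // org.apache.spark.sql.types.DataType, assumed in scope
  datetimeRebaseMode = "CORRECTED",  // "EXCEPTION", "LEGACY", or "CORRECTED"
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "",
  recursiveFieldMaxDepth = -1)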

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala

Lines changed: 4 additions & 4 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)

   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
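The resolution order is unchanged: a per-read option wins over the session conf, and only the exposed type reverts to String. A hedged sketch of both knobs, assuming the standard option and conf keys (the path is a placeholder):

// Session-wide default for Avro reads.
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")

// Per-read override via the datetimeRebaseMode option, which takes
// precedence over the conf per the getOrElse chain above.
val df = spark.read
  .format("avro")
  .option("datetimeRebaseMode", "LEGACY")
  .load("/path/to/data.avro")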

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 4 additions & 4 deletions

@@ -128,7 +128,7 @@ object DataSourceUtils extends PredicateHelper {

   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@ object DataSourceUtils extends PredicateHelper {
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@ object DataSourceUtils extends PredicateHelper {

   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@ object DataSourceUtils extends PredicateHelper {

   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
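With String parameters, validation of the mode moves to the `withName` conversion inside `getRebaseSpec`: a bad mode string now fails there with scala.Enumeration's usual error rather than at the call site. A small sketch of that behavior:

import org.apache.spark.sql.internal.LegacyBehaviorPolicy

// withName maps a mode string onto the enum; unknown names throw
// java.util.NoSuchElementException, so invalid configs still fail fast.
val ok = LegacyBehaviorPolicy.withName("CORRECTED") // LegacyBehaviorPolicy.CORRECTED
// LegacyBehaviorPolicy.withName("BOGUS")           // throws NoSuchElementException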

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 3 additions & 5 deletions

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
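Downstream, these String values feed straight into the per-file rebase resolution. A hedged sketch of how the reader path consumes them; the footer variable is illustrative of what the reader has in hand at that point:

// Per-file resolution: footer metadata, when present, overrides the
// String mode captured from ParquetOptions above.
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
  key => footerFileMetaData.getKeyValueMetaData.get(key), // String => String
  datetimeRebaseModeInRead)                               // plain String, no withName needed
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
  key => footerFileMetaData.getKeyValueMetaData.get(key),
  int96RebaseModeInRead)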

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 3 additions & 5 deletions

@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead

   private val parquetReaderCallback = new ParquetReaderCallback()