Skip to content

Commit 77413d4

Browse files
committed
[SPARK-51874][SQL][FOLLOW-UP] Revert ParquetOptions rebase methods to return string type
### What changes were proposed in this pull request? This is a followup of #50674 . In that PR, we made it easier to define sql configs with enum values, and we also refactored some code to make things simpler. This PR reverts the API changes of `ParquetOptions`. Ideally we can change private APIs, but Parquet is a very popular format and there are third-party spark plugins that use Parquet related private APIs in Spark. We can of course ask these Spark plugins to update their code or add shim layers, but it's more friendly to avoid breaking certain private APIs if easy. ### Why are the changes needed? avoid breaking private APIs that used by Spark plugins, such as https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala#L150 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? no Closes #52065 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent af0b444 commit 77413d4

File tree

3 files changed

+16
-13
lines changed

3 files changed

+16
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
4444
import org.apache.spark.sql.errors.QueryExecutionErrors
4545
import org.apache.spark.sql.execution.datasources._
4646
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
47-
import org.apache.spark.sql.internal.SQLConf
47+
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
4848
import org.apache.spark.sql.sources._
4949
import org.apache.spark.sql.types._
5050
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,8 +182,10 @@ class ParquetFileFormat
182182
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
183183
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
184184
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
185-
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
186-
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
185+
val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
186+
parquetOptions.datetimeRebaseModeInRead)
187+
val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
188+
parquetOptions.int96RebaseModeInRead)
187189

188190
// Should always be set by FileSourceScanExec creating this.
189191
// Check conf before checking option, to allow working around an issue by changing conf.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
2424
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
2525
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
2626
import org.apache.spark.sql.errors.QueryExecutionErrors
27-
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
27+
import org.apache.spark.sql.internal.SQLConf
2828

2929
/**
3030
* Options for the Parquet data source.
@@ -74,16 +74,15 @@ class ParquetOptions(
7474
/**
7575
* The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
7676
*/
77-
def datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
77+
def datetimeRebaseModeInRead: String = parameters
7878
.get(DATETIME_REBASE_MODE)
79-
.map(LegacyBehaviorPolicy.withName)
80-
.getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ))
79+
.getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)
8180
/**
8281
* The rebasing mode for INT96 timestamp values in reads.
8382
*/
84-
def int96RebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
85-
.get(INT96_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
86-
.getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ))
83+
def int96RebaseModeInRead: String = parameters
84+
.get(INT96_REBASE_MODE)
85+
.getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ).toString)
8786
}
8887

8988

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
3838
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
3939
import org.apache.spark.sql.execution.datasources.parquet._
4040
import org.apache.spark.sql.execution.datasources.v2._
41-
import org.apache.spark.sql.internal.SQLConf
41+
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
4242
import org.apache.spark.sql.sources.Filter
4343
import org.apache.spark.sql.types.StructType
4444
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,8 +81,10 @@ case class ParquetPartitionReaderFactory(
8181
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
8282
private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
8383
private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
84-
private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
85-
private val int96RebaseModeInRead = options.int96RebaseModeInRead
84+
private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
85+
options.datetimeRebaseModeInRead)
86+
private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
87+
options.int96RebaseModeInRead)
8688

8789
private val parquetReaderCallback = new ParquetReaderCallback()
8890

0 commit comments

Comments
 (0)