Skip to content

Commit 181900d

Browse files
committed
initial commit
1 parent 8530444 commit 181900d

File tree

8 files changed

+215
-13
lines changed

8 files changed

+215
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ object Analyzer {
256256
"spark.sql.expressionTreeChangeLog.level"
257257
)
258258

259-
def retainResolutionConfigsForAnalysis(newConf: SQLConf, existingConf: SQLConf): Unit = {
259+
def retainResolutionConfigsForAnalysis(
260+
newConf: SQLConf,
261+
existingConf: SQLConf,
262+
createSparkVersion: String = ""): Unit = {
260263
val retainedConfigs = existingConf.getAllConfs.filter { case (key, _) =>
261264
// Also apply catalog configs
262265
RETAINED_ANALYSIS_FLAGS.contains(key) || key.startsWith("spark.sql.catalog.")
@@ -265,6 +268,25 @@ object Analyzer {
265268
retainedConfigs.foreach { case (k, v) =>
266269
newConf.settings.put(k, v)
267270
}
271+
272+
trySetAnsiValue(newConf, createSparkVersion)
273+
}
274+
275+
/**
276+
* If the ANSI value wasn't persisted for a view or a UDF, we set it to `true` if the Spark
277+
* version used to create the view is 4.0.0 or higher. We set it to `false` if the Spark version
278+
* is lower than 4.0.0 or if the Spark version wasn't stored (in that case we assume that the
279+
* value is `false`).
280+
*/
281+
def trySetAnsiValue(sqlConf: SQLConf, createSparkVersion: String = ""): Unit = {
282+
if (conf.getConf(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED) &&
283+
!sqlConf.settings.containsKey(SQLConf.ANSI_ENABLED.key)) {
284+
if (createSparkVersion.startsWith("4.")) {
285+
sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "true")
286+
} else {
287+
sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "false")
288+
}
289+
}
268290
}
269291
}
270292

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@ object ViewResolution {
4343
view
4444
)
4545
}
46-
SQLConf.withExistingConf(View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)) {
46+
SQLConf.withExistingConf(
47+
View.effectiveSQLConf(
48+
configs = view.desc.viewSQLConfigs,
49+
isTempView = view.isTempView,
50+
createSparkVersion = view.desc.createVersion
51+
)
52+
) {
4753
resolveChild(view.child)
4854
}
4955
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,13 @@ class SessionCatalog(
10021002
objectType = Some("VIEW"),
10031003
objectName = Some(metadata.qualifiedName)
10041004
)
1005-
val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
1005+
val parsedPlan = SQLConf.withExistingConf(
1006+
View.effectiveSQLConf(
1007+
configs = viewConfigs,
1008+
isTempView = isTempView,
1009+
createSparkVersion = metadata.createVersion
1010+
)
1011+
) {
10061012
CurrentOrigin.withOrigin(origin) {
10071013
parser.parseQuery(viewText)
10081014
}
@@ -1030,7 +1036,11 @@ class SessionCatalog(
10301036
// Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
10311037
// SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
10321038
// number of duplications, and pick the corresponding attribute by ordinal.
1033-
val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
1039+
val viewConf = View.effectiveSQLConf(
1040+
configs = metadata.viewSQLConfigs,
1041+
isTempView = isTempView,
1042+
createSparkVersion = metadata.createVersion
1043+
)
10341044
val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
10351045
identity
10361046
} else {
@@ -1642,6 +1652,7 @@ class SessionCatalog(
16421652
// Use captured SQL configs when parsing a SQL function.
16431653
val conf = new SQLConf()
16441654
function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
1655+
Analyzer.trySetAnsiValue(conf)
16451656
SQLConf.withExistingConf(conf) {
16461657
val inputParam = function.inputParam
16471658
val returnType = function.getScalarFuncReturnType

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,10 @@ case class View(
864864
}
865865

866866
object View {
867-
def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = {
867+
def effectiveSQLConf(
868+
configs: Map[String, String],
869+
isTempView: Boolean,
870+
createSparkVersion: String = ""): SQLConf = {
868871
val activeConf = SQLConf.get
869872
// For temporary view, we always use captured sql configs
870873
if (activeConf.useCurrentSQLConfigsForView && !isTempView) return activeConf
@@ -873,7 +876,12 @@ object View {
873876
for ((k, v) <- configs) {
874877
sqlConf.settings.put(k, v)
875878
}
876-
Analyzer.retainResolutionConfigsForAnalysis(newConf = sqlConf, existingConf = activeConf)
879+
Analyzer.retainResolutionConfigsForAnalysis(
880+
newConf = sqlConf,
881+
existingConf = activeConf,
882+
createSparkVersion = createSparkVersion
883+
)
884+
877885
sqlConf
878886
}
879887
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6243,6 +6243,15 @@ object SQLConf {
62436243
.createWithDefault(false)
62446244
}
62456245

6246+
val ASSUME_ANSI_FALSE_IF_NOT_PERSISTED =
6247+
buildConf("spark.sql.assumeAnsiFalseIfNotPersisted.enabled")
6248+
.internal()
6249+
.doc("If enabled, assume ANSI mode is false if not persisted during view or UDF " +
6250+
"creation. Otherwise use the default value.")
6251+
.version("4.0.1")
6252+
.booleanConf
6253+
.createWithDefault(true)
6254+
62466255
/**
62476256
* Holds information about keys that have been deprecated.
62486257
*

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
1919

2020
import java.util.Locale
2121

22+
import scala.collection.mutable
23+
2224
import org.apache.spark.sql.AnalysisException
2325
import org.apache.spark.sql.catalyst.FunctionIdentifier
2426
import org.apache.spark.sql.catalyst.catalog.{LanguageSQL, RoutineLanguage, UserDefinedFunctionErrors}
@@ -87,10 +89,25 @@ object CreateUserDefinedFunctionCommand {
8789
* [[org.apache.spark.sql.catalyst.expressions.ExpressionInfo]], all SQL configs and other
8890
* function properties (such as the function parameters and the function return type)
8991
* are saved together in a property map.
92+
*
93+
* Here we only capture the SQL configs that are modifiable and should be captured, i.e. not in
94+
* the denyList and in the allowList. In addition to those, we also capture `ANSI_ENABLED`.
95+
*
96+
* We need to always capture them to make sure we apply the same configs when querying the
97+
* function.
9098
*/
9199
def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
92100
val modifiedConfs = ViewHelper.getModifiedConf(conf)
93-
modifiedConfs.map { case (key, value) => s"$SQL_CONFIG_PREFIX$key" -> value }
101+
102+
val alwaysCaptured = Seq(SQLConf.ANSI_ENABLED)
103+
.filter(c => !modifiedConfs.contains(c.key))
104+
.map(c => (c.key, conf.getConf(c).toString))
105+
106+
val props = new mutable.HashMap[String, String]
107+
for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
108+
props.put(s"$SQL_CONFIG_PREFIX$key", value)
109+
}
110+
props.toMap
94111
}
95112

96113
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -493,16 +493,19 @@ object ViewHelper extends SQLConfHelper with Logging {
493493
}
494494

495495
/**
496-
* Convert the view SQL configs to `properties`.
496+
* Convert the view SQL configs to `properties`. Here we only capture the SQL configs that are
497+
* modifiable and should be captured, i.e. not in the denyList and in the allowList. We also
498+
* capture `ANSI_ENABLED` and `SESSION_LOCAL_TIMEZONE`, whose default value relies on the JVM
499+
* system timezone.
500+
*
501+
* We need to always capture them to make sure we apply the same configs when querying the view.
497502
*/
498503
private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
499504
val modifiedConfs = getModifiedConf(conf)
500-
// Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose
501-
// default value relies on the JVM system timezone. We need to always capture them to
502-
// to make sure we apply the same configs when reading the view.
503-
val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE)
505+
506+
val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE, SQLConf.ANSI_ENABLED)
504507
.filter(c => !modifiedConfs.contains(c.key))
505-
.map(c => (c.key, conf.getConf(c)))
508+
.map(c => (c.key, conf.getConf(c).toString))
506509

507510
val props = new mutable.HashMap[String, String]
508511
for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.sql.catalyst.TableIdentifier
22+
import org.apache.spark.sql.catalyst.analysis.SQLScalarFunction
23+
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SQLFunction}
24+
import org.apache.spark.sql.catalyst.expressions.Alias
25+
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View}
26+
import org.apache.spark.sql.internal.SQLConf
27+
import org.apache.spark.sql.test.SharedSparkSession
28+
import org.apache.spark.sql.types.StructType
29+
30+
/**
31+
* This suite tests that the default ANSI value is persisted for views and functions when it is
32+
* not explicitly set.
33+
*/
34+
class DefaultANSIValueSuite extends QueryTest with SharedSparkSession {
35+
36+
protected override def sparkConf: SparkConf = {
37+
super.sparkConf
38+
.set(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED.key, "true")
39+
}
40+
41+
private val testViewName = "test_view"
42+
private val testFunctionName = "test_function"
43+
44+
test("Default ANSI value is stored for views") {
45+
withView(testViewName) {
46+
testView(expectedAnsiValue = true)
47+
}
48+
}
49+
50+
test("Explicitly set ANSI value is respected over default one for views") {
51+
withView(testViewName) {
52+
withSQLConf("spark.sql.ansi.enabled" -> "false") {
53+
testView(expectedAnsiValue = false)
54+
}
55+
}
56+
57+
withView(testViewName) {
58+
withSQLConf("spark.sql.ansi.enabled" -> "true") {
59+
testView(expectedAnsiValue = true)
60+
}
61+
}
62+
}
63+
64+
test("Default ANSI value is stored for functions") {
65+
withUserDefinedFunction(testFunctionName -> false) {
66+
testFunction(expectedAnsiValue = true)
67+
}
68+
}
69+
70+
test("Explicitly set ANSI value is respected over default one for functions") {
71+
withUserDefinedFunction(testFunctionName -> false) {
72+
withSQLConf("spark.sql.ansi.enabled" -> "false") {
73+
testFunction(expectedAnsiValue = false)
74+
}
75+
}
76+
77+
withUserDefinedFunction(testFunctionName -> false) {
78+
withSQLConf("spark.sql.ansi.enabled" -> "true") {
79+
testFunction(expectedAnsiValue = true)
80+
}
81+
}
82+
}
83+
84+
test("ANSI value is set to false if not persisted for views") {
85+
val catalogTable = new CatalogTable(
86+
identifier = TableIdentifier(testViewName),
87+
tableType = CatalogTableType.VIEW,
88+
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
89+
schema = new StructType(),
90+
properties = Map.empty[String, String]
91+
)
92+
val view = View(desc = catalogTable, isTempView = false, child = OneRowRelation())
93+
94+
val sqlConf = View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)
95+
96+
assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false")
97+
}
98+
99+
private def testView(expectedAnsiValue: Boolean): Unit = {
100+
sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias")
101+
102+
val viewMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName))
103+
104+
assert(
105+
viewMetadata.properties("view.sqlConfig.spark.sql.ansi.enabled") == expectedAnsiValue.toString
106+
)
107+
}
108+
109+
private def testFunction(expectedAnsiValue: Boolean): Unit = {
110+
sql(
111+
s"""
112+
|CREATE OR REPLACE FUNCTION $testFunctionName()
113+
|RETURN SELECT CAST('string' AS BIGINT) AS alias
114+
|""".stripMargin)
115+
116+
val df = sql(s"select $testFunctionName()")
117+
118+
assert(
119+
df.queryExecution.analyzed.asInstanceOf[Project]
120+
.projectList.head.asInstanceOf[Alias]
121+
.child.asInstanceOf[SQLScalarFunction]
122+
.function.asInstanceOf[SQLFunction]
123+
.properties.get("sqlConfig.spark.sql.ansi.enabled").get == expectedAnsiValue.toString
124+
)
125+
}
126+
}

0 commit comments

Comments
 (0)