
Commit ee619d3

vanja-vujovic-db authored and cloud-fan committed
[SPARK-53260][SQL] Reducing number of JDBC overhead connections creation
### What changes were proposed in this pull request?

JDBC connectors open more connections to remote engines than needed: logging shows that 2-4 connections are opened while running a single sql(...) command. The two connections that are always opened both come from JDBCTableCatalog, so connection sharing between those two is introduced.

### Why are the changes needed?

To reduce query time.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Locally.

### Was this patch authored or co-authored using generative AI tooling?

Closes #51991 from vanja-vujovic-db/connections.

Authored-by: vanja-vujovic-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent: 967f2b6
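The mechanism behind the change is a loan pattern: open one java.sql.Connection, lend it to every operation that needs one, and close it afterwards. Below is a minimal, self-contained Scala sketch of that pattern. It mirrors what JdbcUtils.withConnection is used for in the diffs below, but the helper bodies, the metadata-based table check, and the H2 URL are illustrative assumptions, not Spark code.

```scala
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer

object ConnectionSharingSketch {
  // Loan pattern: open one connection, lend it to `body`, always close it.
  def withConnection[T](url: String)(body: Connection => T): T = {
    val conn = DriverManager.getConnection(url)
    try body(conn) finally conn.close()
  }

  // Illustrative existence check via JDBC metadata (not Spark's implementation).
  def tableExists(conn: Connection, table: String): Boolean = {
    val rs = conn.getMetaData.getTables(null, null, table, null)
    try rs.next() finally rs.close()
  }

  // Illustrative schema fetch: column names via JDBC metadata.
  def fetchColumns(conn: Connection, table: String): Seq[String] = {
    val rs = conn.getMetaData.getColumns(null, null, table, null)
    val cols = ListBuffer.empty[String]
    try { while (rs.next()) cols += rs.getString("COLUMN_NAME") } finally rs.close()
    cols.toList
  }

  def main(args: Array[String]): Unit = {
    // Before this commit, the existence check and the schema fetch each opened
    // their own connection; lending one connection to both halves the count.
    withConnection("jdbc:h2:mem:demo") { conn => // assumes an H2 driver on the classpath
      if (tableExists(conn, "PEOPLE")) println(fetchColumns(conn, "PEOPLE"))
    }
  }
}
```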

File tree

2 files changed (+57, -42 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 22 additions & 11 deletions
@@ -54,15 +54,15 @@ object JDBCRDD extends Logging {
    * @throws java.sql.SQLException if the table specification is garbage.
    * @throws java.sql.SQLException if the table contains an unsupported type.
    */
-  def resolveTable(options: JDBCOptions): StructType = {
+  def resolveTable(options: JDBCOptions, conn: Connection): StructType = {
     val url = options.url
     val prepareQuery = options.prepareQuery
     val table = options.tableOrQuery
     val dialect = JdbcDialects.get(url)
     val fullQuery = prepareQuery + dialect.getSchemaQuery(table)

     try {
-      getQueryOutputSchema(fullQuery, options, dialect)
+      getQueryOutputSchema(fullQuery, options, dialect, conn)
     } catch {
       case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
         throw new SparkException(
@@ -72,20 +72,31 @@ object JDBCRDD extends Logging {
     }
   }

+  def resolveTable(options: JDBCOptions): StructType = {
+    JdbcUtils.withConnection(options) {
+      resolveTable(options, _)
+    }
+  }
+
   def getQueryOutputSchema(
-      query: String, options: JDBCOptions, dialect: JdbcDialect): StructType = {
-    Using.resource(dialect.createConnectionFactory(options)(-1)) { conn =>
-      logInfo(log"Generated JDBC query to get scan output schema: ${MDC(SQL_TEXT, query)}")
-      Using.resource(conn.prepareStatement(query)) { statement =>
-        statement.setQueryTimeout(options.queryTimeout)
-        Using.resource(statement.executeQuery()) { rs =>
-          JdbcUtils.getSchema(conn, rs, dialect, alwaysNullable = true,
-            isTimestampNTZ = options.preferTimestampNTZ)
-        }
+      query: String, options: JDBCOptions, dialect: JdbcDialect, conn: Connection): StructType = {
+    logInfo(log"Generated JDBC query to get scan output schema: ${MDC(SQL_TEXT, query)}")
+    Using.resource(conn.prepareStatement(query)) { statement =>
+      statement.setQueryTimeout(options.queryTimeout)
+      Using.resource(statement.executeQuery()) { rs =>
+        JdbcUtils.getSchema(conn, rs, dialect, alwaysNullable = true,
+          isTimestampNTZ = options.preferTimestampNTZ)
       }
     }
   }

+  def getQueryOutputSchema(
+      query: String, options: JDBCOptions, dialect: JdbcDialect): StructType = {
+    JdbcUtils.withConnection(options) {
+      getQueryOutputSchema(query, options, dialect, _)
+    }
+  }
+
   /**
    * Prune all but the specified columns from the specified Catalyst schema.
    *
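Both schema-resolution entry points now come in two flavors: the new overloads take an explicit Connection, while the original signatures remain as thin wrappers that borrow one via JdbcUtils.withConnection, so existing callers compile unchanged. A hedged usage sketch, assuming options, query, and dialect are already in scope:

```scala
// Old-style call site: unchanged, still opens and closes its own connection.
val tableSchema = JDBCRDD.resolveTable(options)

// New-style call site: one connection serves both lookups, avoiding a second
// connect/handshake round trip to the remote engine.
JdbcUtils.withConnection(options) { conn =>
  val sharedTableSchema = JDBCRDD.resolveTable(options, conn)
  val sharedQuerySchema = JDBCRDD.getQueryOutputSchema(query, options, dialect, conn)
}
```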

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala

Lines changed: 35 additions & 31 deletions
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.jdbc

-import java.sql.SQLException
+import java.sql.{Connection, SQLException}
 import java.util

 import scala.collection.mutable
@@ -93,19 +93,21 @@ class JDBCTableCatalog extends TableCatalog
   }

   override def tableExists(ident: Identifier): Boolean = {
+    JdbcUtils.withConnection(options)(tableExists(ident, _))
+  }
+
+  private def tableExists(ident: Identifier, conn: Connection): Boolean = {
     checkNamespace(ident.namespace())
     val writeOptions = new JdbcOptionsInWrite(
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    JdbcUtils.withConnection(options) {
-      JdbcUtils.classifyException(
-        condition = "FAILED_JDBC.TABLE_EXISTS",
-        messageParameters = Map(
-          "url" -> options.getRedactUrl(),
-          "tableName" -> toSQLId(ident)),
-        dialect,
-        description = s"Failed table existence check: $ident",
-        isRuntime = false)(JdbcUtils.tableExists(_, writeOptions))
-    }
+    JdbcUtils.classifyException(
+      condition = "FAILED_JDBC.TABLE_EXISTS",
+      messageParameters = Map(
+        "url" -> options.getRedactUrl(),
+        "tableName" -> toSQLId(ident)),
+      dialect,
+      description = s"Failed table existence check: $ident",
+      isRuntime = false)(JdbcUtils.tableExists(conn, writeOptions))
   }

   override def dropTable(ident: Identifier): Boolean = {
@@ -138,28 +140,30 @@ class JDBCTableCatalog extends TableCatalog
   }

   override def loadTable(ident: Identifier): Table = {
-    if (!tableExists(ident)) {
-      throw QueryCompilationErrors.noSuchTableError(ident)
-    }
+    JdbcUtils.withConnection(options) { conn =>
+      if (!tableExists(ident, conn)) {
+        throw QueryCompilationErrors.noSuchTableError(ident)
+      }

-    val optionsWithTableName = new JDBCOptions(
-      options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    JdbcUtils.classifyException(
-      condition = "FAILED_JDBC.LOAD_TABLE",
-      messageParameters = Map(
-        "url" -> options.getRedactUrl(),
-        "tableName" -> toSQLId(ident)),
-      dialect,
-      description = s"Failed to load table: $ident",
-      isRuntime = false
-    ) {
-      val remoteSchemaFetchMetric = JdbcUtils
-        .createSchemaFetchMetric(SparkSession.active.sparkContext)
-      val schema = SQLMetrics.withTimingNs(remoteSchemaFetchMetric) {
-        JDBCRDD.resolveTable(optionsWithTableName)
+      val optionsWithTableName = new JDBCOptions(
+        options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
+      JdbcUtils.classifyException(
+        condition = "FAILED_JDBC.LOAD_TABLE",
+        messageParameters = Map(
+          "url" -> options.getRedactUrl(),
+          "tableName" -> toSQLId(ident)),
+        dialect,
+        description = s"Failed to load table: $ident",
+        isRuntime = false
+      ) {
+        val remoteSchemaFetchMetric = JdbcUtils
+          .createSchemaFetchMetric(SparkSession.active.sparkContext)
+        val schema = SQLMetrics.withTimingNs(remoteSchemaFetchMetric) {
+          JDBCRDD.resolveTable(optionsWithTableName, conn)
+        }
+        JDBCTable(ident, schema, optionsWithTableName,
+          Map(JDBCRelation.schemaFetchKey -> remoteSchemaFetchMetric))
       }
-      JDBCTable(ident, schema, optionsWithTableName,
-        Map(JDBCRelation.schemaFetchKey -> remoteSchemaFetchMetric))
     }
   }
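Net effect, reading the two files together: loadTable previously opened one connection inside tableExists (via JdbcUtils.withConnection) and a second inside JDBCRDD.resolveTable (via the dialect's connection factory); it now opens a single connection up front and threads it through both the existence check and the schema fetch.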
