
Commit b3377a7

[feat] 1. setup HADOOP_TOKEN_FILE_LOCATION env for SparkProcessBuilder 2. minor refactors
1 parent: ea75fa8

File tree

5 files changed: +92 -27 lines


kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 7 additions & 0 deletions
@@ -2390,6 +2390,13 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)

+  val ENGINE_EXTERNAL_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.external.token.enabled")
+      .doc("start kerberos-enabled application with external delegation tokens")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
     .doc("Engines will be shared in different levels, available configs are: <ul>" +
       " <li>CONNECTION: the engine will not be shared but only used by the current client" +

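For orientation, a minimal sketch (not part of the diff) of how the new entry is read; it behaves like any other boolean ConfigEntry, and SparkProcessBuilder below consumes it via conf.get(ENGINE_EXTERNAL_ENABLED):

  import org.apache.kyuubi.config.KyuubiConf

  val conf = KyuubiConf(false)
  // default is true, per createWithDefault(true) above
  assert(conf.get(KyuubiConf.ENGINE_EXTERNAL_ENABLED))
  // flip it off to keep the pre-existing proxy-user submission path
  conf.set(KyuubiConf.ENGINE_EXTERNAL_ENABLED, false)
  assert(!conf.get(KyuubiConf.ENGINE_EXTERNAL_ENABLED))
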
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 26 additions & 2 deletions
@@ -27,14 +27,16 @@ import scala.collection.JavaConverters._
 import com.google.common.collect.EvictingQueue
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.StringUtils.containsIgnoreCase
+import org.apache.hadoop.conf.Configuration

 import org.apache.kyuubi._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
+import org.apache.kyuubi.util.{JavaUtils, KyuubiHadoopUtils, NamedThreadFactory}

-trait ProcBuilder {
+trait ProcBuilder extends Logging {

   import ProcBuilder._

@@ -168,6 +170,7 @@ trait ProcBuilder {
   private var logCaptureThread: Thread = _
   @volatile private[kyuubi] var process: Process = _
   @volatile private[kyuubi] var processLaunched: Boolean = false
+  @volatile private[kyuubi] var tokenTempDir: java.nio.file.Path = _

   // Set engine application manger info conf
   conf.set(
@@ -270,6 +273,14 @@
       Utils.terminateProcess(process, engineStartupDestroyTimeout)
       process = null
     }
+    if (tokenTempDir != null) {
+      try {
+        Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
+      } catch {
+        case e: Throwable =>
+          error(s"Error deleting token temp dir: $tokenTempDir", e)
+      }
+    }
   }

   def getError: Throwable = synchronized {
@@ -359,6 +370,19 @@
   def waitEngineCompletion: Boolean = {
     !isClusterMode() || conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
   }
+
+  def generateEngineTokenFile: Option[String] = {
+    conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
+      val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
+      tokenTempDir = Utils.createTempDir()
+      val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
+      credentials.writeTokenStorageFile(
+        new org.apache.hadoop.fs.Path(s"file://$file"),
+        new Configuration())
+      info(s"Generated hadoop token file: $file")
+      file
+    }
+  }
 }

 object ProcBuilder extends Logging {

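For orientation, a minimal usage sketch (the name tokenFileEnv is hypothetical): generateEngineTokenFile decodes the credentials stored under KYUUBI_ENGINE_CREDENTIALS_KEY, writes them into a temp directory in Hadoop's token-storage format, and returns the file path; the builders in this commit map that path to the HADOOP_TOKEN_FILE_LOCATION environment variable, and close() removes the temp directory.

  // Inside a ProcBuilder subclass (sketch): expose the generated token file to
  // the launched submit process via its environment; empty when the conf holds
  // no engine credentials.
  def tokenFileEnv: Map[String, String] =
    Map.empty[String, String] ++
      generateEngineTokenFile.map(tokenFile => "HADOOP_TOKEN_FILE_LOCATION" -> tokenFile)
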
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 3 additions & 21 deletions
@@ -24,18 +24,15 @@ import scala.collection.mutable

 import com.google.common.annotations.VisibleForTesting
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.KyuubiHadoopUtils
 import org.apache.kyuubi.util.command.CommandLineUtils._

 /**
@@ -47,7 +44,7 @@ class FlinkProcessBuilder(
     override val conf: KyuubiConf,
     val engineRefId: String,
     val extraEngineLog: Option[OperationLog] = None)
-  extends ProcBuilder with Logging {
+  extends ProcBuilder {

   @VisibleForTesting
   def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -276,35 +273,20 @@ class FlinkProcessBuilder(
     }
   }

-  @volatile private var tokenTempDir: java.nio.file.Path = _
   private def generateTokenFile(): Option[(String, String)] = {
     if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
       // We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
       // So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
       // TODO: Removed this after FLINK-35525 (1.20.0), delegation tokens will be passed
       //  by `kyuubi` provider
-      conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
-        val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
-        tokenTempDir = Utils.createTempDir()
-        val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
-        credentials.writeTokenStorageFile(new Path(s"file://$file"), new Configuration())
-        info(s"Generated hadoop token file: $file")
-        "HADOOP_TOKEN_FILE_LOCATION" -> file
-      }
+      generateEngineTokenFile.map(tokenFile => "HADOOP_TOKEN_FILE_LOCATION" -> tokenFile)
     } else {
       None
     }
   }

   override def close(destroyProcess: Boolean): Unit = {
     super.close(destroyProcess)
-    if (tokenTempDir != null) {
-      try {
-        Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
-      } catch {
-        case e: Throwable => error(s"Error deleting token temp dir: $tokenTempDir", e)
-      }
-    }
   }

   override def shortName: String = "flink"

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala

Lines changed: 19 additions & 3 deletions
@@ -51,7 +51,7 @@ class SparkProcessBuilder(
     override val conf: KyuubiConf,
     val engineRefId: String,
     val extraEngineLog: Option[OperationLog] = None)
-  extends ProcBuilder with Logging {
+  extends ProcBuilder {

   @VisibleForTesting
   def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -61,13 +61,24 @@
   import SparkProcessBuilder._

   private[kyuubi] val sparkHome = getEngineHome(shortName)
+  private[kyuubi] val externalTokensEnabled = conf.get(ENGINE_EXTERNAL_ENABLED)

   override protected val executable: String = {
     Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
   }

   override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"

+  override def env: Map[String, String] = {
+    val extraEnvs: Map[String, String] =
+      if ((conf.getOption(PRINCIPAL).isEmpty || conf.getOption(KEYTAB).isEmpty)
+        && doAsEnabled && externalTokensEnabled) {
+        Map(ENV_KERBEROS_TGT -> "", ENV_SPARK_PROXY_USER -> proxyUser) ++
+          generateEngineTokenFile.map(tokenFile => HADOOP_TOKEN_FILE_LOCATION -> tokenFile)
+      } else Map.empty
+    conf.getEnvs ++ extraEnvs
+  }
+
   /**
    * Add `spark.master` if KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT
    * are defined. So we can deploy spark on kubernetes without setting `spark.master`
@@ -169,8 +180,10 @@
     tryKeytab() match {
       case None if doAsEnabled =>
         setSparkUserName(proxyUser, buffer)
-        buffer += PROXY_USER
-        buffer += proxyUser
+        if (!externalTokensEnabled) {
+          buffer += PROXY_USER
+          buffer += proxyUser
+        }
       case None => // doAs disabled
         setSparkUserName(Utils.currentUser, buffer)
       case Some(name) =>
@@ -409,6 +422,9 @@ object SparkProcessBuilder {
   final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
   final val YARN_SUBMIT_WAIT_APP_COMPLETION = "spark.yarn.submit.waitAppCompletion"
   final val INTERNAL_RESOURCE = "spark-internal"
+  final val HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
+  final val ENV_KERBEROS_TGT = "KRB5CCNAME"
+  final val ENV_SPARK_PROXY_USER = "HADOOP_PROXY_USER"

   final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
   final val KUBERNETES_UPLOAD_PATH_PERMISSION =

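To make the new code path concrete, an illustrative sketch (the values are assumptions; only the keys are fixed by the patch): with no principal/keytab configured, doAs enabled, and external tokens enabled, env hands the submit process an emptied KRB5CCNAME, the proxy user as HADOOP_PROXY_USER, and the generated token file as HADOOP_TOKEN_FILE_LOCATION, while buildCommand skips the PROXY_USER argument; the new test below asserts the same contract.

  import org.apache.kyuubi.engine.spark.SparkProcessBuilder._

  // Hypothetical values for a session user "alice"; the token path shape comes
  // from generateEngineTokenFile (temp dir + kyuubi_credentials_<timestamp>).
  val extraEnvsExample: Map[String, String] = Map(
    ENV_KERBEROS_TGT -> "", // KRB5CCNAME blanked for the spark-submit process
    ENV_SPARK_PROXY_USER -> "alice", // HADOOP_PROXY_USER
    HADOOP_TOKEN_FILE_LOCATION ->
      "/tmp/kyuubi-1b2c3d/kyuubi_credentials_1700000000000") // assumed path
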
kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala

Lines changed: 37 additions & 1 deletion
@@ -30,8 +30,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
 import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder.{PROXY_USER, _}
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.service.ServiceUtils
@@ -484,6 +485,41 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
     val toady = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
     assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$toady"))
   }
+
+  test("spark engine with external token file") {
+    val conf1 = KyuubiConf(false)
+    val tokenStr = "SERUUwAID2hhLWhkZnM6cmVkcG9sbD8ABmhhZG9vcAAfaGl2ZS9oaXZl" +
+      "c2VydmVyQE5JRS5ORVRFQVNFLkNPTYoBmEC/zomKAZhkzFKJjEQmld6OB4YUpNYg2W5h9" +
+      "+2R2gWTPCf0Ut/Qr9QVSERGU19ERUxFR0FUSU9OX1RPS0VOD2hhLWhkZnM6cmVkcG9sbB" +
+      "FoYS1oZGZzOmZlbmdodWFuZz8ABmhhZG9vcAAfaGl2ZS9oaXZlc2VydmVyQE5JRS5ORVR" +
+      "FQVNFLkNPTYoBmEC/zu2KAZhkzFLtjCqL5mKOEtAUbr+Tl2CACTcgnvac9RcxZAoMq28V" +
+      "SERGU19ERUxFR0FUSU9OX1RPS0VOEWhhLWhkZnM6ZmVuZ2h1YW5ndnRocmlmdDovL3NkY" +
+      "y5mZW5naHVhbmcud2FnZ2xlZGFuY2UuZ2RjLm5pZS5uZXRlYXNlLmNvbToyMDA1MCx0aH" +
+      "JpZnQ6Ly8xMC4xOTEuNTkuMTYyOjIwMDUwLHRocmlmdDovLzEwLjE5MS41OS4xNjM6MjA" +
+      "wNTApAAZoYWRvb3AEaGl2ZQRoaXZligGYQL/PVYoBmGTMU1WMAVM2x40BBgsUIKFw7GME" +
+      "1U6N/ZhxObek/I1RA0sVSElWRV9ERUxFR0FUSU9OX1RPS0VOAA9oYS1oZGZzOmRpamlhb" +
+      "mc/AAZoYWRvb3AAH2hpdmUvaGl2ZXNlcnZlckBOSUUuTkVURUFTRS5DT02KAZhAv89Hig" +
+      "GYZMxTR4wBz4hHjgGVFOfgiMNZ/B2acCmYkV7P0euzVBXWFUhERlNfREVMRUdBVElPTl9" +
+      "UT0tFTg9oYS1oZGZzOmRpamlhbmcPaGEtaGRmczpqaW5nd2VpPwAGaGFkb29wAB9oaXZl" +
+      "L2hpdmVzZXJ2ZXJATklFLk5FVEVBU0UuQ09NigGYQL/PIYoBmGTMUyGMIPu5VY4WwBQH9" +
+      "xpTeUCxroeNIOCW0908k1ZbSBVIREZTX0RFTEVHQVRJT05fVE9LRU4PaGEtaGRmczpqaW" +
+      "5nd2VpEWhhLWhkZnM6aHVhbmdsb25nPwAGaGFkb29wAB9oaXZlL2hpdmVzZXJ2ZXJATkl" +
+      "FLk5FVEVBU0UuQ09NigGYQL/PAIoBmGTMUwCMHv0cmI4bCBQOKo3I+WxPcz+nrXlGdbZ/" +
+      "nCPKyRVIREZTX0RFTEVHQVRJT05fVE9LRU4RaGEtaGRmczpodWFuZ2xvbmcSaGEtaGRmc" +
+      "zpnZGNjbHVzdGVyPwAGaGFkb29wAB9oaXZlL2hpdmVzZXJ2ZXJATklFLk5FVEVBU0UuQ0" +
+      "9NigGYQL/OvYoBmGTMUr2Ef72TD44URBSQajq5T6KZuMgk7cFuIHz7PAvGixVIREZTX0R" +
+      "FTEVHQVRJT05fVE9LRU4SaGEtaGRmczpnZGNjbHVzdGVyE2hhLWhkZnM6Z2RjY2x1c3Rl" +
+      "cjI/AAZoYWRvb3AAH2hpdmUvaGl2ZXNlcnZlckBOSUUuTkVURUFTRS5DT02KAZhAv88Qi" +
+      "gGYZMxTEIwSFo7AjgepFEwXJHRr7YVk7fxXxfRcbElkLI1QFUhERlNfREVMRUdBVElPTl" +
+      "9UT0tFThNoYS1oZGZzOmdkY2NsdXN0ZXIyAA=="
+    conf1.set(KYUUBI_ENGINE_CREDENTIALS_KEY, tokenStr)
+    conf1.set(ENGINE_EXTERNAL_ENABLED, true)
+    val builder1 = new SparkProcessBuilder("", true, conf1)
+    assert(builder1.env.contains(HADOOP_TOKEN_FILE_LOCATION))
+    assert(builder1.env.contains(ENV_KERBEROS_TGT) &&
+      builder1.env(ENV_KERBEROS_TGT).isEmpty)
+    assert(builder1.commands.forall(e => !e.contains(PROXY_USER)))
+  }
 }

 class FakeSparkProcessBuilder(config: KyuubiConf)
