Skip to content

Commit 3d9dde7

Browse files
Z1Wuwuziyi
authored andcommitted
[feat] add yarn resource manager delegation token support
1 parent b3377a7 commit 3d9dde7

File tree

5 files changed

+196
-2
lines changed

5 files changed

+196
-2
lines changed

kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@
1717

1818
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
1919
org.apache.kyuubi.credentials.HiveDelegationTokenProvider
20+
org.apache.kyuubi.credentials.YarnRMDelegationTokenProvider
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.credentials
19+
20+
import org.apache.hadoop.conf.Configuration
21+
import org.apache.hadoop.io.Text
22+
import org.apache.hadoop.security.{Credentials, SecurityUtil}
23+
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
24+
import org.apache.hadoop.yarn.client.ClientRMProxy
25+
import org.apache.hadoop.yarn.client.api.YarnClient
26+
import org.apache.hadoop.yarn.conf.YarnConfiguration
27+
import org.apache.hadoop.yarn.util.ConverterUtils
28+
29+
import org.apache.kyuubi.Logging
30+
import org.apache.kyuubi.config.KyuubiConf
31+
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.doAsProxyUser
32+
33+
class YarnRMDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
34+
private var yarnConf: YarnConfiguration = _
35+
private var tokenService: Text = _
36+
private var required = false
37+
override def serviceName: String = "yarn"
38+
39+
def getTokenService(): Text = tokenService
40+
41+
// Only support engine and kyuubi server using same hadoop conf
42+
override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
43+
if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE) {
44+
yarnConf = new YarnConfiguration(hadoopConf)
45+
tokenService = ClientRMProxy.getRMDelegationTokenService(yarnConf)
46+
required = true
47+
}
48+
}
49+
50+
override def delegationTokensRequired(): Boolean = required
51+
52+
override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
53+
doAsProxyUser(owner) {
54+
var client: Option[YarnClient] = None
55+
try {
56+
client = Some(YarnClient.createYarnClient())
57+
client.foreach(client => {
58+
client.init(yarnConf)
59+
client.start()
60+
val yarnToken = ConverterUtils.convertFromYarn(
61+
client.getRMDelegationToken(new Text()),
62+
tokenService)
63+
info(s"Get Token from Resource Manager service ${tokenService}, " +
64+
s"token : ${yarnToken.toString}")
65+
creds.addToken(new Text(yarnToken.getService), yarnToken)
66+
})
67+
} catch {
68+
case e: Throwable => error("Error occurs when get delegation token", e)
69+
} finally {
70+
client.foreach(_.close())
71+
}
72+
}
73+
}
74+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi
19+
20+
import org.apache.hadoop.conf.Configuration
21+
import org.apache.hadoop.security.UserGroupInformation
22+
23+
import org.apache.kyuubi.config.KyuubiConf
24+
import org.apache.kyuubi.server.MiniYarnService
25+
26+
trait WithSecuredYarnCluster extends KerberizedTestHelper {
27+
28+
private var miniYarnService: MiniYarnService = _
29+
30+
private def newSecuredConf(): Configuration = {
31+
val hdfsConf = new Configuration()
32+
hdfsConf.set("ignore.secure.ports.for.testing", "true")
33+
hdfsConf.set("hadoop.security.authentication", "kerberos")
34+
hdfsConf.set("yarn.resourcemanager.keytab", testKeytab)
35+
hdfsConf.set("yarn.resourcemanager.principal", testPrincipal)
36+
37+
hdfsConf.set("yarn.nodemanager.keytab", testPrincipal)
38+
hdfsConf.set("yarn.nodemanager.principal", testKeytab)
39+
40+
hdfsConf
41+
}
42+
43+
override def beforeAll(): Unit = {
44+
super.beforeAll()
45+
tryWithSecurityEnabled {
46+
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
47+
miniYarnService = new MiniYarnService(newSecuredConf())
48+
miniYarnService.initialize(new KyuubiConf(false))
49+
miniYarnService.start()
50+
}
51+
}
52+
53+
override def afterAll(): Unit = {
54+
miniYarnService.stop()
55+
super.afterAll()
56+
}
57+
58+
def getHadoopConf: Configuration = miniYarnService.getYarnConf
59+
def getHadoopConfDir: String = miniYarnService.getYarnConfDir
60+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.credentials
19+
20+
import org.apache.hadoop.io.Text
21+
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
22+
import org.apache.hadoop.security.token.Token
23+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
24+
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
25+
26+
import org.apache.kyuubi.WithSecuredYarnCluster
27+
import org.apache.kyuubi.config.KyuubiConf
28+
29+
class YarnDelegationTokenProviderSuite extends WithSecuredYarnCluster {
30+
31+
test("obtain yarn rm delegation tokens") {
32+
tryWithSecurityEnabled {
33+
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
34+
35+
val hadoopConf = getHadoopConf
36+
val kyuubiConf = new KyuubiConf(false)
37+
38+
val provider = new YarnRMDelegationTokenProvider
39+
provider.initialize(hadoopConf, kyuubiConf)
40+
assert(provider.delegationTokensRequired())
41+
42+
val owner = "who"
43+
val credentials = new Credentials()
44+
provider.obtainDelegationTokens(owner, credentials)
45+
46+
val token = credentials
47+
.getToken(provider.getTokenService())
48+
.asInstanceOf[Token[AbstractDelegationTokenIdentifier]]
49+
assert(token != null)
50+
51+
val tokenIdent = token.decodeIdentifier()
52+
assertResult(RMDelegationTokenIdentifier.KIND_NAME)(token.getKind)
53+
assertResult(new Text(owner))(tokenIdent.getOwner)
54+
val currentUserName = UserGroupInformation.getCurrentUser.getUserName
55+
assertResult(new Text(currentUserName))(tokenIdent.getRealUser)
56+
}
57+
}
58+
}

kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ import org.apache.kyuubi.Utils
3131
import org.apache.kyuubi.config.KyuubiConf
3232
import org.apache.kyuubi.service.AbstractService
3333

34-
class MiniYarnService extends AbstractService("TestMiniYarnService") {
34+
class MiniYarnService(configuration: Configuration = new Configuration())
35+
extends AbstractService("TestMiniYarnService") {
3536

3637
private val yarnConfDir: File = Utils.createTempDir().toFile
3738
private var yarnConf: YarnConfiguration = {
38-
val yarnConfig = new YarnConfiguration()
39+
val yarnConfig = new YarnConfiguration(configuration)
3940
// Disable the disk utilization check to avoid the test hanging when people's disks are
4041
// getting full.
4142
yarnConfig.set(

0 commit comments

Comments
 (0)