Skip to content

Commit 5712f18

Browse files
Z1Wuwuziyi
authored andcommitted
[feat] 1. add yarn resource manager delegation token support. 2setup HADOOP_TOKEN_FILE_LOCATION env for SparkProcessBuilder.
1 parent ea75fa8 commit 5712f18

File tree

10 files changed

+269
-29
lines changed

10 files changed

+269
-29
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2390,6 +2390,13 @@ object KyuubiConf {
23902390
.booleanConf
23912391
.createWithDefault(true)
23922392

2393+
val ENGINE_EXTERNAL_TOKEN_ENABLED: ConfigEntry[Boolean] =
2394+
buildConf("kyuubi.engine.external.token.enabled")
2395+
.doc("Whether to start Kerberized engine with external delegation tokens.")
2396+
.version("1.11.0")
2397+
.booleanConf
2398+
.createWithDefault(false)
2399+
23932400
val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
23942401
.doc("Engines will be shared in different levels, available configs are: <ul>" +
23952402
" <li>CONNECTION: the engine will not be shared but only used by the current client" +

kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@
1717

1818
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
1919
org.apache.kyuubi.credentials.HiveDelegationTokenProvider
20+
org.apache.kyuubi.credentials.YarnRMDelegationTokenProvider
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.credentials
19+
20+
import org.apache.hadoop.conf.Configuration
21+
import org.apache.hadoop.io.Text
22+
import org.apache.hadoop.security.{Credentials, SecurityUtil}
23+
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
24+
import org.apache.hadoop.yarn.client.ClientRMProxy
25+
import org.apache.hadoop.yarn.client.api.YarnClient
26+
import org.apache.hadoop.yarn.conf.YarnConfiguration
27+
import org.apache.hadoop.yarn.util.ConverterUtils
28+
29+
import org.apache.kyuubi.Logging
30+
import org.apache.kyuubi.config.KyuubiConf
31+
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EXTERNAL_TOKEN_ENABLED
32+
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.doAsProxyUser
33+
34+
class YarnRMDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
35+
private var yarnConf: YarnConfiguration = _
36+
private var tokenService: Text = _
37+
private var required = false
38+
override def serviceName: String = "yarn"
39+
40+
def getTokenService(): Text = tokenService
41+
42+
// Only support engine and kyuubi server using same hadoop conf
43+
override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
44+
if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE) {
45+
yarnConf = new YarnConfiguration(hadoopConf)
46+
tokenService = ClientRMProxy.getRMDelegationTokenService(yarnConf)
47+
required = kyuubiConf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)
48+
}
49+
}
50+
51+
override def delegationTokensRequired(): Boolean = required
52+
53+
override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
54+
doAsProxyUser(owner) {
55+
var client: Option[YarnClient] = None
56+
try {
57+
client = Some(YarnClient.createYarnClient())
58+
client.foreach(client => {
59+
client.init(yarnConf)
60+
client.start()
61+
val yarnToken = ConverterUtils.convertFromYarn(
62+
client.getRMDelegationToken(new Text()),
63+
tokenService)
64+
info(s"Get Token from Resource Manager service ${tokenService}, " +
65+
s"token : ${yarnToken.toString}")
66+
creds.addToken(new Text(yarnToken.getService), yarnToken)
67+
})
68+
} catch {
69+
case e: Throwable => error("Error occurs when get delegation token", e)
70+
} finally {
71+
client.foreach(_.close())
72+
}
73+
}
74+
}
75+
}

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ import scala.collection.JavaConverters._
2727
import com.google.common.collect.EvictingQueue
2828
import org.apache.commons.lang3.StringUtils
2929
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
30+
import org.apache.hadoop.conf.Configuration
3031

3132
import org.apache.kyuubi._
3233
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
3334
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
35+
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
3436
import org.apache.kyuubi.operation.log.OperationLog
35-
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
37+
import org.apache.kyuubi.util.{JavaUtils, KyuubiHadoopUtils, NamedThreadFactory}
3638

37-
trait ProcBuilder {
39+
trait ProcBuilder extends Logging {
3840

3941
import ProcBuilder._
4042

@@ -168,6 +170,7 @@ trait ProcBuilder {
168170
private var logCaptureThread: Thread = _
169171
@volatile private[kyuubi] var process: Process = _
170172
@volatile private[kyuubi] var processLaunched: Boolean = false
173+
@volatile private[kyuubi] var tokenTempDir: java.nio.file.Path = _
171174

172175
// Set engine application manger info conf
173176
conf.set(
@@ -270,6 +273,14 @@ trait ProcBuilder {
270273
Utils.terminateProcess(process, engineStartupDestroyTimeout)
271274
process = null
272275
}
276+
if (tokenTempDir != null) {
277+
try {
278+
Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
279+
} catch {
280+
case e: Throwable =>
281+
error(s"Error deleting token temp dir: $tokenTempDir", e)
282+
}
283+
}
273284
}
274285

275286
def getError: Throwable = synchronized {
@@ -359,6 +370,19 @@ trait ProcBuilder {
359370
def waitEngineCompletion: Boolean = {
360371
!isClusterMode() || conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
361372
}
373+
374+
def generateEngineTokenFile: Option[String] = {
375+
conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
376+
val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
377+
tokenTempDir = Utils.createTempDir()
378+
val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
379+
credentials.writeTokenStorageFile(
380+
new org.apache.hadoop.fs.Path(s"file://$file"),
381+
new Configuration())
382+
info(s"Generated hadoop token file: $file")
383+
file
384+
}
385+
}
362386
}
363387

364388
object ProcBuilder extends Logging {

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,15 @@ import scala.collection.mutable
2424

2525
import com.google.common.annotations.VisibleForTesting
2626
import org.apache.commons.lang3.StringUtils
27-
import org.apache.hadoop.conf.Configuration
28-
import org.apache.hadoop.fs.Path
2927
import org.apache.hadoop.security.UserGroupInformation
3028

3129
import org.apache.kyuubi._
3230
import org.apache.kyuubi.config.KyuubiConf
3331
import org.apache.kyuubi.config.KyuubiConf._
34-
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_USER_KEY}
32+
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
3533
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
3634
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
3735
import org.apache.kyuubi.operation.log.OperationLog
38-
import org.apache.kyuubi.util.KyuubiHadoopUtils
3936
import org.apache.kyuubi.util.command.CommandLineUtils._
4037

4138
/**
@@ -47,7 +44,7 @@ class FlinkProcessBuilder(
4744
override val conf: KyuubiConf,
4845
val engineRefId: String,
4946
val extraEngineLog: Option[OperationLog] = None)
50-
extends ProcBuilder with Logging {
47+
extends ProcBuilder {
5148

5249
@VisibleForTesting
5350
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -276,35 +273,20 @@ class FlinkProcessBuilder(
276273
}
277274
}
278275

279-
@volatile private var tokenTempDir: java.nio.file.Path = _
280276
private def generateTokenFile(): Option[(String, String)] = {
281277
if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
282278
// We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
283279
// So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
284280
// TODO: Removed this after FLINK-35525 (1.20.0), delegation tokens will be passed
285281
// by `kyuubi` provider
286-
conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
287-
val credentials = KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
288-
tokenTempDir = Utils.createTempDir()
289-
val file = s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
290-
credentials.writeTokenStorageFile(new Path(s"file://$file"), new Configuration())
291-
info(s"Generated hadoop token file: $file")
292-
"HADOOP_TOKEN_FILE_LOCATION" -> file
293-
}
282+
generateEngineTokenFile.map(tokenFile => "HADOOP_TOKEN_FILE_LOCATION" -> tokenFile)
294283
} else {
295284
None
296285
}
297286
}
298287

299288
override def close(destroyProcess: Boolean): Unit = {
300289
super.close(destroyProcess)
301-
if (tokenTempDir != null) {
302-
try {
303-
Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
304-
} catch {
305-
case e: Throwable => error(s"Error deleting token temp dir: $tokenTempDir", e)
306-
}
307-
}
308290
}
309291

310292
override def shortName: String = "flink"

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class SparkProcessBuilder(
5151
override val conf: KyuubiConf,
5252
val engineRefId: String,
5353
val extraEngineLog: Option[OperationLog] = None)
54-
extends ProcBuilder with Logging {
54+
extends ProcBuilder {
5555

5656
@VisibleForTesting
5757
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
@@ -61,13 +61,24 @@ class SparkProcessBuilder(
6161
import SparkProcessBuilder._
6262

6363
private[kyuubi] val sparkHome = getEngineHome(shortName)
64+
private[kyuubi] val externalTokensEnabled = conf.get(ENGINE_EXTERNAL_TOKEN_ENABLED)
6465

6566
override protected val executable: String = {
6667
Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
6768
}
6869

6970
override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"
7071

72+
override def env: Map[String, String] = {
73+
val extraEnvs: Map[String, String] =
74+
if ((conf.getOption(PRINCIPAL).isEmpty || conf.getOption(KEYTAB).isEmpty)
75+
&& doAsEnabled && externalTokensEnabled) {
76+
Map(ENV_KERBEROS_TGT -> "", ENV_SPARK_PROXY_USER -> proxyUser) ++
77+
generateEngineTokenFile.map(tokenFile => HADOOP_TOKEN_FILE_LOCATION -> tokenFile)
78+
} else Map.empty
79+
conf.getEnvs ++ extraEnvs
80+
}
81+
7182
/**
7283
* Add `spark.master` if KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT
7384
* are defined. So we can deploy spark on kubernetes without setting `spark.master`
@@ -169,8 +180,10 @@ class SparkProcessBuilder(
169180
tryKeytab() match {
170181
case None if doAsEnabled =>
171182
setSparkUserName(proxyUser, buffer)
172-
buffer += PROXY_USER
173-
buffer += proxyUser
183+
if (!externalTokensEnabled) {
184+
buffer += PROXY_USER
185+
buffer += proxyUser
186+
}
174187
case None => // doAs disabled
175188
setSparkUserName(Utils.currentUser, buffer)
176189
case Some(name) =>
@@ -409,6 +422,9 @@ object SparkProcessBuilder {
409422
final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
410423
final val YARN_SUBMIT_WAIT_APP_COMPLETION = "spark.yarn.submit.waitAppCompletion"
411424
final val INTERNAL_RESOURCE = "spark-internal"
425+
final val HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
426+
final val ENV_KERBEROS_TGT = "KRB5CCNAME"
427+
final val ENV_SPARK_PROXY_USER = "HADOOP_PROXY_USER"
412428

413429
final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
414430
final val KUBERNETES_UPLOAD_PATH_PERMISSION =
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi
19+
20+
import org.apache.hadoop.conf.Configuration
21+
import org.apache.hadoop.security.UserGroupInformation
22+
23+
import org.apache.kyuubi.config.KyuubiConf
24+
import org.apache.kyuubi.server.MiniYarnService
25+
26+
trait WithSecuredYarnCluster extends KerberizedTestHelper {
27+
28+
private var miniYarnService: MiniYarnService = _
29+
30+
private def newSecuredConf(): Configuration = {
31+
val hdfsConf = new Configuration()
32+
hdfsConf.set("ignore.secure.ports.for.testing", "true")
33+
hdfsConf.set("hadoop.security.authentication", "kerberos")
34+
hdfsConf.set("yarn.resourcemanager.keytab", testKeytab)
35+
hdfsConf.set("yarn.resourcemanager.principal", testPrincipal)
36+
37+
hdfsConf.set("yarn.nodemanager.keytab", testPrincipal)
38+
hdfsConf.set("yarn.nodemanager.principal", testKeytab)
39+
40+
hdfsConf
41+
}
42+
43+
override def beforeAll(): Unit = {
44+
super.beforeAll()
45+
tryWithSecurityEnabled {
46+
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
47+
miniYarnService = new MiniYarnService(newSecuredConf())
48+
miniYarnService.initialize(new KyuubiConf(false))
49+
miniYarnService.start()
50+
}
51+
}
52+
53+
override def afterAll(): Unit = {
54+
miniYarnService.stop()
55+
super.afterAll()
56+
}
57+
58+
def getHadoopConf: Configuration = miniYarnService.getYarnConf
59+
def getHadoopConfDir: String = miniYarnService.getYarnConfDir
60+
}

0 commit comments

Comments
 (0)