Skip to content

Commit c2f235b

Browse files
committed
HIVE-29302: Wider Tez session open scope should be guarded to prevent local resource leaks
1 parent c18d0df commit c2f235b

File tree

1 file changed

+46
-28
lines changed

1 file changed

+46
-28
lines changed

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,6 @@ protected void openInternal(String[] additionalFilesNotFromConf,
272272
this.queueName = confQueueName;
273273
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
274274

275-
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
276-
conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
277-
278275
// TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ?
279276
UserGroupInformation ugi = Utils.getUGI();
280277
user = ugi.getShortUserName();
@@ -292,12 +289,36 @@ protected void openInternal(String[] additionalFilesNotFromConf,
292289
LOG.info("Created new resources: " + this.resources);
293290
}
294291

295-
// unless already installed on all the cluster nodes, we'll have to
296-
// localize hive-exec.jar as well.
292+
// unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well.
297293
appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf));
298294

299-
// configuration for the application master
300-
final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
295+
try {
296+
openInternalUnsafe(isAsync, console);
297+
} catch (Exception e) {
298+
LOG.info("Failed to open session, deleting scratch dir to prevent LocalResource leak...", e);
299+
cleanupScratchDir();
300+
throw e;
301+
}
302+
}
303+
304+
/**
305+
* Opens a Tez session without performing a complete rollback/cleanup on failure.
306+
*
307+
* <p><strong>Callers MUST guard this method with try/catch and perform cleanup</strong>
308+
* of partially initialized state (such as localized files in the scratch directory).
309+
* This method is not safe on its own.</p>
310+
*
311+
* @param isAsync whether to open the Tez session asynchronously in a separate thread
312+
* @param console a {@link LogHelper} used to log session startup events
313+
*
314+
* @throws TezException if the session fails to start (including failures during
315+
* container launch or session initialization)
316+
* @throws IOException if local resource localization or I/O setup fails
317+
*/
318+
private void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException {
319+
final Map<String, LocalResource> commonLocalResources = new HashMap<>();
320+
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
321+
301322
commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
302323
for (LocalResource lr : this.resources.localizedResources) {
303324
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
@@ -312,7 +333,7 @@ protected void openInternal(String[] additionalFilesNotFromConf,
312333
}
313334

314335
// Create environment for AM.
315-
Map<String, String> amEnv = new HashMap<String, String>();
336+
Map<String, String> amEnv = new HashMap<>();
316337
MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
317338

318339
// and finally we're ready to create and start the session
@@ -383,27 +404,24 @@ protected void openInternal(String[] additionalFilesNotFromConf,
383404
startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false);
384405
this.session = session;
385406
} else {
386-
FutureTask<TezClient> sessionFuture = new FutureTask<>(new Callable<TezClient>() {
387-
@Override
388-
public TezClient call() throws Exception {
389-
TezClient result = null;
390-
try {
391-
result = startSessionAndContainers(
392-
session, conf, commonLocalResources, tezConfig, true);
393-
} catch (Throwable t) {
394-
// The caller has already stopped the session.
395-
LOG.error("Failed to start Tez session", t);
396-
throw (t instanceof Exception) ? (Exception)t : new Exception(t);
397-
}
398-
// Check interrupt at the last moment in case we get cancelled quickly.
399-
// This is not bulletproof but should allow us to close session in most cases.
400-
if (Thread.interrupted()) {
401-
LOG.info("Interrupted while starting Tez session");
402-
closeAndIgnoreExceptions(result);
403-
return null;
404-
}
405-
return result;
407+
FutureTask<TezClient> sessionFuture = new FutureTask<>(() -> {
408+
TezClient result = null;
409+
try {
410+
result = startSessionAndContainers(
411+
session, conf, commonLocalResources, tezConfig, true);
412+
} catch (Throwable t) {
413+
// The caller has already stopped the session.
414+
LOG.error("Failed to start Tez session", t);
415+
throw (t instanceof Exception) ? (Exception)t : new Exception(t);
416+
}
417+
// Check interrupt at the last moment in case we get cancelled quickly.
418+
// This is not bulletproof but should allow us to close session in most cases.
419+
if (Thread.interrupted()) {
420+
LOG.info("Interrupted while starting Tez session");
421+
closeAndIgnoreExceptions(result);
422+
return null;
406423
}
424+
return result;
407425
});
408426
new Thread(sessionFuture, "Tez session start thread").start();
409427
// We assume here nobody will try to get session before open() returns.

0 commit comments

Comments
 (0)