Skip to content

Commit 5cac81e

Browse files
committed
HIVE-29302: Wider Tez session open scope should be guarded to prevent local resource leaks
1 parent c18d0df commit 5cac81e

File tree

2 files changed

+77
-12
lines changed

2 files changed

+77
-12
lines changed

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public class TezSessionState {
114114
private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName();
115115

116116
private final HiveConf conf;
117-
private Path tezScratchDir;
117+
@VisibleForTesting
118+
Path tezScratchDir;
118119
private LocalResource appJarLr;
119120
private TezClient session;
120121
private Future<TezClient> sessionFuture;
@@ -260,7 +261,8 @@ public void beginOpen(String[] additionalFiles, LogHelper console)
260261
openInternal(additionalFiles, true, console, null);
261262
}
262263

263-
protected void openInternal(String[] additionalFilesNotFromConf,
264+
@VisibleForTesting
265+
void openInternal(String[] additionalFilesNotFromConf,
264266
boolean isAsync, LogHelper console, HiveResources resources)
265267
throws IOException, URISyntaxException, TezException {
266268
// TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
@@ -272,9 +274,6 @@ protected void openInternal(String[] additionalFilesNotFromConf,
272274
this.queueName = confQueueName;
273275
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
274276

275-
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
276-
conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
277-
278277
// TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ?
279278
UserGroupInformation ugi = Utils.getUGI();
280279
user = ugi.getShortUserName();
@@ -292,12 +291,37 @@ protected void openInternal(String[] additionalFilesNotFromConf,
292291
LOG.info("Created new resources: " + this.resources);
293292
}
294293

295-
// unless already installed on all the cluster nodes, we'll have to
296-
// localize hive-exec.jar as well.
294+
// Unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well.
297295
appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf));
298296

299-
// configuration for the application master
300-
final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
297+
try {
298+
openInternalUnsafe(isAsync, console);
299+
} catch (Exception e) {
300+
LOG.info("Failed to open session, deleting scratch dir to prevent resource leak...", e);
301+
cleanupScratchDir();
302+
throw e;
303+
}
304+
}
305+
306+
/**
307+
* Opens a Tez session without performing a complete rollback/cleanup on failure.
308+
*
309+
* <p><strong>Callers MUST guard this method with try/catch and perform cleanup</strong>
310+
* of partially initialized state (such as localized files in the scratch directory).
311+
* This method is not safe on its own.</p>
312+
*
313+
* @param isAsync whether to open the Tez session asynchronously in a separate thread
314+
* @param console a {@link LogHelper} used to log session startup events
315+
*
316+
* @throws TezException if the session fails to start (including failures during
317+
* container launch or session initialization)
318+
* @throws IOException if local resource localization or I/O setup fails
319+
*/
320+
@VisibleForTesting
321+
void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException {
322+
final Map<String, LocalResource> commonLocalResources = new HashMap<>();
323+
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
324+
301325
commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
302326
for (LocalResource lr : this.resources.localizedResources) {
303327
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
@@ -312,7 +336,7 @@ protected void openInternal(String[] additionalFilesNotFromConf,
312336
}
313337

314338
// Create environment for AM.
315-
Map<String, String> amEnv = new HashMap<String, String>();
339+
Map<String, String> amEnv = new HashMap<>();
316340
MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
317341

318342
// and finally we're ready to create and start the session

ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@
1616
*/
1717
package org.apache.hadoop.hive.ql.exec.tez;
1818

19+
import java.io.IOException;
1920
import java.nio.file.Files;
2021
import java.nio.file.Path;
2122
import java.nio.file.Paths;
2223
import java.nio.file.StandardOpenOption;
24+
import java.util.concurrent.atomic.AtomicReference;
2325

2426
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.apache.hadoop.hive.conf.HiveConfForTest;
28+
import org.apache.hadoop.hive.ql.session.SessionState;
2529
import org.apache.hadoop.yarn.api.records.LocalResource;
30+
import org.apache.tez.dag.api.TezException;
2631
import org.junit.Assert;
2732
import org.junit.Test;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
public class TestTezSessionState {
37+
private static final Logger LOG = LoggerFactory.getLogger(TestTezSessionState.class.getName());
3038

3139
@Test
3240
public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
@@ -39,8 +47,7 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
3947

4048
Assert.assertTrue(Files.isSymbolicLink(symlinkPath));
4149

42-
HiveConf hiveConf = new HiveConf();
43-
hiveConf.set(HiveConf.ConfVars.HIVE_JAR_DIRECTORY.varname, "/tmp");
50+
HiveConf hiveConf = new HiveConfForTest(getClass());
4451

4552
TezSessionState sessionState = new TezSessionState(DagUtils.getInstance(), hiveConf);
4653

@@ -50,4 +57,38 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
5057
// local resources point to the same original resource
5158
Assert.assertEquals(l1.getResource().toPath(), l2.getResource().toPath());
5259
}
60+
61+
@Test
62+
public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession() throws Exception {
63+
HiveConf hiveConf = new HiveConfForTest(getClass());
64+
hiveConf.set("hive.security.authorization.manager",
65+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
66+
SessionState.start(hiveConf);
67+
68+
final AtomicReference<String> scratchDirPath = new AtomicReference<>();
69+
70+
TezSessionState sessionState = new TezSessionState(SessionState.get().getSessionId(), hiveConf) {
71+
@Override
72+
void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws TezException, IOException {
73+
super.openInternalUnsafe(isAsync, console);
74+
// save scratch dir here as it's nullified while calling the cleanup
75+
scratchDirPath.set(tezScratchDir.toUri().getPath());
76+
throw new RuntimeException("fake exception in openInternalUnsafe");
77+
}
78+
};
79+
80+
SessionState.LogHelper console = new SessionState.LogHelper(LoggerFactory.getLogger("TestTezSessionState"));
81+
TezSessionState.HiveResources resources =
82+
new TezSessionState.HiveResources(new org.apache.hadoop.fs.Path("/tmp"));
83+
84+
try {
85+
sessionState.openInternal(null, false, console, resources);
86+
Assert.fail("An exception should have been thrown while calling openInternal");
87+
} catch (Exception e) {
88+
Assert.assertTrue(e.getMessage().equalsIgnoreCase("fake exception in openInternalUnsafe"));
89+
}
90+
LOG.info("Checking if scratch dir exists: {}", scratchDirPath.get());
91+
Assert.assertFalse("Scratch dir is not supposed to exist after cleanup: " + scratchDirPath.get(),
92+
Files.exists(Paths.get(scratchDirPath.get())));
93+
}
5394
}

0 commit comments

Comments
 (0)