diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b89cac4e8137..79d6b3bc624c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -114,7 +114,8 @@ public class TezSessionState { private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); private final HiveConf conf; - private Path tezScratchDir; + @VisibleForTesting + Path tezScratchDir; private LocalResource appJarLr; private TezClient session; private Future sessionFuture; @@ -260,7 +261,8 @@ public void beginOpen(String[] additionalFiles, LogHelper console) openInternal(additionalFiles, true, console, null); } - protected void openInternal(String[] additionalFilesNotFromConf, + @VisibleForTesting + void openInternal(String[] additionalFilesNotFromConf, boolean isAsync, LogHelper console, HiveResources resources) throws IOException, URISyntaxException, TezException { // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. @@ -272,9 +274,6 @@ protected void openInternal(String[] additionalFilesNotFromConf, this.queueName = confQueueName; this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); - final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar( - conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? UserGroupInformation ugi = Utils.getUGI(); user = ugi.getShortUserName(); @@ -292,12 +291,37 @@ protected void openInternal(String[] additionalFilesNotFromConf, LOG.info("Created new resources: " + this.resources); } - // unless already installed on all the cluster nodes, we'll have to - // localize hive-exec.jar as well. + // Unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well. appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf)); - // configuration for the application master - final Map commonLocalResources = new HashMap(); + try { + openInternalUnsafe(isAsync, console); + } catch (Exception e) { + LOG.info("Failed to open session, deleting scratch dir to prevent resource leak...", e); + cleanupScratchDir(); + throw e; + } + } + + /** + * Opens a Tez session without performing a complete rollback/cleanup on failure. + * + *

Callers MUST guard this method with try/catch and perform cleanup + * of partially initialized state (such as localized files in the scratch directory). + * This method is not safe on its own.

+ * + * @param isAsync whether to open the Tez session asynchronously in a separate thread + * @param console a {@link LogHelper} used to log session startup events + * + * @throws TezException if the session fails to start (including failures during + * container launch or session initialization) + * @throws IOException if local resource localization or I/O setup fails + */ + @VisibleForTesting + void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException { + final Map commonLocalResources = new HashMap<>(); + final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr); for (LocalResource lr : this.resources.localizedResources) { commonLocalResources.put(DagUtils.getBaseName(lr), lr); @@ -312,7 +336,7 @@ protected void openInternal(String[] additionalFilesNotFromConf, } // Create environment for AM. - Map amEnv = new HashMap(); + Map amEnv = new HashMap<>(); MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); // and finally we're ready to create and start the session diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java index 8e48c0f99987..df68a3218a5e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java @@ -16,17 +16,25 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.TezException; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestTezSessionState { + private static final Logger LOG = LoggerFactory.getLogger(TestTezSessionState.class.getName()); @Test public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception { @@ -39,8 +47,7 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception { Assert.assertTrue(Files.isSymbolicLink(symlinkPath)); - HiveConf hiveConf = new HiveConf(); - hiveConf.set(HiveConf.ConfVars.HIVE_JAR_DIRECTORY.varname, "/tmp"); + HiveConf hiveConf = new HiveConfForTest(getClass()); TezSessionState sessionState = new TezSessionState(DagUtils.getInstance(), hiveConf); @@ -50,4 +57,38 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception { // local resources point to the same original resource Assert.assertEquals(l1.getResource().toPath(), l2.getResource().toPath()); } + + @Test + public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession() throws Exception { + HiveConf hiveConf = new HiveConfForTest(getClass()); + hiveConf.set("hive.security.authorization.manager", + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory"); + SessionState.start(hiveConf); + + final AtomicReference scratchDirPath = new AtomicReference<>(); + + TezSessionState sessionState = new TezSessionState(SessionState.get().getSessionId(), hiveConf) { + @Override + void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws TezException, IOException { + super.openInternalUnsafe(isAsync, console); + // save scratch dir here as it's nullified while calling the cleanup + scratchDirPath.set(tezScratchDir.toUri().getPath()); + throw new RuntimeException("fake exception in openInternalUnsafe"); + } + }; + + SessionState.LogHelper console = new SessionState.LogHelper(LoggerFactory.getLogger("TestTezSessionState")); + TezSessionState.HiveResources resources = + new TezSessionState.HiveResources(new org.apache.hadoop.fs.Path("/tmp")); + + try { + sessionState.openInternal(null, false, console, resources); + Assert.fail("An exception should have been thrown while calling openInternal"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().equalsIgnoreCase("fake exception in openInternalUnsafe")); + } + LOG.info("Checking if scratch dir exists: {}", scratchDirPath.get()); + Assert.assertFalse("Scratch dir is not supposed to exist after cleanup: " + scratchDirPath.get(), + Files.exists(Paths.get(scratchDirPath.get()))); + } } \ No newline at end of file