Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public class TezSessionState {
private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName();

private final HiveConf conf;
private Path tezScratchDir;
@VisibleForTesting
Path tezScratchDir;
private LocalResource appJarLr;
private TezClient session;
private Future<TezClient> sessionFuture;
Expand Down Expand Up @@ -260,7 +261,8 @@ public void beginOpen(String[] additionalFiles, LogHelper console)
openInternal(additionalFiles, true, console, null);
}

protected void openInternal(String[] additionalFilesNotFromConf,
@VisibleForTesting
void openInternal(String[] additionalFilesNotFromConf,
boolean isAsync, LogHelper console, HiveResources resources)
throws IOException, URISyntaxException, TezException {
// TODO: Why is the queue name set again? It has already been set up via setQueueName. Do only one of the two.
Expand All @@ -272,9 +274,6 @@ protected void openInternal(String[] additionalFilesNotFromConf,
this.queueName = confQueueName;
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);

final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));

// TODO: This - at least for the session pool - will always be the hive user. How does the doAs setting above affect things?
UserGroupInformation ugi = Utils.getUGI();
user = ugi.getShortUserName();
Expand All @@ -292,12 +291,37 @@ protected void openInternal(String[] additionalFilesNotFromConf,
LOG.info("Created new resources: " + this.resources);
}

// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
// Unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf));

// configuration for the application master
final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
try {
openInternalUnsafe(isAsync, console);
} catch (Exception e) {
LOG.info("Failed to open session, deleting scratch dir to prevent resource leak...", e);
cleanupScratchDir();
throw e;
}
}

/**
* Opens a Tez session without performing a complete rollback/cleanup on failure.
*
* <p><strong>Callers MUST guard this method with try/catch and perform cleanup</strong>
* of partially initialized state (such as localized files in the scratch directory).
* This method is not safe on its own.</p>
*
* @param isAsync whether to open the Tez session asynchronously in a separate thread
* @param console a {@link LogHelper} used to log session startup events
*
* @throws TezException if the session fails to start (including failures during
* container launch or session initialization)
* @throws IOException if local resource localization or I/O setup fails
*/
@VisibleForTesting
void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException {
final Map<String, LocalResource> commonLocalResources = new HashMap<>();
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));

commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
for (LocalResource lr : this.resources.localizedResources) {
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
Expand All @@ -312,7 +336,7 @@ protected void openInternal(String[] additionalFilesNotFromConf,
}

// Create environment for AM.
Map<String, String> amEnv = new HashMap<String, String>();
Map<String, String> amEnv = new HashMap<>();
MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);

// and finally we're ready to create and start the session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.TezException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestTezSessionState {
private static final Logger LOG = LoggerFactory.getLogger(TestTezSessionState.class.getName());

@Test
public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
Expand All @@ -39,8 +47,7 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {

Assert.assertTrue(Files.isSymbolicLink(symlinkPath));

HiveConf hiveConf = new HiveConf();
hiveConf.set(HiveConf.ConfVars.HIVE_JAR_DIRECTORY.varname, "/tmp");
HiveConf hiveConf = new HiveConfForTest(getClass());

TezSessionState sessionState = new TezSessionState(DagUtils.getInstance(), hiveConf);

Expand All @@ -50,4 +57,38 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
// local resources point to the same original resource
Assert.assertEquals(l1.getResource().toPath(), l2.getResource().toPath());
}

/**
 * Verifies that the Tez scratch directory does not exist after opening a session fails.
 *
 * <p>The session under test overrides {@code openInternalUnsafe} so that it runs the real
 * implementation, records the scratch directory path, and then throws a
 * {@link RuntimeException}. The test asserts that this exception reaches the caller of
 * {@code openInternal} and that the recorded directory is gone afterwards.</p>
 *
 * @throws Exception if the test setup itself fails unexpectedly
 */
@Test
public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession() throws Exception {
  final String expectedMessage = "fake exception in openInternalUnsafe";

  HiveConf hiveConf = new HiveConfForTest(getClass());
  hiveConf.set("hive.security.authorization.manager",
      "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
  SessionState.start(hiveConf);

  final AtomicReference<String> scratchDirPath = new AtomicReference<>();

  TezSessionState sessionState = new TezSessionState(SessionState.get().getSessionId(), hiveConf) {
    @Override
    void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws TezException, IOException {
      super.openInternalUnsafe(isAsync, console);
      // Save the scratch dir here, as the field is nullified while calling the cleanup.
      scratchDirPath.set(tezScratchDir.toUri().getPath());
      throw new RuntimeException(expectedMessage);
    }
  };

  SessionState.LogHelper console = new SessionState.LogHelper(LoggerFactory.getLogger("TestTezSessionState"));
  TezSessionState.HiveResources resources =
      new TezSessionState.HiveResources(new org.apache.hadoop.fs.Path("/tmp"));

  try {
    sessionState.openInternal(null, false, console, resources);
    Assert.fail("An exception should have been thrown while calling openInternal");
  } catch (Exception e) {
    // assertEquals is null-safe and reports the actual message on mismatch, so an
    // unexpected failure (e.g. an exception with a null message) is diagnosable.
    Assert.assertEquals(expectedMessage, e.getMessage());
  }

  // Fail with a clear message instead of an NPE from Paths.get if the path was never recorded.
  Assert.assertNotNull("Scratch dir path was never recorded", scratchDirPath.get());
  LOG.info("Checking if scratch dir exists: {}", scratchDirPath.get());
  Assert.assertFalse("Scratch dir is not supposed to exist after cleanup: " + scratchDirPath.get(),
      Files.exists(Paths.get(scratchDirPath.get())));
}
}