Skip to content

Commit 773da76

Browse files
authored
[GOBBLIN-2050] Add settings to allow for full cleanup in GobblinYarnAppLauncher (#3931)
Allow for explicit path definitions for token locations and work directories to allow easy cleanup after job completion
1 parent a74d17a commit 773da76

File tree

5 files changed

+44
-10
lines changed

5 files changed

+44
-10
lines changed

gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public class GobblinClusterConfigurationKeys {
5050
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
5151
// Root working directory for Gobblin cluster
5252
public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
53+
// Root working dir without appending the application name, keeping CLUSTER_WORK_DIR property for backward compatibility
54+
// This is used in scenarios where we want to encapsulate multiple files inside of this work dir without coupling it to the YARN application
55+
// Example: Yarn security token refresh location, gobblin cluster worker directories.
56+
// However, for concurrent jobs, make sure this property is distinct for each job; otherwise it can lead to folder conflicts and premature deletion of files.
57+
public static final String CLUSTER_EXACT_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "exact.workDir";
5358

5459
public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
5560
public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;

gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,14 @@ public static String getHostname() throws UnknownHostException {
131131
*/
132132
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
133133
String applicationName, String applicationId) {
134-
if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
134+
if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR)) {
135+
return new Path(new Path(fs.getUri()), config.getString(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR));
136+
} else if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
135137
return new Path(new Path(fs.getUri()), PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
136138
getAppWorkDirPath(applicationName, applicationId)));
139+
} else {
140+
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
137141
}
138-
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
139142
}
140143

141144
/**
@@ -254,4 +257,13 @@ public static FileSystem buildFileSystem(Config config, Configuration conf)
254257
.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
255258
: FileSystem.get(conf);
256259
}
260+
261+
public static FileSystem createFileSystem(Config config, Configuration conf) throws IOException {
262+
Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
263+
//Add any Hadoop-specific overrides into the Configuration object
264+
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
265+
return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
266+
.newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
267+
: FileSystem.newInstance(conf);
268+
}
257269
}

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -870,8 +870,7 @@ private Path getHdfsLogDir(Path appWorkDir) throws IOException {
870870
* @throws IOException
871871
*/
872872
private AbstractTokenRefresher buildTokenRefreshManager() throws IOException {
873-
Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
874-
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
873+
Path tokenFilePath = YarnContainerSecurityManager.getYarnTokenFilePath(this.config, this.fs);
875874
String securityManagerClassName = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);
876875

877876
try {
@@ -892,10 +891,12 @@ private AbstractTokenRefresher buildTokenRefreshManager() throws IOException {
892891

893892
@VisibleForTesting
894893
void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException {
895-
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString());
896-
if (this.fs.exists(appWorkDir)) {
894+
// Create a new filesystem as this.fs may have been closed by the Yarn Application, and FS.get() will return a cached instance of the closed FS
895+
FileSystem fs = GobblinClusterUtils.createFileSystem(this.config, this.yarnConfiguration);
896+
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, this.applicationName, applicationId.toString());
897+
if (fs.exists(appWorkDir)) {
897898
LOGGER.info("Deleting application working directory " + appWorkDir);
898-
this.fs.delete(appWorkDir, true);
899+
fs.delete(appWorkDir, true);
899900
}
900901
}
901902

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public class GobblinYarnConfigurationKeys {
113113
public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX + "keytab.file.path";
114114
public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX + "keytab.principal.name";
115115
public static final String TOKEN_FILE_NAME = ".token";
116+
public static final String TOKEN_FILE_PATH_KEY = GOBBLIN_YARN_PREFIX + "token.file.path";
116117
public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "login.interval.minutes";
117118
public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
118119
public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.google.common.util.concurrent.AbstractIdleService;
2525
import com.typesafe.config.Config;
2626
import java.io.IOException;
27+
28+
import org.apache.gobblin.util.PathUtils;
2729
import org.apache.gobblin.util.logs.LogCopier;
2830
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
2931
import org.apache.hadoop.fs.FileSystem;
@@ -63,9 +65,7 @@ public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus event
6365

6466
/**
 * Creates a security manager that keeps the given collaborators and resolves the token file
 * location via {@link #getYarnTokenFilePath(Config, FileSystem)}.
 *
 * @param config the configuration used to resolve the token file location
 * @param fs the {@link FileSystem} kept by this manager and used to resolve the token file location
 * @param eventBus the {@link EventBus} kept by this manager
 * @param logCopier the {@link LogCopier} kept by this manager
 */
public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier) {
  // Collaborators handed in by the caller.
  this.eventBus = eventBus;
  this.logCopier = logCopier;
  this.fs = fs;

  // The token location depends only on the supplied config and file system.
  this.tokenFilePath = getYarnTokenFilePath(config, fs);
}
@@ -111,4 +111,19 @@ void addCredentials(Credentials credentials) throws IOException {
111111
}
112112
UserGroupInformation.getCurrentUser().addCredentials(credentials);
113113
}
114+
115+
/**
116+
* A utility method to get the location of the generated security token
117+
* @param config - the configuration that contains the application name and the token file path
118+
* @param fs - the Filesystem that stores the security token
119+
* @return the path to the security token
120+
*/
121+
static Path getYarnTokenFilePath(Config config, FileSystem fs) {
122+
if (config.hasPath(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY)) {
123+
return new Path(config.getString(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY), GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
124+
}
125+
// Default to storing the token file in the home directory of the user
126+
return new Path(fs.getHomeDirectory(), PathUtils.combinePaths(config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY),
127+
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME));
128+
}
114129
}

0 commit comments

Comments
 (0)