diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java index 93c350fa37f9b..aad7853cc6c97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java @@ -52,6 +52,11 @@ public enum Reason { /** * Application is killed by ResourceManager */ - BY_RESOURCEMANAGER + BY_RESOURCEMANAGER, + + /** + * Application is cleaned up by NodeManager: it had already finished when the NodeManager restarted and recovered + */ + BY_NODEMANAGER } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5da709c49dc2b..930dd3d18fe3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.yarn.server.nodemanager; @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; @@ -102,13 +103,13 @@ import org.apache.hadoop.classification.VisibleForTesting; public class NodeStatusUpdaterImpl extends AbstractService implements - NodeStatusUpdater { + NodeStatusUpdater { public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = - YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers"; + YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers"; private static final Logger LOG = - LoggerFactory.getLogger(NodeStatusUpdaterImpl.class); + LoggerFactory.getLogger(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); private final Object shutdownMonitor = new Object(); @@ -129,7 +130,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private long tokenRemovalDelayMs; /** Keeps track of when the next keep alive request should be sent for an app*/ private Map appTokenKeepAliveMap = - new HashMap(); + new HashMap(); private Random keepAliveDelayRandom = new Random(); // It will be used to track recently stopped containers on node manager, this // is to avoid the misleading no-such-container exception messages on NM, when @@ -164,7 +165,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private boolean timelineServiceV2Enabled; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - 
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; this.context = context; @@ -172,9 +173,9 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = - new HashMap(); + new HashMap(); this.logAggregationReportForAppsTempList = - new ArrayList(); + new ArrayList(); } @Override @@ -192,9 +193,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf); long memoryMb = totalResource.getMemorySize(); float vMemToPMem = - conf.getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + conf.getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); long virtualMemoryMb = (long)Math.ceil(memoryMb * vMemToPMem); int virtualCores = totalResource.getVirtualCores(); @@ -209,52 +210,52 @@ protected void serviceInit(Configuration conf) throws Exception { long physicalMemoryMb = memoryMb; int physicalCores = virtualCores; ResourceCalculatorPlugin rcp = - ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf); + ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf); if (rcp != null) { physicalMemoryMb = rcp.getPhysicalMemorySize() / (1024 * 1024); physicalCores = rcp.getNumProcessors(); } this.physicalResource = - Resource.newInstance(physicalMemoryMb, physicalCores); + Resource.newInstance(physicalMemoryMb, physicalCores); this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); this.tokenRemovalDelayMs = - conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + 
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); this.minimumResourceManagerVersion = conf.get( - YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, - YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION); + YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, + YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION); nodeLabelsHandler = - createNMNodeLabelsHandler(nodeLabelsProvider); + createNMNodeLabelsHandler(nodeLabelsProvider); nodeAttributesHandler = - createNMNodeAttributesHandler(nodeAttributesProvider); + createNMNodeAttributesHandler(nodeAttributesProvider); // Default duration to track stopped containers on nodemanager is 10Min. // This should not be assigned very large value as it will remember all the // containers stopped during that time. durationToTrackStoppedContainers = - conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, - 600000); + conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + 600000); if (durationToTrackStoppedContainers < 0) { String message = "Invalid configuration for " - + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default " - + "value is 10Min(600000)."; + + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default " + + "value is 10Min(600000)."; LOG.error(message); throw new YarnException(message); } LOG.debug("{} :{}", YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, - durationToTrackStoppedContainers); + durationToTrackStoppedContainers); super.serviceInit(conf); LOG.info("Initialized nodemanager with : physical-memory={} virtual-memory={} " + - "virtual-cores={}.", memoryMb, virtualMemoryMb, virtualCores); + "virtual-cores={}.", memoryMb, virtualMemoryMb, virtualCores); this.logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); 
this.timelineServiceV2Enabled = YarnConfiguration. - timelineServiceV2Enabled(conf); + timelineServiceV2Enabled(conf); } @@ -270,7 +271,7 @@ protected void serviceStart() throws Exception { // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. this.resourceTracker = getRMClient(); - registerWithRM(); + registerWithRM(true); super.serviceStart(); startStatusUpdater(); } catch (Exception e) { @@ -285,8 +286,8 @@ protected void serviceStop() throws Exception { // the isStopped check is for avoiding multiple unregistrations. synchronized(shutdownMonitor) { if (this.registeredWithRM && !this.isStopped - && !isNMUnderSupervisionWithRecoveryEnabled() - && !context.getDecommissioned() && !failedToConnect) { + && !isNMUnderSupervisionWithRecoveryEnabled() + && !context.getDecommissioned() && !failedToConnect) { unRegisterNM(); } // Interrupt the updater. @@ -299,15 +300,15 @@ protected void serviceStop() throws Exception { private boolean isNMUnderSupervisionWithRecoveryEnabled() { Configuration config = getConfig(); return config.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, - YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED) - && config.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED) + && config.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED); } private void unRegisterNM() { RecordFactory recordFactory = RecordFactoryPBImpl.get(); UnRegisterNodeManagerRequest request = recordFactory - .newRecordInstance(UnRegisterNodeManagerRequest.class); + .newRecordInstance(UnRegisterNodeManagerRequest.class); request.setNodeId(this.nodeId); try { resourceTracker.unRegisterNodeManager(request); @@ -328,7 +329,7 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { sendOutofBandHeartBeat(); try { statusUpdater.join(); - registerWithRM(); + registerWithRM(false); statusUpdater = new 
Thread(statusUpdaterRunnable, "Node Status Updater"); this.isStopped = false; statusUpdater.start(); @@ -351,8 +352,8 @@ protected void stopRMProxy() { @Private protected boolean isTokenKeepAliveEnabled(Configuration conf) { return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) - && UserGroupInformation.isSecurityEnabled(); + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) + && UserGroupInformation.isSecurityEnabled(); } @VisibleForTesting @@ -362,27 +363,27 @@ protected ResourceTracker getRMClient() throws IOException { } private void updateConfiguredResourcesViaPlugins( - Resource configuredResource) throws YarnException { + Resource configuredResource) throws YarnException { ResourcePluginManager pluginManager = context.getResourcePluginManager(); if (pluginManager != null && pluginManager.getNameToPlugins() != null) { // Update configured resource for (ResourcePlugin resourcePlugin : pluginManager.getNameToPlugins() - .values()) { + .values()) { if (resourcePlugin.getNodeResourceHandlerInstance() != null) { resourcePlugin.getNodeResourceHandlerInstance() - .updateConfiguredResource(configuredResource); + .updateConfiguredResource(configuredResource); } } } } @VisibleForTesting - protected void registerWithRM() - throws YarnException, IOException { + protected void registerWithRM(boolean isStarted) + throws YarnException, IOException { RegisterNodeManagerResponse regNMResponse; Set nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); Set nodeAttributes = - nodeAttributesHandler.getNodeAttributesForRegistration(); + nodeAttributesHandler.getNodeAttributesForRegistration(); // Synchronize NM-RM registration with // ContainerManagerImpl#increaseContainersResource and @@ -390,11 +391,52 @@ protected void registerWithRM() // during RM recovery synchronized (this.context) { List containerReports = getNMContainerStatuses(); + List runningApplications = getRunningApplications(); + + if 
(isStarted) { + + Set recoverAppsToReport = new HashSet<>(); + List trueRunningApplications = new ArrayList<>(); + List recoverAppsToCleanup = new ArrayList<>(); + + // Collect the apps that still own a container that is not COMPLETE + for (NMContainerStatus status : containerReports) { + ContainerId containerId = status.getContainerId(); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); + if (status.getContainerState() != ContainerState.COMPLETE) { + recoverAppsToReport.add(applicationId); + } + } + // Also keep every app that the NMTokenSecretManager still tracks + NMTokenSecretManagerInNM nmTokenSecretManager = this.context.getNMTokenSecretManager(); + Set appsFromTSM = nmTokenSecretManager.getAppToAppAttemptMap().keySet(); + + recoverAppsToReport.addAll(appsFromTSM); + + for (ApplicationId applicationId : runningApplications) { + if (!recoverAppsToReport.contains(applicationId)) { + recoverAppsToCleanup.add(applicationId); + } else { + trueRunningApplications.add(applicationId); + } + } + // Clean up the remaining apps before registering with the RM + // Start keep-alive tracking only for the apps that are being finished + trackAppsForKeepAlive(recoverAppsToCleanup); + if (!recoverAppsToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedAppsEvent(recoverAppsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_NODEMANAGER)); + } + runningApplications = trueRunningApplications; + } + NodeStatus nodeStatus = getNodeStatus(0); RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource, nodeAttributes, nodeStatus); + RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, + nodeManagerVersionId, containerReports, runningApplications, + nodeLabels, physicalResource, nodeAttributes, nodeStatus); if (containerReports != null && !containerReports.isEmpty()) { LOG.info("Registering with RM using containers.size : {}.", 
containerReports.size()); @@ -402,17 +444,17 @@ nodeManagerVersionId, containerReports, getRunningApplications(), if (logAggregationEnabled) { // pull log aggregation status for application running in this NM List logAggregationReports = - context.getNMLogAggregationStatusTracker() - .pullCachedLogAggregationReports(); + context.getNMLogAggregationStatusTracker() + .pullCachedLogAggregationReports(); if (logAggregationReports != null - && !logAggregationReports.isEmpty()) { + && !logAggregationReports.isEmpty()) { LOG.debug("The cache log aggregation status size:{}", - logAggregationReports.size()); + logAggregationReports.size()); request.setLogAggregationReportsForApps(logAggregationReports); } } regNMResponse = - resourceTracker.registerNodeManager(request); + resourceTracker.registerNodeManager(request); // Make sure rmIdentifier is set before we release the lock this.rmIdentifier = regNMResponse.getRMIdentifier(); } @@ -420,11 +462,11 @@ nodeManagerVersionId, containerReports, getRunningApplications(), // if the Resource Manager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { String message = - "Message from ResourceManager: " - + regNMResponse.getDiagnosticsMessage(); + "Message from ResourceManager: " + + regNMResponse.getDiagnosticsMessage(); throw new YarnRuntimeException( - "Received SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, " - + message); + "Received SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, " + + message); } // if ResourceManager version is too old then shutdown @@ -435,16 +477,16 @@ nodeManagerVersionId, containerReports, getRunningApplications(), String rmVersion = regNMResponse.getRMVersion(); if (rmVersion == null) { String message = "The Resource Manager's did not return a version. " - + "Valid version cannot be checked."; + + "Valid version cannot be checked."; throw new YarnRuntimeException("Shutting down the Node Manager. 
" - + message); + + message); } if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) { String message = "The Resource Manager's version (" - + rmVersion +") is less than the minimum " - + "allowed version " + minimumResourceManagerVersion; + + rmVersion +") is less than the minimum " + + "allowed version " + minimumResourceManagerVersion; throw new YarnRuntimeException("Shutting down the Node Manager on RM " - + "version error, " + message); + + "version error, " + message); } } this.registeredWithRM = true; @@ -464,22 +506,22 @@ nodeManagerVersionId, containerReports, getRunningApplications(), StringBuilder successfullRegistrationMsg = new StringBuilder(); successfullRegistrationMsg.append("Registered with ResourceManager as ") - .append(this.nodeId); + .append(this.nodeId); Resource newResource = regNMResponse.getResource(); if (newResource != null) { updateNMResource(newResource); successfullRegistrationMsg.append(" with updated total resource of ") - .append(this.totalResource); + .append(this.totalResource); } else { successfullRegistrationMsg.append(" with total resource of ") - .append(this.totalResource); + .append(this.totalResource); } successfullRegistrationMsg.append(nodeLabelsHandler - .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); + .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); successfullRegistrationMsg.append(nodeAttributesHandler - .verifyRMRegistrationResponseForNodeAttributes(regNMResponse)); + .verifyRMRegistrationResponseForNodeAttributes(regNMResponse)); LOG.info(successfullRegistrationMsg.toString()); } @@ -491,7 +533,7 @@ private List createKeepAliveApplicationList() { List appList = new ArrayList(); for (Iterator> i = - this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) { + this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) { Entry e = i.next(); ApplicationId appId = e.getKey(); Long nextKeepAlive = e.getValue(); @@ -514,22 +556,22 @@ protected NodeStatus 
getNodeStatus(int responseId) throws IOException { nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); nodeHealthStatus.setLastHealthReportTime(healthChecker - .getLastHealthReportTime()); + .getLastHealthReportTime()); LOG.debug("Node's health-status : {}, {}", - nodeHealthStatus.getIsNodeHealthy(), - nodeHealthStatus.getHealthReport()); + nodeHealthStatus.getIsNodeHealthy(), + nodeHealthStatus.getHealthReport()); List containersStatuses = getContainerStatuses(); ResourceUtilization containersUtilization = getContainersUtilization(); ResourceUtilization nodeUtilization = getNodeUtilization(); List increasedContainers - = getIncreasedContainers(); + = getIncreasedContainers(); NodeStatus nodeStatus = - NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + NodeStatus.newInstance(nodeId, responseId, containersStatuses, + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization, nodeUtilization, increasedContainers); nodeStatus.setOpportunisticContainersStatus( - getOpportunisticContainersStatus()); + getOpportunisticContainersStatus()); return nodeStatus; } @@ -539,7 +581,7 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { */ private OpportunisticContainersStatus getOpportunisticContainersStatus() { OpportunisticContainersStatus status = - this.context.getContainerManager().getOpportunisticContainersStatus(); + this.context.getContainerManager().getOpportunisticContainersStatus(); return status; } @@ -549,7 +591,7 @@ private OpportunisticContainersStatus getOpportunisticContainersStatus() { */ private ResourceUtilization getContainersUtilization() { ContainersMonitor containersMonitor = - this.context.getContainerManager().getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); return 
containersMonitor.getContainersUtilization(); } @@ -559,7 +601,7 @@ private ResourceUtilization getContainersUtilization() { */ private ResourceUtilization getNodeUtilization() { NodeResourceMonitorImpl nodeResourceMonitor = - (NodeResourceMonitorImpl) this.context.getNodeResourceMonitor(); + (NodeResourceMonitorImpl) this.context.getNodeResourceMonitor(); return nodeResourceMonitor.getUtilization(); } @@ -567,12 +609,12 @@ private ResourceUtilization getNodeUtilization() { * NM-RM heartbeat. */ private List - getIncreasedContainers() { + getIncreasedContainers() { List - increasedContainers = new ArrayList<>( + increasedContainers = new ArrayList<>( this.context.getIncreasedContainers().values()); for (org.apache.hadoop.yarn.api.records.Container - container : increasedContainers) { + container : increasedContainers) { this.context.getIncreasedContainers().remove(container.getId()); } return increasedContainers; @@ -585,7 +627,7 @@ private void updateNMResource(Resource resource) { // Update the containers monitor ContainersMonitor containersMonitor = - this.context.getContainerManager().getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); containersMonitor.setAllocatedResourcesForContainers(totalResource); } @@ -598,13 +640,13 @@ protected List getContainerStatuses() throws IOException { for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); ApplicationId applicationId = containerId.getApplicationAttemptId() - .getApplicationId(); + .getApplicationId(); org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = - container.cloneAndGetContainerStatus(); + container.cloneAndGetContainerStatus(); if (containerStatus.getState() == ContainerState.COMPLETE) { if (isApplicationStopped(applicationId)) { LOG.debug("{} is completing, remove {} from NM context.", - applicationId, containerId); + applicationId, containerId); 
context.getContainers().remove(containerId); pendingCompletedContainers.put(containerId, containerStatus); } else { @@ -624,7 +666,7 @@ protected List getContainerStatuses() throws IOException { containerStatuses.addAll(pendingCompletedContainers.values()); if (!containerStatuses.isEmpty()) { LOG.debug("Sending out {} container statuses: {}", - containerStatuses.size(), containerStatuses); + containerStatuses.size(), containerStatuses); } return containerStatuses; @@ -633,9 +675,9 @@ protected List getContainerStatuses() throws IOException { private List getRunningApplications() { List runningApplications = new ArrayList(); for (Entry appEntry : this.context - .getApplications().entrySet()) { + .getApplications().entrySet()) { if (ApplicationState.FINISHED != appEntry.getValue() - .getApplicationState()) { + .getApplicationState()) { runningApplications.add(appEntry.getKey()); } } @@ -646,17 +688,17 @@ private List getRunningApplications() { // These NMContainerStatus are sent on NM registration and used by YARN only. private List getNMContainerStatuses() throws IOException { List containerStatuses = - new ArrayList(); + new ArrayList(); for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); ApplicationId applicationId = containerId.getApplicationAttemptId() - .getApplicationId(); + .getApplicationId(); if (!this.context.getApplications().containsKey(applicationId)) { context.getContainers().remove(containerId); continue; } NMContainerStatus status = - container.getNMContainerStatus(); + container.getNMContainerStatus(); containerStatuses.add(status); if (status.getContainerState() == ContainerState.COMPLETE) { // Adding to finished containers cache. 
Cache will keep it around at @@ -667,7 +709,7 @@ private List getNMContainerStatuses() throws IOException { } if (!containerStatuses.isEmpty()) { LOG.info("Sending out {} container NM container statuses: {}.", - containerStatuses.size(), containerStatuses); + containerStatuses.size(), containerStatuses); } return containerStatuses; } @@ -678,10 +720,10 @@ private boolean isApplicationStopped(ApplicationId applicationId) { } ApplicationState applicationState = this.context.getApplications().get( - applicationId).getApplicationState(); + applicationId).getApplicationState(); if (applicationState == ApplicationState.FINISHING_CONTAINERS_WAIT - || applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP - || applicationState == ApplicationState.FINISHED) { + || applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP + || applicationState == ApplicationState.FINISHED) { return true; } else { return false; @@ -694,7 +736,7 @@ public void addCompletedContainer(ContainerId containerId) { removeVeryOldStoppedContainersFromCache(); if (!recentlyStoppedContainers.containsKey(containerId)) { recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); + System.currentTimeMillis() + durationToTrackStoppedContainers); } } } @@ -702,7 +744,7 @@ public void addCompletedContainer(ContainerId containerId) { @VisibleForTesting @Private public void removeOrTrackCompletedContainersFromContext( - List containerIds) { + List containerIds) { Set removedContainers = new HashSet(); pendingContainersToRemove.addAll(containerIds); @@ -714,7 +756,7 @@ public void removeOrTrackCompletedContainersFromContext( if (nmContainer == null) { iter.remove(); } else if (nmContainer.getContainerState().equals( - org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { + org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { 
context.getContainers().remove(containerId); removedContainers.add(containerId); iter.remove(); @@ -739,8 +781,8 @@ private void trackAppForKeepAlive(ApplicationId appId) { // Next keepAlive request for app between 0.7 & 0.9 of when the token will // likely expire. long nextTime = System.currentTimeMillis() - + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs - * keepAliveDelayRandom.nextInt(100))/100); + + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs + * keepAliveDelayRandom.nextInt(100))/100); appTokenKeepAliveMap.put(appId, nextTime); } @@ -775,7 +817,7 @@ public void removeVeryOldStoppedContainersFromCache() { synchronized (recentlyStoppedContainers) { long currentTime = System.currentTimeMillis(); Iterator> i = - recentlyStoppedContainers.entrySet().iterator(); + recentlyStoppedContainers.entrySet().iterator(); while (i.hasNext()) { Entry mapEntry = i.next(); ContainerId cid = mapEntry.getKey(); @@ -784,7 +826,7 @@ public void removeVeryOldStoppedContainersFromCache() { } if (!context.getContainers().containsKey(cid)) { ApplicationId appId = - cid.getApplicationAttemptId().getApplicationId(); + cid.getApplicationAttemptId().getApplicationId(); if (isApplicationStopped(appId)) { i.remove(); try { @@ -804,9 +846,9 @@ public long getRMIdentifier() { } private static Map parseCredentials( - Map systemCredentials) throws IOException { + Map systemCredentials) throws IOException { Map map = - new HashMap(); + new HashMap(); for (Map.Entry entry : systemCredentials.entrySet()) { Credentials credentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -819,7 +861,7 @@ private static Map parseCredentials( if (LOG.isDebugEnabled()) { for (Map.Entry entry : map.entrySet()) { LOG.debug("Retrieved credentials from RM for {}: {}", - entry.getKey(), entry.getValue().getAllTokens()); + entry.getKey(), entry.getValue().getAllTokens()); } } return map; @@ -828,19 +870,19 @@ private static Map parseCredentials( 
protected void startStatusUpdater() { statusUpdaterRunnable = new StatusUpdaterRunnable(); statusUpdater = - new Thread(statusUpdaterRunnable, "Node Status Updater"); + new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } private boolean handleShutdownOrResyncCommand( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { if (response.getNodeAction() == NodeAction.SHUTDOWN) { LOG.warn("Received SHUTDOWN signal from Resourcemanager as part of" - + " heartbeat, hence shutting down."); + + " heartbeat, hence shutting down."); LOG.warn("Message from ResourceManager: {}.", response.getDiagnosticsMessage()); context.setDecommissioned(true); dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); return true; } if (response.getNodeAction() == NodeAction.RESYNC) { @@ -848,9 +890,9 @@ private boolean handleShutdownOrResyncCommand( LOG.warn("Message from ResourceManager: {}.", response.getDiagnosticsMessage()); // Invalidate the RMIdentifier while resync NodeStatusUpdaterImpl.this.rmIdentifier = - ResourceManagerConstants.RM_INVALID_IDENTIFIER; + ResourceManagerConstants.RM_INVALID_IDENTIFIER; dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.RESYNC)); + new NodeManagerEvent(NodeManagerEventType.RESYNC)); pendingCompletedContainers.clear(); return true; } @@ -864,7 +906,7 @@ public void reportException(Exception ex) { } private List getLogAggregationReportsForApps( - ConcurrentLinkedQueue lastestLogAggregationStatus) { + ConcurrentLinkedQueue lastestLogAggregationStatus) { LogAggregationReport status; while ((status = lastestLogAggregationStatus.poll()) != null) { this.logAggregationReportForAppsTempList.add(status); @@ -875,12 +917,12 @@ private List getLogAggregationReportsForApps( } private NMNodeLabelsHandler createNMNodeLabelsHandler( - NodeLabelsProvider nodeLabelsProvider) { + NodeLabelsProvider 
nodeLabelsProvider) { if (nodeLabelsProvider == null) { return new NMCentralizedNodeLabelsHandler(); } else { return new NMDistributedNodeLabelsHandler(nodeLabelsProvider, - this.getConfig()); + this.getConfig()); } } @@ -891,7 +933,7 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler( * @return attributes handler */ private NMNodeAttributesHandler createNMNodeAttributesHandler( - NodeAttributesProvider provider) { + NodeAttributesProvider provider) { if (provider == null) { return new NMCentralizedNodeAttributesHandler(); } else { @@ -907,7 +949,7 @@ private static abstract class CachedNodeDescriptorHandler { private boolean isValueSented; CachedNodeDescriptorHandler(T defaultValue, - long resyncInterval) { + long resyncInterval) { this.defaultValue = defaultValue; this.resyncInterval = resyncInterval; } @@ -966,7 +1008,7 @@ public T getValueForHeartbeat() { */ public boolean isResyncIntervalElapsed() { long elapsedTimeSinceLastSync = - System.currentTimeMillis() - lastSendMills; + System.currentTimeMillis() - lastSendMills; if (elapsedTimeSinceLastSync > resyncInterval) { return true; } @@ -1016,14 +1058,14 @@ private interface NMNodeAttributesHandler { * independently and returns empty string */ String verifyRMRegistrationResponseForNodeAttributes( - RegisterNodeManagerResponse regNMResponse); + RegisterNodeManagerResponse regNMResponse); /** * check whether if updated attributes sent to RM was accepted or not. * @param response */ void verifyRMHeartbeatResponseForNodeAttributes( - NodeHeartbeatResponse response); + NodeHeartbeatResponse response); } @@ -1032,7 +1074,7 @@ void verifyRMHeartbeatResponseForNodeAttributes( * the response. 
*/ private static class NMCentralizedNodeAttributesHandler - implements NMNodeAttributesHandler { + implements NMNodeAttributesHandler { @Override public Set getNodeAttributesForHeartbeat() { return null; @@ -1045,27 +1087,27 @@ public Set getNodeAttributesForRegistration() { @Override public void verifyRMHeartbeatResponseForNodeAttributes( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { } @Override public String verifyRMRegistrationResponseForNodeAttributes( - RegisterNodeManagerResponse regNMResponse) { + RegisterNodeManagerResponse regNMResponse) { return ""; } } private static class NMDistributedNodeAttributesHandler - extends CachedNodeDescriptorHandler> - implements NMNodeAttributesHandler { + extends CachedNodeDescriptorHandler> + implements NMNodeAttributesHandler { private final NodeAttributesProvider attributesProvider; protected NMDistributedNodeAttributesHandler( - NodeAttributesProvider provider, Configuration conf) { + NodeAttributesProvider provider, Configuration conf) { super(Collections.unmodifiableSet(new HashSet<>(0)), - conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, - YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL)); + conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, + YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL)); this.attributesProvider = provider; } @@ -1086,7 +1128,7 @@ public Set getValueFromProvider() { @Override protected void validate(Set nodeAttributes) - throws IOException { + throws IOException { try { NodeLabelUtil.validateNodeAttributes(nodeAttributes); } catch (IOException e) { @@ -1102,36 +1144,36 @@ protected boolean isValueUpdated(Set value) { @Override public String verifyRMRegistrationResponseForNodeAttributes( - RegisterNodeManagerResponse regNMResponse) { + RegisterNodeManagerResponse regNMResponse) { StringBuilder successfulNodeAttributesRegistrationMsg = - new StringBuilder(); + new StringBuilder(); if 
(regNMResponse.getAreNodeAttributesAcceptedByRM()) { successfulNodeAttributesRegistrationMsg - .append(" and with following Node attribute(s) : {") - .append(getPreviousValue()).append("}"); + .append(" and with following Node attribute(s) : {") + .append(getPreviousValue()).append("}"); } else { // case where provider is set but RM did not accept the node attributes String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); LOG.error("Node attributes sent from NM while registration were" - + " rejected by RM. " + ((errorMsgFromRM == null) ? - "Seems like RM is configured with Centralized Attributes." : - "And with message " + regNMResponse.getDiagnosticsMessage())); + + " rejected by RM. " + ((errorMsgFromRM == null) ? + "Seems like RM is configured with Centralized Attributes." : + "And with message " + regNMResponse.getDiagnosticsMessage())); } return successfulNodeAttributesRegistrationMsg.toString(); } @Override public void verifyRMHeartbeatResponseForNodeAttributes( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { if (isValueSented()) { if (response.getAreNodeAttributesAcceptedByRM()) { LOG.debug("Node attributes {{}} were Accepted by RM ", - getPreviousValue()); + getPreviousValue()); } else { // case where updated node attributes from NodeAttributesProvider // is sent to RM and RM rejected the attributes LOG.error("NM node attributes [{}] were not accepted by RM and message from RM : {}.", - getPreviousValue(), response.getDiagnosticsMessage()); + getPreviousValue(), response.getDiagnosticsMessage()); } } } @@ -1149,7 +1191,7 @@ private static interface NMNodeLabelsHandler { * independently and returns empty string */ String verifyRMRegistrationResponseForNodeLabels( - RegisterNodeManagerResponse regNMResponse); + RegisterNodeManagerResponse regNMResponse); /** * If nodeLabels From Provider is different previous node labels then it @@ -1172,7 +1214,7 @@ String verifyRMRegistrationResponseForNodeLabels( * response */ private static 
class NMCentralizedNodeLabelsHandler - implements NMNodeLabelsHandler { + implements NMNodeLabelsHandler { @Override public Set getNodeLabelsForHeartbeat() { return null; @@ -1185,25 +1227,25 @@ public Set getNodeLabelsForRegistration() { @Override public void verifyRMHeartbeatResponseForNodeLabels( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { } @Override public String verifyRMRegistrationResponseForNodeLabels( - RegisterNodeManagerResponse regNMResponse) { + RegisterNodeManagerResponse regNMResponse) { return ""; } } private static class NMDistributedNodeLabelsHandler - extends CachedNodeDescriptorHandler> - implements NMNodeLabelsHandler { + extends CachedNodeDescriptorHandler> + implements NMNodeLabelsHandler { private NMDistributedNodeLabelsHandler( - NodeLabelsProvider nodeLabelsProvider, Configuration conf) { + NodeLabelsProvider nodeLabelsProvider, Configuration conf) { super(CommonNodeLabelsManager.EMPTY_NODELABEL_SET, - conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, - YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL)); + conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, + YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL)); this.nodeLabelsProvider = nodeLabelsProvider; } @@ -1216,20 +1258,20 @@ public Set getNodeLabelsForRegistration() { @Override public String verifyRMRegistrationResponseForNodeLabels( - RegisterNodeManagerResponse regNMResponse) { + RegisterNodeManagerResponse regNMResponse) { StringBuilder successfulNodeLabelsRegistrationMsg = new StringBuilder(""); if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { successfulNodeLabelsRegistrationMsg - .append(" and with following Node label(s) : {") - .append(StringUtils.join(",", getPreviousValue())).append("}"); + .append(" and with following Node label(s) : {") + .append(StringUtils.join(",", getPreviousValue())).append("}"); } else { // case where provider is set but RM did not accept the Node Labels String errorMsgFromRM = 
regNMResponse.getDiagnosticsMessage(); LOG.error( - "NodeLabels sent from NM while registration were rejected by RM. " - + ((errorMsgFromRM == null) - ? "Seems like RM is configured with Centralized Labels." - : "And with message " + regNMResponse.getDiagnosticsMessage())); + "NodeLabels sent from NM while registration were rejected by RM. " + + ((errorMsgFromRM == null) + ? "Seems like RM is configured with Centralized Labels." + : "And with message " + regNMResponse.getDiagnosticsMessage())); } return successfulNodeLabelsRegistrationMsg.toString(); } @@ -1240,14 +1282,14 @@ public Set getNodeLabelsForHeartbeat() { } protected void validate(Set nodeLabels) - throws IOException { + throws IOException { Iterator iterator = nodeLabels.iterator(); boolean hasInvalidLabel = false; StringBuilder errorMsg = new StringBuilder(); while (iterator.hasNext()) { try { NodeLabelUtil - .checkAndThrowLabelName(iterator.next().getName()); + .checkAndThrowLabelName(iterator.next().getName()); } catch (IOException e) { errorMsg.append(e.getMessage()); errorMsg.append(" , "); @@ -1272,16 +1314,16 @@ protected boolean isValueUpdated(Set value) { @Override public void verifyRMHeartbeatResponseForNodeLabels( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { if (isValueSented()) { if (response.getAreNodeLabelsAcceptedByRM()) { LOG.debug("Node Labels {{}} were Accepted by RM", - StringUtils.join(",", getPreviousValue())); + StringUtils.join(",", getPreviousValue())); } else { // case where updated labels from NodeLabelsProvider is sent to RM and // RM rejected the labels LOG.error("NM node labels [{}] were not accepted by RM and message from RM : {}.", - StringUtils.join(",", getPreviousValue()), response.getDiagnosticsMessage()); + StringUtils.join(",", getPreviousValue()), response.getDiagnosticsMessage()); } } } @@ -1298,34 +1340,34 @@ public void run() { try { NodeHeartbeatResponse response = null; Set nodeLabelsForHeartbeat = - 
nodeLabelsHandler.getNodeLabelsForHeartbeat(); + nodeLabelsHandler.getNodeLabelsForHeartbeat(); Set nodeAttributesForHeartbeat = nodeAttributesHandler.getNodeAttributesForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = - NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context - .getNMTokenSecretManager().getCurrentKey(), - nodeLabelsForHeartbeat, - nodeAttributesForHeartbeat, - NodeStatusUpdaterImpl.this.context - .getRegisteringCollectors()); + NodeHeartbeatRequest.newInstance(nodeStatus, + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartbeat, + nodeAttributesForHeartbeat, + NodeStatusUpdaterImpl.this.context + .getRegisteringCollectors()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM List logAggregationReports = - getLogAggregationReportsForApps(context - .getLogAggregationStatusForApps()); + getLogAggregationReportsForApps(context + .getLogAggregationStatusForApps()); if (logAggregationReports != null - && !logAggregationReports.isEmpty()) { + && !logAggregationReports.isEmpty()) { request.setLogAggregationReportsForApps(logAggregationReports); } } request.setTokenSequenceNo( - NodeStatusUpdaterImpl.this.tokenSequenceNo); + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1333,9 +1375,9 @@ public void run() { if (!handleShutdownOrResyncCommand(response)) { nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( - response); + response); nodeAttributesHandler - .verifyRMHeartbeatResponseForNodeAttributes(response); + 
.verifyRMHeartbeatResponseForNodeAttributes(response); // Explicitly put this method after checking the resync // response. We @@ -1344,7 +1386,7 @@ public void run() { // when NM re-registers with RM. // Only remove the cleanedup containers that are acked removeOrTrackCompletedContainersFromContext(response - .getContainersToBeRemovedFromNM()); + .getContainersToBeRemovedFromNM()); // If the last heartbeat was missed, it is possible that the // RM saw this one as a duplicate and did not process it. @@ -1356,42 +1398,42 @@ public void run() { pendingCompletedContainers.clear(); } else { LOG.info("skipped clearing pending completed containers due to " + - "missed heartbeat"); + "missed heartbeat"); missedHearbeat = false; } logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response - .getContainersToCleanup(); + .getContainersToCleanup(); if (!containersToCleanup.isEmpty()) { dispatcher.getEventHandler().handle( - new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason - .BY_RESOURCEMANAGER)); + new CMgrCompletedContainersEvent(containersToCleanup, + CMgrCompletedContainersEvent.Reason + .BY_RESOURCEMANAGER)); } List appsToCleanup = - response.getApplicationsToCleanup(); + response.getApplicationsToCleanup(); //Only start tracking for keepAlive on FINISH_APP trackAppsForKeepAlive(appsToCleanup); if (!appsToCleanup.isEmpty()) { dispatcher.getEventHandler().handle( - new CMgrCompletedAppsEvent(appsToCleanup, - CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + new CMgrCompletedAppsEvent(appsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } Map systemCredentials = - YarnServerBuilderUtils.convertFromProtoFormat( - response.getSystemCredentialsForApps()); + YarnServerBuilderUtils.convertFromProtoFormat( + response.getSystemCredentialsForApps()); if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) 
context).setSystemCrendentialsForApps( - parseCredentials(systemCredentials)); + parseCredentials(systemCredentials)); context.getContainerManager().handleCredentialUpdate(); } List - containersToUpdate = response.getContainersToUpdate(); + containersToUpdate = response.getContainersToUpdate(); if (!containersToUpdate.isEmpty()) { dispatcher.getEventHandler().handle( - new CMgrUpdateContainersEvent(containersToUpdate)); + new CMgrUpdateContainersEvent(containersToUpdate)); } // SignalContainer request originally comes from end users via @@ -1399,15 +1441,15 @@ public void run() { // ContainerManager which will dispatch the event to // ContainerLauncher. List containersToSignal = response - .getContainersToSignalList(); + .getContainersToSignalList(); if (!containersToSignal.isEmpty()) { dispatcher.getEventHandler().handle( - new CMgrSignalContainersEvent(containersToSignal)); + new CMgrSignalContainersEvent(containersToSignal)); } // Update QueuingLimits if ContainerManager supports queuing ContainerQueuingLimit queuingLimit = - response.getContainerQueuingLimit(); + response.getContainerQueuingLimit(); if (queuingLimit != null) { context.getContainerManager().updateQueuingLimit(queuingLimit); } @@ -1426,11 +1468,11 @@ public void run() { } NodeStatusUpdaterImpl.this.tokenSequenceNo = - response.getTokenSequenceNo(); + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); // failed to connect to RM. failedToConnect = true; throw new YarnRuntimeException(e); @@ -1443,8 +1485,8 @@ public void run() { } finally { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? 
- YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS : - nextHeartBeatInterval; + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS : + nextHeartBeatInterval; try { heartbeatMonitor.wait(nextHeartBeatInterval); } catch (InterruptedException e) { @@ -1456,17 +1498,17 @@ public void run() { } private void updateTimelineCollectorData( - NodeHeartbeatResponse response) { + NodeHeartbeatResponse response) { Map incomingCollectorsMap = - response.getAppCollectors(); + response.getAppCollectors(); if (incomingCollectorsMap == null) { LOG.debug("No collectors to update RM"); return; } Map knownCollectors = - context.getKnownCollectors(); + context.getKnownCollectors(); for (Map.Entry entry - : incomingCollectorsMap.entrySet()) { + : incomingCollectorsMap.entrySet()) { ApplicationId appId = entry.getKey(); AppCollectorData collectorData = entry.getValue(); @@ -1478,13 +1520,13 @@ private void updateTimelineCollectorData( AppCollectorData existingData = knownCollectors.get(appId); if (AppCollectorData.happensBefore(existingData, collectorData)) { LOG.debug("Sync a new collector address: {} for application: {}" - + " from RM.", collectorData.getCollectorAddr(), appId); + + " from RM.", collectorData.getCollectorAddr(), appId); // Update information for clients. NMTimelinePublisher nmTimelinePublisher = - context.getNMTimelinePublisher(); + context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorData.getCollectorAddr()); + application.getAppId(), collectorData.getCollectorAddr()); } // Update information for the node manager itself. 
knownCollectors.put(appId, collectorData); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e07a0e1cc18e1..7d5bba8e869b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1759,6 +1759,8 @@ public void handle(ContainerManagerEvent event) { diagnostic = "Application killed on shutdown"; } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) { diagnostic = "Application killed by ResourceManager"; + } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_NODEMANAGER) { + diagnostic = "Application cleaned by NodeManager"; } this.dispatcher.getEventHandler().handle( new ApplicationFinishEvent(appID, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index ab5678a678800..ae7ccef8f3afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -68,7 +68,14 @@ public NMTokenSecretManagerInNM(NMStateStoreService stateStore) { new HashMap>(); this.stateStore = stateStore; } - + + public Map> getAppToAppAttemptMap() { + if (appToAppAttemptMap == null) { + return new HashMap>(); + } + return appToAppAttemptMap; + } + public synchronized void recover() throws IOException { RecoveredNMTokensState state = stateStore.loadNMTokensState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index ad7a1e7776cdf..38e1d812e71f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -477,8 +477,8 @@ public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, } @Override - protected void registerWithRM() throws YarnException, IOException { - super.registerWithRM(); + protected void registerWithRM(boolean isStarted) throws YarnException, IOException { + super.registerWithRM(false); registrationCount++; } @@ -565,8 +565,8 @@ public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher, } @Override - protected void registerWithRM() throws YarnException, IOException { - super.registerWithRM(); + protected void registerWithRM(boolean isStarted) throws YarnException, IOException { + super.registerWithRM(false); registrationCount++; if 
(registrationCount > 1) { throw new YarnRuntimeException("Registration with RM failed.");