From a55754ccbf82bd4484dce18147d735473606acf8 Mon Sep 17 00:00:00 2001 From: "eddy.cao" Date: Fri, 8 Aug 2025 19:04:01 +0800 Subject: [PATCH 1/2] Fix serial fsimage transfer during checkpoint with multiple namenodes --- .../hdfs/server/namenode/ha/StandbyCheckpointer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index ec848668d2561..aa6fc65c7fb85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -248,9 +248,10 @@ private void doCheckpoint() throws InterruptedException, IOException { // Do this in a separate thread to avoid blocking transition to active, but don't allow more // than the expected number of tasks to run or queue up // See HDFS-4816 - ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue(activeNNAddresses.size()), - uploadThreadFactory); + ExecutorService executor = + new ThreadPoolExecutor(activeNNAddresses.size(), activeNNAddresses.size(), 100, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue(activeNNAddresses.size()), + uploadThreadFactory); // for right now, just match the upload to the nn address by convention. There is no need to // directly tie them together by adding a pair class. HashMap> uploads = From 49266a63d58a96c196a6347998158503e6fc5754 Mon Sep 17 00:00:00 2001 From: "eddy.cao" Date: Sat, 9 Aug 2025 22:51:55 +0800 Subject: [PATCH 2/2] Add UT for thread count check --- .../namenode/ha/TestStandbyCheckpoints.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index 8256caab762a9..75423f5cd866f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -451,6 +451,21 @@ public void testCheckpointCancellationDuringUpload() throws Exception { cluster.transitionToStandby(0); cluster.transitionToActive(1); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + int transferThreadCount = 0; + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threads = threadBean.getThreadInfo( + threadBean.getAllThreadIds(), 1); + for (ThreadInfo thread: threads) { + if (thread.getThreadName().startsWith("TransferFsImageUpload")) { + transferThreadCount++; + } + } + return transferThreadCount == NUM_NNS - 1; + } + }, 1000, 30000); // Wait to make sure background TransferFsImageUpload thread was cancelled. // This needs to be done before the next test in the suite starts, so that a