From 53a184492af7a4bbf4fd6ecdb5e80b431ff682ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=99=E8=89=AF?= Date: Thu, 7 Aug 2025 14:56:48 +0800 Subject: [PATCH 1/3] MAPREDUCE-7508. FileInputFormat can throw ArrayIndexOutOfBoundsException because of some concurrent execution. --- .../main/java/org/apache/hadoop/mapred/FileInputFormat.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index 46bb4b629c816..8681bff9c5805 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -369,6 +369,9 @@ public InputSplit[] getSplits(JobConf job, int numSplits) } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } + if(blkLocations.length == 0){ + continue; + } if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); From d2ba0014d78acfdc1f301a97dc53c4ab0fbaefcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=99=E8=89=AF?= Date: Fri, 8 Aug 2025 11:21:36 +0800 Subject: [PATCH 2/3] reformat code --- .../src/main/java/org/apache/hadoop/mapred/FileInputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index 8681bff9c5805..d626e8336642f 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -369,7 +369,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } - if(blkLocations.length == 0){ + if (blkLocations.length == 0){ continue; } if (isSplitable(fs, path)) { From 75ca4e18cd5c83d3df163b0abc229d86cb0f9de0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=99=E8=89=AF?= Date: Fri, 22 Aug 2025 17:47:41 +0800 Subject: [PATCH 3/3] modify the condition if we should skip empty files --- .../apache/hadoop/mapred/FileInputFormat.java | 75 +++++++++---------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index d626e8336642f..e1e7fb39f2877 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -361,50 +361,45 @@ public InputSplit[] getSplits(JobConf job, int numSplits) for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); - if (length != 0) { - FileSystem fs = path.getFileSystem(job); - BlockLocation[] blkLocations; - if (file instanceof LocatedFileStatus) { - blkLocations = ((LocatedFileStatus) file).getBlockLocations(); - } else { - blkLocations = fs.getFileBlockLocations(file, 0, length); - } - if (blkLocations.length == 0){ - continue; + 
FileSystem fs = path.getFileSystem(job); + BlockLocation[] blkLocations; + if (file instanceof LocatedFileStatus) { + blkLocations = ((LocatedFileStatus) file).getBlockLocations(); + } else { + blkLocations = fs.getFileBlockLocations(file, 0, length); + } + if (blkLocations.length == 0){ + continue; + } + if (isSplitable(fs, path)) { + long blockSize = file.getBlockSize(); + long splitSize = computeSplitSize(goalSize, minSize, blockSize); + + long bytesRemaining = length; + while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, + length-bytesRemaining, splitSize, clusterMap); + splits.add(makeSplit(path, length-bytesRemaining, splitSize, + splitHosts[0], splitHosts[1])); + bytesRemaining -= splitSize; } - if (isSplitable(fs, path)) { - long blockSize = file.getBlockSize(); - long splitSize = computeSplitSize(goalSize, minSize, blockSize); - - long bytesRemaining = length; - while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, - length-bytesRemaining, splitSize, clusterMap); - splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts[0], splitHosts[1])); - bytesRemaining -= splitSize; - } - if (bytesRemaining != 0) { - String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - - bytesRemaining, bytesRemaining, clusterMap); - splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts[0], splitHosts[1])); - } - } else { - if (LOG.isDebugEnabled()) { - // Log only if the file is big enough to be splitted - if (length > Math.min(file.getBlockSize(), minSize)) { - LOG.debug("File is not splittable so no parallelization " - + "is possible: " + file.getPath()); - } + if (bytesRemaining != 0) { + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length + - bytesRemaining, bytesRemaining, clusterMap); + splits.add(makeSplit(path, length - bytesRemaining, 
bytesRemaining, + splitHosts[0], splitHosts[1])); + } + } else { + if (LOG.isDebugEnabled()) { + // Log only if the file is big enough to be splitted + if (length > Math.min(file.getBlockSize(), minSize)) { + LOG.debug("File is not splittable so no parallelization " + + "is possible: " + file.getPath()); } - String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } - } else { - //Create empty hosts array for zero length files - splits.add(makeSplit(path, 0, length, new String[0])); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } sw.stop();