-
Notifications
You must be signed in to change notification settings - Fork 9.1k
MAPREDUCE-7508.FileInputFormat can throw ArrayIndexOutofBoundsException because of some concurrent execution. #7859
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
…on because of some concurrent execution.
LGTM! |
💔 -1 overall
This message was automatically generated. |
@@ -369,6 +369,9 @@ public InputSplit[] getSplits(JobConf job, int numSplits) | |||
} else { | |||
blkLocations = fs.getFileBlockLocations(file, 0, length); | |||
} | |||
if (blkLocations.length == 0){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contributions. Just wonder why it will meet length != 0
but blkLocations.length == 0
, some corner case that lead inconsistent metadata of NameNode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Hexiaoqiao thanks for review.
It is a rare corner case, only happens when using spark streaming monitors a path where the upstream system sometimes starts two identical tasks that attempt to create and write to the same HDFS file simultaneously. This can lead to conflicts where a file is created and written to twice in quick succession.
When Spark scans for files, it uses the FileInputFormat.getSplits() method to split the file. The first step in getSplits is to retrieve the file's length. If the file length is not zero, the next step is to get the block locations array for that file. However, if the two upstream programs rapidly create and write to the same file (i.e., the file is overwritten or appended to almost simultaneously), a race condition may occur:
The file's length is already non-zero, but calling getFileBlockLocations() returns an empty array because the file is being overwritten or is not yet fully written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Got it. Make sense to me. +1 to check in.
BTW, the root cause here is invoke listStatus
to get file status and invoke another interface getFileBlockLocations
to get block location, but file has changed between this steps, right? If it is true, is it proper to use blkLocations
only as condition at L364 rather than file.length
which could be not the correct result here? Thanks again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Got it. Make sense to me. +1 to check in. BTW, the root cause here is invoke
listStatus
to get file status and invoke another interfacegetFileBlockLocations
to get block location, but file has changed between this steps, right? If it is true, is it proper to useblkLocations
only as condition at L364 rather thanfile.length
which could be not the correct result here? Thanks again.
Thanks for your review. It is a good solution to use blkLocations
only as condition at L364, if blkLocations array is empty, file.length
is also empty, this will not effect the later opretions.
…on because of some concurrent execution.
Description of PR
As Described in MAPREDUCE-7508
When Spark scans for files, it uses the FileInputFormat.getSplits() method to split the file. The first step in getSplits is to retrieve the file's length. If the file length is not zero, the next step is to get the block locations array for that file. However, if the two upstream programs rapidly create and write to the same file (i.e., the file is overwritten or appended to almost simultaneously), a race condition may occur:
The file's length is already non-zero,
but calling getFileBlockLocations() returns an empty array because the file is being overwritten or is not yet fully written.
When this happens, subsequent logic in getSplits (such as accessing the last element of the block locations array) will throw an ArrayIndexOutOfBoundsException because the block locations array is unexpectedly empty.
How was this patch tested?
I rebuild the project and ran on our cluster, spark did not throw Execptions.
For code changes:
If Array
blkLocations
is empty, it will continue to next iteration, so that it will now find the the last blockLocation of this file.LICENSE
,LICENSE-binary
,NOTICE-binary
files?