HADOOP-19604. ABFS: Full blob md5 computation during flush change to be config driven #7853
base: trunk
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
Pull Request Overview
This PR introduces a configurable flag to control full blob MD5 validation during PutBlockList (flush) operations in the Azure Blob File System (ABFS). Previously, full blob MD5 computation was always performed, leading to increased latency and CPU usage for large blobs.
Key changes:
- Added FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION configuration option (disabled by default)
- Modified flush operations to conditionally compute full blob MD5 based on the configuration
- Updated test cases to accommodate the new configuration-driven behavior
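As a rough illustration of the behaviour summarised in this overview, the standalone sketch below gates full blob MD5 tracking on a boolean flag. `ConfigDrivenBlobDigest`, `write`, and `md5ForFlush` are hypothetical names chosen for this example and are not part of the ABFS code; only the JDK's `MessageDigest` and `Base64` are real APIs here.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical stand-in for a stream that optionally tracks a full blob MD5. */
final class ConfigDrivenBlobDigest {
  private final boolean fullBlobChecksumEnabled;
  private final MessageDigest fullBlobDigest;

  ConfigDrivenBlobDigest(boolean fullBlobChecksumEnabled) {
    this.fullBlobChecksumEnabled = fullBlobChecksumEnabled;
    try {
      this.fullBlobDigest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /** Called for every written chunk; hashing is skipped when the flag is off. */
  void write(byte[] data) {
    if (fullBlobChecksumEnabled) {
      fullBlobDigest.update(data);
    }
  }

  /** Returns the Base64 full blob MD5 at flush time, or null when disabled. */
  String md5ForFlush() {
    if (!fullBlobChecksumEnabled) {
      return null;
    }
    return Base64.getEncoder().encodeToString(fullBlobDigest.digest());
  }
}
```

With the flag off, no bytes are ever fed to the digest, which is where the latency and CPU saving for large blobs would come from in a design like this.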
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
AbfsConfiguration.java | Added configuration field and getter for full blob checksum validation |
ConfigurationKeys.java | Defined new configuration key for full blob checksum validation |
FileSystemConfigurations.java | Added default value (false) for full blob checksum validation |
AbfsClient.java | Added method to check if full blob checksum validation is enabled |
AbfsBlobClient.java | Modified flush operation to conditionally use full blob MD5 or block ID hash |
AbfsDfsClient.java | Updated flush to only add MD5 header when full blob validation is enabled |
AbfsOutputStream.java | Added helper methods and conditional MD5 computation based on configuration |
AzureBlobIngressHandler.java | Made full blob MD5 computation conditional in flush operations |
AzureDFSIngressHandler.java | Made full blob MD5 computation conditional in flush operations |
AzureBlobBlockManager.java | Added conditional check for MD5 reset |
AzureDFSBlockManager.java | Added conditional check for MD5 reset |
RenameAtomicity.java | Updated to use conditional MD5 computation |
Test files | Updated test cases to handle new configuration and mock behaviors |
@@ -1705,6 +1709,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

public boolean isFullBlobChecksumValidationEnabled() {
The getter method isFullBlobChecksumValidationEnabled() lacks a corresponding setter method, which breaks the pattern established by other configuration properties like isChecksumValidationEnabled. This could cause issues for programmatic configuration changes.
setter is not used anywhere
String md5Hash = null;
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
}
if (blobMd5 != null) {
if (isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
} else {
md5Hash = computeMD5Hash(buffer, 0, buffer.length);
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash));
The variable md5Hash is declared but only used in the else branch. Consider declaring it within the else block to improve code clarity and reduce the scope of the variable.
used later in catch block hence declared outside
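The scoping point in this reply can be shown with a small standalone sketch. `Md5ScopeSketch` and `flushAndReport` are hypothetical names, and what the real catch block does with `md5Hash` is not shown in this thread, so the error handling here is only an assumption for illustration.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class Md5ScopeSketch {
  /** Base64-encoded MD5 of the given bytes. */
  static String base64Md5(byte[] data) {
    try {
      return Base64.getEncoder()
          .encodeToString(MessageDigest.getInstance("MD5").digest(data));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /** Hypothetical flush that can be made to fail after the hash is computed. */
  static String flushAndReport(byte[] blockIdBuffer, boolean failRequest) {
    String md5Hash = null; // declared before the try so the catch block can read it
    try {
      md5Hash = base64Md5(blockIdBuffer);
      if (failRequest) {
        throw new IllegalStateException("simulated server failure");
      }
      return "ok:" + md5Hash;
    } catch (IllegalStateException e) {
      // md5Hash is still in scope here; a variable declared inside
      // the try (or inside an else branch) would not be visible.
      return "failed:" + md5Hash;
    }
  }
}
```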
* 1. The full blob content MD5 (if full blob checksum validation is enabled), or
* 2. The full block ID buffer MD5 (fallback if blob checksum validation is disabled)
*/
if (getAbfsConfiguration().isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
The configuration check getAbfsConfiguration().isFullBlobChecksumValidationEnabled() is performed inside the response validation logic, but this same check was already done earlier in the method. Consider storing the result in a variable to avoid redundant method calls.
- if (getAbfsConfiguration().isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
+ if (fullBlobChecksumValidationEnabled && blobMd5 != null) {
@@ -481,6 +481,8 @@ public void testResetCalledOnExceptionInRemoteFlush() throws Exception {
//expected exception
}
// Verify that reset was called on the message digest
Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
if (spiedClient.isChecksumValidationEnabled()) {
The test is checking isChecksumValidationEnabled() but the code change suggests this should be checking isFullBlobChecksumValidationEnabled() since the MD5 reset behavior is now conditional on full blob checksum validation, not general checksum validation.
- if (spiedClient.isChecksumValidationEnabled()) {
+ if (spiedClient.isFullBlobChecksumValidationEnabled()) {
taken
@@ -544,7 +547,12 @@ private void uploadBlockAsync(AbfsBlock blockToUpload,
outputStreamStatistics.bytesToUpload(bytesLength);
outputStreamStatistics.writeCurrentBuffer();
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
String md5Hash = getMd5();
String md5Hash;
if (getClient().getAbfsConfiguration().getIsChecksumValidationEnabled()) {
Can isChecksumValidationEnabled() be used here?
taken
AzureBlobFileSystem abfs = getFileSystem();
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, true);
FileSystem fileSystem = FileSystem.newInstance(conf);
When creating a new instance, we need to close it. Can you check all the tests in this class and make sure every new instance is closed within the test itself?
taken
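Hadoop's `FileSystem` implements `Closeable`, so an instance obtained from `FileSystem.newInstance(conf)` can be scoped with try-with-resources. The sketch below shows the pattern with a hypothetical stand-in class (`CloseTrackingFileSystem`) so that it runs without Hadoop on the classpath; it is not the test code from this PR.

```java
/** Hypothetical stand-in for a file system created via newInstance(). */
final class CloseTrackingFileSystem implements AutoCloseable {
  private boolean closed;

  static CloseTrackingFileSystem newInstance() {
    return new CloseTrackingFileSystem();
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class NewInstanceCloseSketch {
  /** Runs a test-like body and returns the instance so callers can inspect it. */
  static CloseTrackingFileSystem runTestBody() {
    CloseTrackingFileSystem used;
    try (CloseTrackingFileSystem fs = CloseTrackingFileSystem.newInstance()) {
      used = fs;
      // ... the test body would exercise fs here ...
    }
    // close() has already run at this point, even if the body had thrown
    return used;
  }
}
```

An instance that the test did not create itself, such as one handed out by a shared base class, would not be wrapped this way, since its owner is responsible for closing it.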
@@ -166,7 +166,12 @@ public static void setMockAbfsRestOperationForFlushOperation(
requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML));
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
if (spiedClient.isFullBlobChecksumValidationEnabled()) {
This can be simplified to a single statement that adds blobMd5 to the request header.
The blobMd5 value can be computed using a ternary operator.
The same approach can be used in production code to ensure that exactly one of the two ways is used to compute the MD5 and that the variable is never null.
taken
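A minimal sketch of the ternary form suggested in this thread, assuming the fallback is the MD5 of the block ID buffer as in the hunk quoted earlier. `FlushMd5Selection` and `selectContentMd5` are hypothetical names for this example, not methods from the PR.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class FlushMd5Selection {
  /** Base64-encoded MD5 of the given bytes. */
  static String base64Md5(byte[] data) {
    try {
      return Base64.getEncoder()
          .encodeToString(MessageDigest.getInstance("MD5").digest(data));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /**
   * Picks exactly one MD5 for the header: the full blob MD5 when the flag is
   * on and a value is available, otherwise the MD5 of the block ID buffer.
   * The result is never null.
   */
  static String selectContentMd5(boolean fullBlobChecksumEnabled,
      String blobMd5, byte[] blockIdBuffer) {
    return (fullBlobChecksumEnabled && blobMd5 != null)
        ? blobMd5
        : base64Md5(blockIdBuffer);
  }
}
```

The caller can then add the header with a single statement, whichever branch produced the value.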
@@ -481,6 +481,8 @@ public void testResetCalledOnExceptionInRemoteFlush() throws Exception {
//expected exception
}
// Verify that reset was called on the message digest
Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
if (spiedClient.isChecksumValidationEnabled()) {
Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
Nit: assert message.
taken
@@ -481,6 +481,8 @@ public void testResetCalledOnExceptionInRemoteFlush() throws Exception {
//expected exception
}
// Verify that reset was called on the message digest
Can we add a similar test where, with the config disabled, we assert that the methods that compute the MD5 hash were not called at all?
taken
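The requested negative test could be shaped roughly as follows. This sketch uses a plain call counter instead of Mockito so that it is self-contained; `CountingMd5Hasher` and `BlockUploadSketch` are hypothetical names, and the actual test added in the PR may look quite different.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical hasher that counts how often an MD5 is computed. */
final class CountingMd5Hasher {
  private int calls;

  String md5(byte[] data) {
    calls++;
    try {
      return Base64.getEncoder()
          .encodeToString(MessageDigest.getInstance("MD5").digest(data));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  int callCount() {
    return calls;
  }
}

final class BlockUploadSketch {
  /** Computes the block MD5 only when checksum validation is enabled. */
  static String md5ForUpload(boolean checksumValidationEnabled,
      CountingMd5Hasher hasher, byte[] block) {
    return checksumValidationEnabled ? hasher.md5(block) : null;
  }
}
```

A test for the disabled case would call `md5ForUpload(false, ...)` and then assert that `callCount()` is still zero, which is the counterpart of a Mockito `never()` verification.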
/**
* Conditions check for allowing checksum support for write operation.
* Server will support this if client sends the MD5 Hash as a request header.
* For azure stoage service documentation and more details refer to
Nit: storage spelling
taken
String md5Hash = getMd5();
String md5Hash;
if (getClient().getAbfsConfiguration().getIsChecksumValidationEnabled()) {
md5Hash = getMd5();
We can change it to: String md5Hash = isChecksumValidationEnabled() ? getMd5() : null;
taken
@@ -179,7 +179,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
tracingContextFlush.setIngressHandler(DFS_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
}
String fullBlobMd5 = computeFullBlobMd5();
String fullBlobMd5 = null;
if (getClient().isFullBlobChecksumValidationEnabled()) {
Since the check getClient().isFullBlobChecksumValidationEnabled() is common to both the DFS and Blob ingress handlers, we can add it as a common method in the abstract class itself.
That would not result in any major improvement, as it's called in just two places.
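For reference, the refactoring proposed in this thread (and not adopted, per the reply) might look roughly like the sketch below. All class and method names here are hypothetical and the returned strings are placeholders; the real ingress handlers are not shown in this conversation.

```java
/** Hypothetical abstract base holding the shared flag check. */
abstract class IngressHandlerSketch {
  private final boolean fullBlobChecksumValidationEnabled;

  IngressHandlerSketch(boolean fullBlobChecksumValidationEnabled) {
    this.fullBlobChecksumValidationEnabled = fullBlobChecksumValidationEnabled;
  }

  /** Shared gate: the full blob MD5 is computed only when the flag is on. */
  protected final String fullBlobMd5IfEnabled() {
    return fullBlobChecksumValidationEnabled ? computeFullBlobMd5() : null;
  }

  /** Each handler supplies its own way of producing the full blob MD5. */
  protected abstract String computeFullBlobMd5();

  /** Returns the MD5 that would accompany the flush request, or null. */
  final String remoteFlush() {
    return fullBlobMd5IfEnabled();
  }
}

final class DfsIngressSketch extends IngressHandlerSketch {
  DfsIngressSketch(boolean enabled) {
    super(enabled);
  }

  @Override
  protected String computeFullBlobMd5() {
    return "dfs-md5"; // placeholder value for the sketch
  }
}

final class BlobIngressSketch extends IngressHandlerSketch {
  BlobIngressSketch(boolean enabled) {
    super(enabled);
  }

  @Override
  protected String computeFullBlobMd5() {
    return "blob-md5"; // placeholder value for the sketch
  }
}
```

With only two call sites, the extra indirection saves a single `if` in each handler, which is consistent with the reply's view that the gain is small.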
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
Thanks for taking in comments.
I have a few test code suggestions. Rest LGTM
*/
@Test
public void testwriteFile() throws Exception {
try (AzureBlobFileSystem abfs = getFileSystem()) {
Nit: Here we are not creating a new instance. We are using the instance created by base class and that will be autoclosed during tear down. try() is redundant here.
taken
nativeFsStream.flush();
nativeFsStream.hsync();
}
try (AzureBlobFileSystem abfs = getFileSystem()) {
Same as above and a few places below, no need to close this.
taken
assertEquals(path3, abfs.getWorkingDirectory());
try (AzureBlobFileSystem abfs = getFileSystem()) {
// test only valid for non-namespace enabled account
Assume.assumeFalse("Namespace enabled account does not support this test",
We can use the base class method assumeHnsDisabled() for better readability here and in other places.
Nit: Also, I suppose all the tests in this class need an FNS account. Maybe we can move this assume into the constructor itself to skip the whole file instead of checking in individual tests.
taken
nativeFsStream.flush();
nativeFsStream.hsync();
try (AzureBlobFileSystem abfs = getFileSystem()) {
Assume.assumeFalse("Namespace enabled account does not support this test",
Let's move all the assume into a common place and they could be first line in each method.
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true);
FileSystem fileSystem = FileSystem.newInstance(conf);
AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem;
try() needed here as it is a new Instance
taken
public void testNoChecksumComputedWhenConfigFalse() throws Exception {
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, false);
FileSystem fileSystem = FileSystem.newInstance(conf);
try() needed here
taken
+1
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
This PR adds a configuration flag to control the use of full blob MD5 validation during the PutBlockList (flush) operation. The functionality to validate the MD5 hash of the entire blob already existed, but it could not be toggled. With this change, the feature is now configurable and is disabled by default. When the config is set to false, the system uses the default block ID hash for integrity checks. When set to true, it performs full blob MD5 validation. This config has been introduced because full blob MD5 computation can lead to increased latency and higher CPU usage, especially for large blobs.