
HADOOP-19604. ABFS: Full blob md5 computation during flush change to be config driven #7853

Status: Open. This pull request wants to merge 33 commits into base: trunk.

Changes from all commits (33 commits)
59a4ea3
ABFS WASB Compatibility testing
anmolanmol1234 Jun 30, 2025
a8e0488
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jul 1, 2025
92c975c
Fix md5 test
anmolanmol1234 Jul 1, 2025
a161e06
fix unused param
anmolanmol1234 Jul 1, 2025
538a2cd
Test changes
anmolanmol1234 Jul 3, 2025
2bd7ce0
remove unused imports
anmolanmol1234 Jul 3, 2025
6f24907
fix build issue
anmolanmol1234 Jul 3, 2025
6b138c0
merge trunk
anmolanmol1234 Jul 3, 2025
8292192
fix main
anmolanmol1234 Jul 3, 2025
f15c5e6
fix javadocs
anmolanmol1234 Jul 3, 2025
7f8c465
checkstyle fixes
anmolanmol1234 Jul 3, 2025
c42ac75
fix test
anmolanmol1234 Jul 3, 2025
d0f2aea
PR comments
anmolanmol1234 Jul 9, 2025
07c396c
revert config change
anmolanmol1234 Jul 9, 2025
5e6298a
remove unintended changes
anmolanmol1234 Jul 9, 2025
d951cc0
fix comments
anmolanmol1234 Jul 9, 2025
9ae0198
PR comments
anmolanmol1234 Jul 9, 2025
5f48139
unused import
anmolanmol1234 Jul 10, 2025
ef1db63
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jul 10, 2025
4b4b7a4
PR review comments
anmolanmol1234 Jul 18, 2025
513511a
checkstyle
anmolanmol1234 Jul 18, 2025
dbb743f
Variable name correction
anmolanmol1234 Jul 21, 2025
b28abe2
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jul 31, 2025
3c249d9
md5 config changes
anmolanmol1234 Aug 1, 2025
4c24330
checkstyle fixes
anmolanmol1234 Aug 5, 2025
1200cb2
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Aug 5, 2025
493484e
Merge branch 'apache:trunk' into HADOOP-19604
anmolanmol1234 Aug 5, 2025
144ba1a
PR comments
anmolanmol1234 Aug 8, 2025
f8a0a64
Merge branch 'HADOOP-19604' of https://github.com/anmolanmol1234/hado…
anmolanmol1234 Aug 8, 2025
20f3582
null checks
anmolanmol1234 Aug 8, 2025
4fd400b
Resolve merge conflicts
anmolanmol1234 Aug 20, 2025
c5f32be
PR comments
anmolanmol1234 Aug 20, 2025
f9a3529
remove unused import
anmolanmol1234 Aug 20, 2025
@@ -438,6 +438,10 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION)
private boolean isFullBlobChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;
@@ -1705,6 +1709,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

public boolean isFullBlobChecksumValidationEnabled() {
return isFullBlobChecksumValidationEnabled;
}

Copilot AI (Aug 7, 2025): The getter method isFullBlobChecksumValidationEnabled() lacks a corresponding setter method, which breaks the pattern established by other configuration properties like isChecksumValidationEnabled. This could cause issues for programmatic configuration changes.

Contributor Author: The setter is not used anywhere.
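For illustration only, a setter that followed the existing pattern would look like the sketch below. As the author notes, nothing currently calls it, so the PR omits it; this code is hypothetical and not part of the diff.

public void setIsFullBlobChecksumValidationEnabled(
    final boolean isFullBlobChecksumValidationEnabled) {
  // Hypothetical setter, mirroring setIsChecksumValidationEnabled above.
  this.isFullBlobChecksumValidationEnabled = isFullBlobChecksumValidationEnabled;
}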

public long getBlobCopyProgressPollWaitMillis() {
return blobCopyProgressPollWaitMillis;
}
@@ -356,6 +356,9 @@ public final class ConfigurationKeys {
/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

/** Add extra layer of verification of the integrity of the full blob request content during transport: {@value}. */
public static final String FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION = "fs.azure.enable.full.blob.checksum.validation";

public static String accountProperty(String property, String account) {
return property + DOT + account;
}
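For context, a minimal sketch of turning the new switch on programmatically. The two key names and their false defaults come from this diff; the class and setup around them are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;

public class EnableFullBlobChecksumExample {
  public static void main(String[] args) {
    // Illustrative only: enable both the existing per-request checksum
    // validation and the new full-blob checksum validation (both default to false).
    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.checksum.validation", true);
    conf.setBoolean("fs.azure.enable.full.blob.checksum.validation", true);
    // A FileSystem created from this configuration would then attach MD5
    // headers on append (per block) and on flush (full blob), per this PR.
  }
}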
@@ -147,6 +147,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;
public static final boolean DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION = false;

/**
* Limit of queued block upload operations before writes
@@ -1076,9 +1076,10 @@ public AbfsRestOperation flush(byte[] buffer,
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
}
if (blobMd5 != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
}
String md5Value = (isFullBlobChecksumValidationEnabled() && blobMd5 != null)
? blobMd5
: computeMD5Hash(buffer, 0, buffer.length);
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Value));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
@@ -1103,7 +1104,12 @@ public AbfsRestOperation flush(byte[] buffer,
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
contextEncryptionAdapter);
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
if (blobMd5 != null && !blobMd5.equals(metadataMd5)) {
/*
* Validate the response by comparing the server's MD5 metadata against either:
* 1. The full blob content MD5 (if full blob checksum validation is enabled), or
* 2. The full block ID list buffer MD5 (fallback if blob checksum validation is disabled)
*/
if (md5Value != null && !md5Value.equals(metadataMd5)) {
throw ex;
}
return op;
@@ -1415,14 +1415,25 @@ protected boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeader
/**
* Conditions check for allowing checksum support for write operation.
* Server will support this if client sends the MD5 Hash as a request header.
* For azure stoage service documentation and more details refer to
* For azure storage service documentation and more details refer to
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
* @return true if checksum validation enabled.
*/
protected boolean isChecksumValidationEnabled() {
return getAbfsConfiguration().getIsChecksumValidationEnabled();
}

/**
* Conditions check for allowing checksum support for write operation.
* Server will support this if client sends the MD5 Hash as a request header.
* For azure storage service documentation and more details refer to
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
* @return true if full blob checksum validation enabled.
*/
protected boolean isFullBlobChecksumValidationEnabled() {
return getAbfsConfiguration().isFullBlobChecksumValidationEnabled();
}

/**
* Compute MD5Hash of the given byte array starting from given offset up to given length.
* @param data byte array from which data is fetched to compute MD5 Hash.
@@ -867,10 +867,9 @@ public AbfsRestOperation flush(final String path,
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
}
if (isChecksumValidationEnabled() && blobMd5 != null) {
if (isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
@@ -223,7 +223,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
md5 = MessageDigest.getInstance(MD5);
fullBlobContentMd5 = MessageDigest.getInstance(MD5);
} catch (NoSuchAlgorithmException e) {
if (client.isChecksumValidationEnabled()) {
if (isChecksumValidationEnabled()) {
throw new IOException("MD5 algorithm not available", e);
}
}
@@ -464,10 +464,13 @@ public synchronized void write(final byte[] data, final int off, final int lengt
AbfsBlock block = createBlockIfNeeded(position);
int written = bufferData(block, data, off, length);
// Update the incremental MD5 hash with the written data.
getMessageDigest().update(data, off, written);

if (isChecksumValidationEnabled()) {
getMessageDigest().update(data, off, written);
}
// Update the full blob MD5 hash with the written data.
getFullBlobContentMd5().update(data, off, written);
if (isFullBlobChecksumValidationEnabled()) {
getFullBlobContentMd5().update(data, off, written);
}
int remainingCapacity = block.remainingCapacity();

if (written < length) {
@@ -544,7 +547,7 @@ private void uploadBlockAsync(AbfsBlock blockToUpload,
outputStreamStatistics.bytesToUpload(bytesLength);
outputStreamStatistics.writeCurrentBuffer();
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
String md5Hash = getMd5();
String md5Hash = getClient().isChecksumValidationEnabled() ? getMd5() : null;
final Future<Void> job =
executorService.submit(() -> {
AbfsPerfTracker tracker =
@@ -1222,6 +1225,20 @@ public MessageDigest getFullBlobContentMd5() {
return fullBlobContentMd5;
}

/**
* @return true if checksum validation is enabled.
*/
public boolean isChecksumValidationEnabled() {
return getClient().isChecksumValidationEnabled();
}

/**
* @return true if full blob checksum validation is enabled.
*/
public boolean isFullBlobChecksumValidationEnabled() {
return getClient().isFullBlobChecksumValidationEnabled();
}

/**
* Returns the Base64-encoded MD5 checksum based on the current digest state.
* This finalizes the digest calculation. Returns null if the digest is empty.
@@ -95,7 +95,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
setBlockCount(getBlockCount() + 1);
AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position, getBlockIdLength(), getBlockCount());
activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
getAbfsOutputStream().getMessageDigest().reset();
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
getAbfsOutputStream().getMessageDigest().reset();
}
setActiveBlock(activeBlock);
}
return getActiveBlock();
@@ -180,7 +180,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
LOG.trace("Flushing data at offset {} for path {}", offset, getAbfsOutputStream().getPath());
String fullBlobMd5 = computeFullBlobMd5();
String fullBlobMd5 = null;
if (getClient().isFullBlobChecksumValidationEnabled()) {
fullBlobMd5 = computeFullBlobMd5();
}
op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
getAbfsOutputStream().getPath(),
isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
@@ -194,7 +197,9 @@ isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex);
throw ex;
} finally {
getAbfsOutputStream().getFullBlobContentMd5().reset();
if (getClient().isFullBlobChecksumValidationEnabled()) {
getAbfsOutputStream().getFullBlobContentMd5().reset();
}
}
return op;
}
@@ -221,7 +226,7 @@ protected AbfsRestOperation remoteAppendBlobWrite(String path,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException {
// Perform the remote append operation using the blob client.
AbfsRestOperation op = null;
AbfsRestOperation op;
try {
op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), tracingContext);
} catch (AbfsRestOperationException ex) {
@@ -62,7 +62,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
if (getActiveBlock() == null) {
setBlockCount(getBlockCount() + 1);
AbfsBlock activeBlock = new AbfsBlock(getAbfsOutputStream(), position);
getAbfsOutputStream().getMessageDigest().reset();
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
getAbfsOutputStream().getMessageDigest().reset();
}
setActiveBlock(activeBlock);
}
return getActiveBlock();
@@ -179,7 +179,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
tracingContextFlush.setIngressHandler(DFS_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
}
String fullBlobMd5 = computeFullBlobMd5();
String fullBlobMd5 = null;
if (getClient().isFullBlobChecksumValidationEnabled()) {
fullBlobMd5 = computeFullBlobMd5();
}

Contributor: Since the check getClient().isFullBlobChecksumValidationEnabled() is common to both the DFS and Blob ingress handlers, we could add it as a common method in the abstract class itself.

Contributor Author: That would not result in any major improvement, as it is only called in two places.
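A minimal sketch of the helper suggested above, assuming the shared abstract ingress handler already exposes getClient() and computeFullBlobMd5() the way both concrete handlers do; this is an illustration, not part of the PR.

/**
 * Hypothetical shared helper for the abstract ingress handler: returns the
 * full-blob MD5 only when the config-driven validation is enabled.
 */
protected String computeFullBlobMd5IfEnabled() {
  return getClient().isFullBlobChecksumValidationEnabled()
      ? computeFullBlobMd5()
      : null;
}

Each remoteFlush would then reduce to String fullBlobMd5 = computeFullBlobMd5IfEnabled();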
LOG.trace("Flushing data at offset {} and path {}", offset, getAbfsOutputStream().getPath());
AbfsRestOperation op;
try {
@@ -194,7 +197,9 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
getAbfsOutputStream().getPath(), offset, ex);
throw ex;
} finally {
getAbfsOutputStream().getFullBlobContentMd5().reset();
if (getClient().isFullBlobChecksumValidationEnabled()) {
getAbfsOutputStream().getFullBlobContentMd5().reset();
}
}
return op;
}
@@ -211,13 +211,18 @@ void createRenamePendingJson(Path path, byte[] bytes)
String blockId = generateBlockId();
String blockList = generateBlockListXml(blockId);
byte[] buffer = blockList.getBytes(StandardCharsets.UTF_8);
String computedMd5 = abfsClient.computeMD5Hash(buffer, 0, buffer.length);
String computedMd5 = null;
if (abfsClient.isFullBlobChecksumValidationEnabled()) {
computedMd5 = abfsClient.computeMD5Hash(buffer, 0, buffer.length);
}

AppendRequestParameters appendRequestParameters
= new AppendRequestParameters(0, 0,
bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null,
abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
new BlobAppendRequestParameters(blockId, eTag), abfsClient.computeMD5Hash(bytes, 0, bytes.length));
new BlobAppendRequestParameters(blockId, eTag),
abfsClient.isChecksumValidationEnabled() ? abfsClient.computeMD5Hash(
bytes, 0, bytes.length) : null);

abfsClient.append(path.toUri().getPath(), bytes,
appendRequestParameters, null, null, tracingContext);
@@ -2125,7 +2125,7 @@ private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs)
Mockito.anyBoolean(), Mockito.nullable(String.class),
Mockito.nullable(String.class), Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class), Mockito.anyString());
Mockito.any(TracingContext.class), Mockito.nullable(String.class));
return createAnswer.callRealMethod();
};
RenameAtomicityTestUtils.addCreatePathMock(client,