-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19604. ABFS: Full blob md5 computation during flush change to be config driven #7853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
59a4ea3
a8e0488
92c975c
a161e06
538a2cd
2bd7ce0
6f24907
6b138c0
8292192
f15c5e6
7f8c465
c42ac75
d0f2aea
07c396c
5e6298a
d951cc0
9ae0198
5f48139
ef1db63
4b4b7a4
513511a
dbb743f
b28abe2
3c249d9
4c24330
1200cb2
493484e
144ba1a
f8a0a64
20f3582
4fd400b
c5f32be
f9a3529
53a39c4
4e5efa2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -438,6 +438,10 @@ public class AbfsConfiguration{ | |
| FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION) | ||
| private boolean isChecksumValidationEnabled; | ||
|
|
||
| @BooleanConfigurationValidatorAnnotation(ConfigurationKey = | ||
| FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION) | ||
| private boolean isFullBlobChecksumValidationEnabled; | ||
|
|
||
| @BooleanConfigurationValidatorAnnotation(ConfigurationKey = | ||
| FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE) | ||
| private boolean isPaginatedDeleteEnabled; | ||
|
|
@@ -1705,6 +1709,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) | |
| this.isChecksumValidationEnabled = isChecksumValidationEnabled; | ||
| } | ||
|
|
||
| public boolean isFullBlobChecksumValidationEnabled() { | ||
|
||
| return isFullBlobChecksumValidationEnabled; | ||
| } | ||
|
|
||
| public long getBlobCopyProgressPollWaitMillis() { | ||
| return blobCopyProgressPollWaitMillis; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -1073,11 +1073,15 @@ public AbfsRestOperation flush(byte[] buffer, | |||||
| requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length))); | ||||||
| requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML)); | ||||||
| requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); | ||||||
| String md5Hash = null; | ||||||
| if (leaseId != null) { | ||||||
| requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); | ||||||
| } | ||||||
| if (blobMd5 != null) { | ||||||
| if (isFullBlobChecksumValidationEnabled() && blobMd5 != null) { | ||||||
| requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); | ||||||
| } else { | ||||||
anujmodi2021 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| md5Hash = computeMD5Hash(buffer, 0, buffer.length); | ||||||
| requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash)); | ||||||
|
||||||
| } | ||||||
| final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||||||
| abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); | ||||||
|
|
@@ -1103,8 +1107,21 @@ public AbfsRestOperation flush(byte[] buffer, | |||||
| AbfsRestOperation op1 = getPathStatus(path, true, tracingContext, | ||||||
| contextEncryptionAdapter); | ||||||
| String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5); | ||||||
| if (blobMd5 != null && !blobMd5.equals(metadataMd5)) { | ||||||
| throw ex; | ||||||
| /* | ||||||
| * Validate the response by comparing the server's MD5 metadata against either: | ||||||
| * 1. The full blob content MD5 (if full blob checksum validation is enabled), or | ||||||
| * 2. The full block ID buffer MD5 (fallback if blob checksum validation is disabled) | ||||||
| */ | ||||||
| if (getAbfsConfiguration().isFullBlobChecksumValidationEnabled() && blobMd5 != null) { | ||||||
anujmodi2021 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| if (getAbfsConfiguration().isFullBlobChecksumValidationEnabled() && blobMd5 != null) { | |
| if (fullBlobChecksumValidationEnabled && blobMd5 != null) { |
anujmodi2021 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1423,6 +1423,17 @@ protected boolean isChecksumValidationEnabled() { | |
| return getAbfsConfiguration().getIsChecksumValidationEnabled(); | ||
| } | ||
|
|
||
| /** | ||
| * Conditions check for allowing checksum support for write operation. | ||
| * Server will support this if client sends the MD5 Hash as a request header. | ||
| * For azure stoage service documentation and more details refer to | ||
|
||
| * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>. | ||
| * @return true if full blob checksum validation enabled. | ||
| */ | ||
| protected boolean isFullBlobChecksumValidationEnabled() { | ||
| return getAbfsConfiguration().isFullBlobChecksumValidationEnabled(); | ||
| } | ||
|
|
||
| /** | ||
| * Compute MD5Hash of the given byte array starting from given offset up to given length. | ||
| * @param data byte array from which data is fetched to compute MD5 Hash. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -223,7 +223,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) | |
| md5 = MessageDigest.getInstance(MD5); | ||
| fullBlobContentMd5 = MessageDigest.getInstance(MD5); | ||
| } catch (NoSuchAlgorithmException e) { | ||
| if (client.isChecksumValidationEnabled()) { | ||
| if (isChecksumValidationEnabled()) { | ||
| throw new IOException("MD5 algorithm not available", e); | ||
| } | ||
| } | ||
|
|
@@ -464,10 +464,13 @@ public synchronized void write(final byte[] data, final int off, final int lengt | |
| AbfsBlock block = createBlockIfNeeded(position); | ||
| int written = bufferData(block, data, off, length); | ||
| // Update the incremental MD5 hash with the written data. | ||
| getMessageDigest().update(data, off, written); | ||
|
|
||
| if (isChecksumValidationEnabled()) { | ||
| getMessageDigest().update(data, off, written); | ||
| } | ||
| // Update the full blob MD5 hash with the written data. | ||
| getFullBlobContentMd5().update(data, off, written); | ||
| if (isFullBlobChecksumValidationEnabled()) { | ||
| getFullBlobContentMd5().update(data, off, written); | ||
| } | ||
| int remainingCapacity = block.remainingCapacity(); | ||
|
|
||
| if (written < length) { | ||
|
|
@@ -544,7 +547,12 @@ private void uploadBlockAsync(AbfsBlock blockToUpload, | |
| outputStreamStatistics.bytesToUpload(bytesLength); | ||
| outputStreamStatistics.writeCurrentBuffer(); | ||
| DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload(); | ||
| String md5Hash = getMd5(); | ||
| String md5Hash; | ||
| if (getClient().getAbfsConfiguration().getIsChecksumValidationEnabled()) { | ||
|
||
| md5Hash = getMd5(); | ||
|
||
| } else { | ||
| md5Hash = null; | ||
| } | ||
| final Future<Void> job = | ||
| executorService.submit(() -> { | ||
| AbfsPerfTracker tracker = | ||
|
|
@@ -1222,6 +1230,20 @@ public MessageDigest getFullBlobContentMd5() { | |
| return fullBlobContentMd5; | ||
| } | ||
|
|
||
| /** | ||
| * @return true if checksum validation is enabled. | ||
| */ | ||
| public boolean isChecksumValidationEnabled() { | ||
| return getClient().isChecksumValidationEnabled(); | ||
| } | ||
|
|
||
| /** | ||
| * @return true if full blob checksum validation is enabled. | ||
| */ | ||
| public boolean isFullBlobChecksumValidationEnabled() { | ||
| return getClient().isFullBlobChecksumValidationEnabled(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the Base64-encoded MD5 checksum based on the current digest state. | ||
| * This finalizes the digest calculation. Returns null if the digest is empty. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -179,7 +179,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, | |
| tracingContextFlush.setIngressHandler(DFS_FLUSH); | ||
| tracingContextFlush.setPosition(String.valueOf(offset)); | ||
| } | ||
| String fullBlobMd5 = computeFullBlobMd5(); | ||
| String fullBlobMd5 = null; | ||
| if (getClient().isFullBlobChecksumValidationEnabled()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the check getClient().isFullBlobChecksumValidationEnabled() is common in both DFS, Blob ingress handlers- we can add it as a common method in the abstract class itself
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would not result in any major improvement as it's just called at 2 places |
||
| fullBlobMd5 = computeFullBlobMd5(); | ||
| } | ||
| LOG.trace("Flushing data at offset {} and path {}", offset, getAbfsOutputStream().getPath()); | ||
| AbfsRestOperation op; | ||
| try { | ||
|
|
@@ -194,7 +197,9 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, | |
| getAbfsOutputStream().getPath(), offset, ex); | ||
| throw ex; | ||
| } finally { | ||
| getAbfsOutputStream().getFullBlobContentMd5().reset(); | ||
| if (getClient().isFullBlobChecksumValidationEnabled()) { | ||
| getAbfsOutputStream().getFullBlobContentMd5().reset(); | ||
| } | ||
| } | ||
| return op; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |
|
|
||
| import static java.net.HttpURLConnection.HTTP_CONFLICT; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION; | ||
| import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted; | ||
| import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory; | ||
| import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; | ||
|
|
@@ -113,7 +114,10 @@ public void testReadFile() throws Exception { | |
| boolean[] createFileWithAbfs = new boolean[]{false, true, false, true}; | ||
| boolean[] readFileWithAbfs = new boolean[]{false, true, true, false}; | ||
|
|
||
| AzureBlobFileSystem abfs = getFileSystem(); | ||
| Configuration conf = getRawConfiguration(); | ||
| conf.setBoolean(FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, true); | ||
| FileSystem fileSystem = FileSystem.newInstance(conf); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When creating new instance, we need to close it. Can you check for all the tests in this class and make sure any new instance sis getting closed within test itself?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. taken |
||
| AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; | ||
| // test only valid for non-namespace enabled account | ||
| Assume.assumeFalse("Namespace enabled account does not support this test", | ||
| getIsNamespaceEnabled(abfs)); | ||
|
|
@@ -412,7 +416,10 @@ public void testScenario2() throws Exception { | |
| */ | ||
| @Test | ||
| public void testScenario3() throws Exception { | ||
| AzureBlobFileSystem abfs = getFileSystem(); | ||
| Configuration conf = getRawConfiguration(); | ||
| conf.setBoolean(FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, true); | ||
| FileSystem fileSystem = FileSystem.newInstance(conf); | ||
| AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; | ||
| Assume.assumeFalse("Namespace enabled account does not support this test", | ||
| getIsNamespaceEnabled(abfs)); | ||
| Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -166,7 +166,12 @@ public static void setMockAbfsRestOperationForFlushOperation( | |
| requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length))); | ||
| requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML)); | ||
| requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); | ||
| requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); | ||
| if (spiedClient.isFullBlobChecksumValidationEnabled()) { | ||
|
||
| requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); | ||
| } else { | ||
| requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, | ||
| spiedClient.computeMD5Hash(buffer, 0, buffer.length))); | ||
| } | ||
| final AbfsUriQueryBuilder abfsUriQueryBuilder = spiedClient.createDefaultUriQueryBuilder(); | ||
| abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); | ||
| abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(false)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -427,7 +427,7 @@ public void testNoNetworkCallsForSecondFlush() throws Exception { | |||||
| Mockito.any(TracingContext.class)); | ||||||
| Mockito.verify(blobClient, Mockito.times(1)). | ||||||
| flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), | ||||||
| Mockito.any(TracingContext.class), Mockito.anyString()); | ||||||
| Mockito.any(TracingContext.class), Mockito.nullable(String.class)); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
@@ -481,6 +481,8 @@ public void testResetCalledOnExceptionInRemoteFlush() throws Exception { | |||||
| //expected exception | ||||||
| } | ||||||
| // Verify that reset was called on the message digest | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a similar test where with config disabled we assert that methods to compute md5 hash were not called at all?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. taken |
||||||
| Mockito.verify(mockMessageDigest, Mockito.times(1)).reset(); | ||||||
| if (spiedClient.isChecksumValidationEnabled()) { | ||||||
|
||||||
| if (spiedClient.isChecksumValidationEnabled()) { | |
| if (spiedClient.isFullBlobChecksumValidationEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: assert message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken
Uh oh!
There was an error while loading. Please reload this page.