From 42fc65701dc446b76a3944c7cb5ed6fce02fa80e Mon Sep 17 00:00:00 2001 From: Steve Yoo Date: Mon, 18 Aug 2025 10:49:52 -0400 Subject: [PATCH 1/3] Validate full object checksums for multipart downloads --- awscli/botocore/httpchecksum.py | 16 ++++ awscli/s3transfer/checksums.py | 163 ++++++++++++++++++++++++++++++++ awscli/s3transfer/download.py | 81 +++++++++++++++- awscli/s3transfer/futures.py | 8 ++ 4 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 awscli/s3transfer/checksums.py diff --git a/awscli/botocore/httpchecksum.py b/awscli/botocore/httpchecksum.py index 02fb105384c6..1cec6ee32a5a 100644 --- a/awscli/botocore/httpchecksum.py +++ b/awscli/botocore/httpchecksum.py @@ -75,6 +75,10 @@ def update(self, chunk): def digest(self): return self._int_crc32.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32 + class CrtCrc32Checksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -88,6 +92,10 @@ def update(self, chunk): def digest(self): return self._int_crc32.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32 + class CrtCrc32cChecksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -101,6 +109,10 @@ def update(self, chunk): def digest(self): return self._int_crc32c.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32c + class CrtCrc64NvmeChecksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -114,6 +126,10 @@ def update(self, chunk): def digest(self): return self._int_crc64nvme.to_bytes(8, byteorder="big") + @property + def int_crc(self): + return self._int_crc64nvme + class Sha1Checksum(BaseChecksum): def __init__(self): diff --git a/awscli/s3transfer/checksums.py b/awscli/s3transfer/checksums.py new file mode 100644 index 000000000000..46cc20eba54f --- /dev/null +++ b/awscli/s3transfer/checksums.py @@ -0,0 +1,163 @@ +import base64 +from functools import 
cached_property + +from botocore.httpchecksum import ( + CrtCrc32cChecksum, + CrtCrc32Checksum, + CrtCrc64NvmeChecksum, +) + + +class StreamingChecksumBody: + def __init__(self, stream, starting_index, checksum_validator): + self._stream = stream + self._starting_index = starting_index + self._checksum = _CRC_CHECKSUM_CLS[ + checksum_validator.checksum_algorithm + ]() + self._checksum_validator = checksum_validator + + def read(self, *args, **kwargs): + value = self._stream.read(*args, **kwargs) + self._checksum.update(value) + if not value: + self._checksum_validator.set_part_checksums( + self._starting_index, self._checksum.int_crc + ) + return value + + +class ChecksumValidator: + def __init__(self, stored_checksum, content_length): + self.checksum_algorithm = list(stored_checksum.keys())[0] + self._checksum_value = stored_checksum[self.checksum_algorithm] + self._combine_function = _CRC_CHECKSUM_TO_COMBINE_FUNCTION[ + self.checksum_algorithm + ] + self._part_checksums = None + self._calculated_checksum = None + self._content_length = content_length + + @cached_property + def calculated_checksum(self): + if self._calculated_checksum is None: + self._combine_part_checksums() + return self._calculated_checksum + + def set_part_checksums(self, offset, checksum): + if self._part_checksums is None: + self._part_checksums = {} + self._part_checksums[offset] = checksum + + def _combine_part_checksums(self): + if self._part_checksums is None: + return + sorted_keys = sorted(self._part_checksums.keys()) + combined = self._part_checksums[sorted_keys[0]] + for i, offset in enumerate(sorted_keys[1:]): + part_checksum = self._part_checksums[offset] + if i + 1 == len(sorted_keys) - 1: + next_offset = self._content_length + else: + next_offset = sorted_keys[i + 2] + offset_len = next_offset - offset + combined = self._combine_function( + combined, part_checksum, offset_len + ) + self._calculated_checksum = base64.b64encode( + combined.to_bytes(4, byteorder='big') + 
).decode('ascii') + + def validate(self): + if not self._checksum_value: + return + if self.calculated_checksum != self._checksum_value: + raise Exception( + f"stored: {self._checksum_value} != calculated: {self.calculated_checksum}" + ) + + +def combine_crc32(crc1, crc2, len2): + """ + Combine two CRC32 checksums computed with binascii.crc32. + + This implementation follows the algorithm used in zlib's crc32_combine. + + Args: + crc1: CRC32 checksum of the first data block (from binascii.crc32) + crc2: CRC32 checksum of the second data block (from binascii.crc32) + len2: Length in bytes of the second data block + + Returns: + Combined CRC32 checksum as if the two blocks were concatenated + """ + + # CRC-32 polynomial in reversed bit order + POLY = 0xEDB88320 + + def gf2_matrix_times(mat, vec): + """Multiply matrix by vector over GF(2)""" + result = 0 + for i in range(32): + if vec & (1 << i): + result ^= mat[i] + return result & 0xFFFFFFFF + + def gf2_matrix_square(square, mat): + """Square matrix over GF(2)""" + for n in range(32): + square[n] = gf2_matrix_times(mat, mat[n]) + + # Create initial CRC matrix for 1 bit + odd = [0] * 32 + even = [0] * 32 + + # Build odd matrix (for 1 bit shift) + odd[0] = POLY + for n in range(1, 32): + odd[n] = 1 << (n - 1) + + # Square to get even matrix (for 2 bit shift), then keep squaring + gf2_matrix_square(even, odd) + gf2_matrix_square(odd, even) + + # Process len2 bytes (8 * len2 bits) + length = len2 + + # Process chunks of 3 bits at a time (since we have matrices for 4 and 8 bit shifts) + while length != 0: + # Square matrices to advance to next power of 2 + gf2_matrix_square(even, odd) + if length & 1: + crc1 = gf2_matrix_times(even, crc1) + length >>= 1 + + if length == 0: + break + + gf2_matrix_square(odd, even) + if length & 1: + crc1 = gf2_matrix_times(odd, crc1) + length >>= 1 + + # XOR the two CRCs + crc1 ^= crc2 + + return crc1 & 0xFFFFFFFF + + +_CRC_CHECKSUM_TO_COMBINE_FUNCTION = { + "ChecksumCRC64NVME": None, + 
"ChecksumCRC32C": None, + "ChecksumCRC32": combine_crc32, +} + + +_CRC_CHECKSUM_CLS = { + "ChecksumCRC64NVME": CrtCrc64NvmeChecksum, + "ChecksumCRC32C": CrtCrc32cChecksum, + "ChecksumCRC32": CrtCrc32Checksum, +} + + +CRC_CHECKSUMS = _CRC_CHECKSUM_TO_COMBINE_FUNCTION.keys() diff --git a/awscli/s3transfer/download.py b/awscli/s3transfer/download.py index 1cb0a65c2320..c26cbbda641f 100644 --- a/awscli/s3transfer/download.py +++ b/awscli/s3transfer/download.py @@ -15,6 +15,11 @@ import threading from botocore.exceptions import ClientError +from s3transfer.checksums import ( + CRC_CHECKSUMS, + ChecksumValidator, + StreamingChecksumBody, +) from s3transfer.compat import seekable from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG @@ -137,6 +142,14 @@ def get_final_io_task(self): """ raise NotImplementedError('must implement get_final_io_task()') + def get_validate_checksum_task(self, checksum_validator): + return ValidateChecksumTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'checksum_validator': checksum_validator, + }, + ) + def _get_fileobj_from_filename(self, filename): f = DeferredOpenFile( filename, mode='wb', open_function=self._osutil.open @@ -350,10 +363,12 @@ def _submit( if ( transfer_future.meta.size is None or transfer_future.meta.etag is None + or transfer_future.meta.stored_checksum is None ): response = client.head_object( Bucket=transfer_future.meta.call_args.bucket, Key=transfer_future.meta.call_args.key, + ChecksumMode="ENABLED", **transfer_future.meta.call_args.extra_args, ) # If a size was not provided figure out the size for the @@ -364,6 +379,8 @@ def _submit( # Provide an etag to ensure a stored object is not modified # during a multipart download. 
transfer_future.meta.provide_object_etag(response.get('ETag')) + # Provide checksum + self._provide_checksum_to_meta(response, transfer_future.meta) download_output_manager = self._get_download_output_manager_cls( transfer_future, osutil @@ -480,6 +497,20 @@ def _submit_ranged_download_request( download_output_manager, io_executor ) ) + # + checksum_validator = None + if transfer_future.meta.stored_checksum: + checksum_validator = ChecksumValidator( + transfer_future.meta.stored_checksum, + transfer_future.meta.size, + ) + validate_checksum_invoker = CountCallbackInvoker( + self._get_validate_checksum_task( + download_output_manager, + io_executor, + checksum_validator, + ) + ) for i in range(num_parts): # Calculate the range parameter range_parameter = calculate_range_parameter( @@ -494,6 +525,7 @@ def _submit_ranged_download_request( extra_args['IfMatch'] = transfer_future.meta.etag extra_args.update(call_args.extra_args) finalize_download_invoker.increment() + validate_checksum_invoker.increment() # Submit the ranged downloads self._transfer_coordinator.submit( request_executor, @@ -511,13 +543,36 @@ def _submit_ranged_download_request( 'download_output_manager': download_output_manager, 'io_chunksize': config.io_chunksize, 'bandwidth_limiter': bandwidth_limiter, + 'checksum_validator': checksum_validator, }, - done_callbacks=[finalize_download_invoker.decrement], + done_callbacks=[ + validate_checksum_invoker.decrement, + finalize_download_invoker.decrement, + ], ), tag=get_object_tag, ) + validate_checksum_invoker.finalize() finalize_download_invoker.finalize() + def _get_validate_checksum_task( + self, + download_manager, + io_executor, + checksum_validator, + ): + if checksum_validator is None: + task = CompleteDownloadNOOPTask( + transfer_coordinator=self._transfer_coordinator, + ) + else: + task = download_manager.get_validate_checksum_task( + checksum_validator, + ) + return FunctionContainer( + self._transfer_coordinator.submit, io_executor, task + ) + 
def _get_final_io_task_submission_callback( self, download_manager, io_executor ): @@ -536,6 +591,18 @@ def _calculate_range_param(self, part_size, part_index, num_parts): range_param = f'bytes={start_range}-{end_range}' return range_param + def _provide_checksum_to_meta(self, response, transfer_meta): + checksum_type = response.get("ChecksumType") + if not checksum_type or checksum_type != "FULL_OBJECT": + transfer_meta.provide_stored_checksum({}) + for crc_checksum in CRC_CHECKSUMS: + if checksum_value := response.get(crc_checksum): + transfer_meta.provide_stored_checksum( + {crc_checksum: checksum_value} + ) + return + transfer_meta.provide_stored_checksum({}) + class GetObjectTask(Task): def _main( @@ -551,6 +618,7 @@ def _main( io_chunksize, start_index=0, bandwidth_limiter=None, + checksum_validator=None, ): """Downloads an object and places content into io queue @@ -580,6 +648,12 @@ def _main( streaming_body = StreamReaderProgress( response['Body'], callbacks ) + if checksum_validator: + streaming_body = StreamingChecksumBody( + streaming_body, + current_index, + checksum_validator, + ) if bandwidth_limiter: streaming_body = ( bandwidth_limiter.get_bandwith_limited_stream( @@ -831,3 +905,8 @@ def request_writes(self, offset, data): del self._pending_offsets[next_write_offset] self._next_offset += len(next_write) return writes + + +class ValidateChecksumTask(Task): + def _main(self, checksum_validator): + checksum_validator.validate() diff --git a/awscli/s3transfer/futures.py b/awscli/s3transfer/futures.py index 6222a42baba8..a12001c1bccf 100644 --- a/awscli/s3transfer/futures.py +++ b/awscli/s3transfer/futures.py @@ -128,6 +128,7 @@ def __init__(self, call_args=None, transfer_id=None): self._size = None self._user_context = {} self._etag = None + self._stored_checksum = None @property def call_args(self): @@ -154,6 +155,13 @@ def etag(self): """The etag of the stored object for validating multipart downloads""" return self._etag + @property + def 
stored_checksum(self): + return self._stored_checksum + + def provide_stored_checksum(self, checksum): + self._stored_checksum = checksum + def provide_transfer_size(self, size): """A method to provide the size of a transfer request From ed1743db3d381b2d021cad889e67df95ff9446f8 Mon Sep 17 00:00:00 2001 From: Steve Yoo Date: Wed, 20 Aug 2025 11:56:55 -0400 Subject: [PATCH 2/3] Reuse calculated checksums if available --- awscli/botocore/httpchecksum.py | 4 +++ awscli/s3transfer/checksums.py | 52 +++++++++++++++++++++------------ awscli/s3transfer/download.py | 50 +++++++++++++++++-------------- awscli/s3transfer/futures.py | 8 +++++ 4 files changed, 73 insertions(+), 41 deletions(-) diff --git a/awscli/botocore/httpchecksum.py b/awscli/botocore/httpchecksum.py index 1cec6ee32a5a..560fe6db3ebf 100644 --- a/awscli/botocore/httpchecksum.py +++ b/awscli/botocore/httpchecksum.py @@ -253,6 +253,10 @@ def _validate_checksum(self): error_msg = f"Expected checksum {self._expected} did not match calculated checksum: {self._checksum.b64digest()}" raise FlexibleChecksumError(error_msg=error_msg) + @property + def checksum(self): + return self._checksum + def resolve_checksum_context(request, operation_model, params): resolve_request_checksum_algorithm(request, operation_model, params) diff --git a/awscli/s3transfer/checksums.py b/awscli/s3transfer/checksums.py index 46cc20eba54f..6e390f06cf79 100644 --- a/awscli/s3transfer/checksums.py +++ b/awscli/s3transfer/checksums.py @@ -8,35 +8,47 @@ ) -class StreamingChecksumBody: - def __init__(self, stream, starting_index, checksum_validator): +class PartStreamingChecksumBody: + def __init__(self, stream, starting_index, full_object_checksum): self._stream = stream self._starting_index = starting_index self._checksum = _CRC_CHECKSUM_CLS[ - checksum_validator.checksum_algorithm + full_object_checksum.checksum_algorithm ]() - self._checksum_validator = checksum_validator + self._full_object_checksum = full_object_checksum + # If the 
underlying stream already has a checksum object + # it's updating (eg `botocore.httpchecksum.StreamingChecksumBody`), + # reuse its calculated value. + self._should_update = not hasattr(self._stream, 'checksum') def read(self, *args, **kwargs): value = self._stream.read(*args, **kwargs) - self._checksum.update(value) + if self._should_update: + self._checksum.update(value) if not value: - self._checksum_validator.set_part_checksums( - self._starting_index, self._checksum.int_crc - ) + self._set_part_checksum() return value + def _set_part_checksum(self): + if self._should_update: + value = self._checksum.int_crc + else: + value = self._stream.checksum.int_crc + self._full_object_checksum.set_part_checksum( + self._starting_index, value, + ) + -class ChecksumValidator: - def __init__(self, stored_checksum, content_length): - self.checksum_algorithm = list(stored_checksum.keys())[0] - self._checksum_value = stored_checksum[self.checksum_algorithm] +class FullObjectChecksum: + def __init__(self, checksum_algorithm, content_length): + self.checksum_algorithm = checksum_algorithm + self._content_length = content_length self._combine_function = _CRC_CHECKSUM_TO_COMBINE_FUNCTION[ self.checksum_algorithm ] + self._stored_checksum = None self._part_checksums = None self._calculated_checksum = None - self._content_length = content_length @cached_property def calculated_checksum(self): @@ -44,7 +56,10 @@ def calculated_checksum(self): self._combine_part_checksums() return self._calculated_checksum - def set_part_checksums(self, offset, checksum): + def set_stored_checksum(self, stored_checksum): + self._stored_checksum = stored_checksum + + def set_part_checksum(self, offset, checksum): if self._part_checksums is None: self._part_checksums = {} self._part_checksums[offset] = checksum @@ -69,11 +84,10 @@ def _combine_part_checksums(self): ).decode('ascii') def validate(self): - if not self._checksum_value: - return - if self.calculated_checksum != self._checksum_value: - raise 
Exception( - f"stored: {self._checksum_value} != calculated: {self.calculated_checksum}" + if self.calculated_checksum != self._stored_checksum: + raise ValueError( + f"Calculated checksum {self.calculated_checksum} does not match " + f"stored checksum {self._stored_checksum}" ) diff --git a/awscli/s3transfer/download.py b/awscli/s3transfer/download.py index c26cbbda641f..6f180615e6e7 100644 --- a/awscli/s3transfer/download.py +++ b/awscli/s3transfer/download.py @@ -17,8 +17,8 @@ from botocore.exceptions import ClientError from s3transfer.checksums import ( CRC_CHECKSUMS, - ChecksumValidator, - StreamingChecksumBody, + PartStreamingChecksumBody, + FullObjectChecksum, ) from s3transfer.compat import seekable from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError @@ -142,11 +142,11 @@ def get_final_io_task(self): """ raise NotImplementedError('must implement get_final_io_task()') - def get_validate_checksum_task(self, checksum_validator): + def get_validate_checksum_task(self, full_object_checksum): return ValidateChecksumTask( transfer_coordinator=self._transfer_coordinator, main_kwargs={ - 'checksum_validator': checksum_validator, + 'full_object_checksum': full_object_checksum, }, ) @@ -498,17 +498,20 @@ def _submit_ranged_download_request( ) ) # - checksum_validator = None + full_object_checksum = None if transfer_future.meta.stored_checksum: - checksum_validator = ChecksumValidator( - transfer_future.meta.stored_checksum, + full_object_checksum = FullObjectChecksum( + transfer_future.meta.checksum_algorithm, transfer_future.meta.size, ) + full_object_checksum.set_stored_checksum( + transfer_future.meta.stored_checksum, + ) validate_checksum_invoker = CountCallbackInvoker( self._get_validate_checksum_task( download_output_manager, io_executor, - checksum_validator, + full_object_checksum, ) ) for i in range(num_parts): @@ -543,7 +546,7 @@ def _submit_ranged_download_request( 'download_output_manager': download_output_manager, 'io_chunksize': 
config.io_chunksize, 'bandwidth_limiter': bandwidth_limiter, - 'checksum_validator': checksum_validator, + 'full_object_checksum': full_object_checksum, }, done_callbacks=[ validate_checksum_invoker.decrement, @@ -559,15 +562,15 @@ def _get_validate_checksum_task( self, download_manager, io_executor, - checksum_validator, + full_object_checksum, ): - if checksum_validator is None: + if full_object_checksum is None: task = CompleteDownloadNOOPTask( transfer_coordinator=self._transfer_coordinator, ) else: task = download_manager.get_validate_checksum_task( - checksum_validator, + full_object_checksum, ) return FunctionContainer( self._transfer_coordinator.submit, io_executor, task @@ -594,14 +597,17 @@ def _calculate_range_param(self, part_size, part_index, num_parts): def _provide_checksum_to_meta(self, response, transfer_meta): checksum_type = response.get("ChecksumType") if not checksum_type or checksum_type != "FULL_OBJECT": - transfer_meta.provide_stored_checksum({}) + # transfer_meta.provide_stored_checksum({}) + return for crc_checksum in CRC_CHECKSUMS: if checksum_value := response.get(crc_checksum): + transfer_meta.provide_checksum_algorithm( + crc_checksum + ) transfer_meta.provide_stored_checksum( - {crc_checksum: checksum_value} + checksum_value ) return - transfer_meta.provide_stored_checksum({}) class GetObjectTask(Task): @@ -618,7 +624,7 @@ def _main( io_chunksize, start_index=0, bandwidth_limiter=None, - checksum_validator=None, + full_object_checksum=None, ): """Downloads an object and places content into io queue @@ -648,11 +654,11 @@ def _main( streaming_body = StreamReaderProgress( response['Body'], callbacks ) - if checksum_validator: - streaming_body = StreamingChecksumBody( + if full_object_checksum: + streaming_body = PartStreamingChecksumBody( streaming_body, - current_index, - checksum_validator, + start_index, + full_object_checksum, ) if bandwidth_limiter: streaming_body = ( @@ -908,5 +914,5 @@ def request_writes(self, offset, data): class 
ValidateChecksumTask(Task): - def _main(self, checksum_validator): - checksum_validator.validate() + def _main(self, full_object_checksum): + full_object_checksum.validate() diff --git a/awscli/s3transfer/futures.py b/awscli/s3transfer/futures.py index a12001c1bccf..776a682d48da 100644 --- a/awscli/s3transfer/futures.py +++ b/awscli/s3transfer/futures.py @@ -129,6 +129,7 @@ def __init__(self, call_args=None, transfer_id=None): self._user_context = {} self._etag = None self._stored_checksum = None + self._checksum_algorithm = None @property def call_args(self): @@ -159,9 +160,16 @@ def etag(self): def stored_checksum(self): return self._stored_checksum + @property + def checksum_algorithm(self): + return self._checksum_algorithm + def provide_stored_checksum(self, checksum): self._stored_checksum = checksum + def provide_checksum_algorithm(self, algorithm): + self._checksum_algorithm = algorithm + def provide_transfer_size(self, size): """A method to provide the size of a transfer request From eb036dcae70cc87c0aa0d0d7adebc5b54a7c0322 Mon Sep 17 00:00:00 2001 From: Steve Yoo Date: Thu, 21 Aug 2025 12:28:25 -0400 Subject: [PATCH 3/3] Implement full object checksum validation for MPUs --- awscli/botocore/httpchecksum.py | 7 +++++-- awscli/s3transfer/checksums.py | 13 ++++++------ awscli/s3transfer/download.py | 19 ++++++++---------- awscli/s3transfer/manager.py | 2 ++ awscli/s3transfer/tasks.py | 16 ++++++++++++++- awscli/s3transfer/upload.py | 35 ++++++++++++++++++++++++++++++--- awscli/s3transfer/utils.py | 23 ++++++++++++++++++++++ 7 files changed, 92 insertions(+), 23 deletions(-) diff --git a/awscli/botocore/httpchecksum.py b/awscli/botocore/httpchecksum.py index 560fe6db3ebf..79d0a8a401a3 100644 --- a/awscli/botocore/httpchecksum.py +++ b/awscli/botocore/httpchecksum.py @@ -166,6 +166,7 @@ def __init__( self._raw = raw self._checksum_name = checksum_name self._checksum_cls = checksum_cls + self._reuse_checksum = hasattr(self._raw, 'checksum') self._reset() if 
chunk_size is None: @@ -176,8 +177,10 @@ def _reset(self): self._remaining = b"" self._complete = False self._checksum = None - if self._checksum_cls: + if self._checksum_cls and not self._reuse_checksum: self._checksum = self._checksum_cls() + if self._reuse_checksum: + self._checksum = self._raw.checksum def seek(self, offset, whence=0): if offset != 0 or whence != 0: @@ -220,7 +223,7 @@ def _make_chunk(self): hex_len = hex(len(raw_chunk))[2:].encode("ascii") self._complete = not raw_chunk - if self._checksum: + if self._checksum and not self._reuse_checksum: self._checksum.update(raw_chunk) if self._checksum and self._complete: diff --git a/awscli/s3transfer/checksums.py b/awscli/s3transfer/checksums.py index 6e390f06cf79..ac7dfceb6d61 100644 --- a/awscli/s3transfer/checksums.py +++ b/awscli/s3transfer/checksums.py @@ -12,30 +12,31 @@ class PartStreamingChecksumBody: def __init__(self, stream, starting_index, full_object_checksum): self._stream = stream self._starting_index = starting_index - self._checksum = _CRC_CHECKSUM_CLS[ + self._checksum = CRC_CHECKSUM_CLS[ full_object_checksum.checksum_algorithm ]() self._full_object_checksum = full_object_checksum # If the underlying stream already has a checksum object # it's updating (eg `botocore.httpchecksum.StreamingChecksumBody`), # reuse its calculated value. 
- self._should_update = not hasattr(self._stream, 'checksum') + self._reuse_checksum = hasattr(self._stream, 'checksum') def read(self, *args, **kwargs): value = self._stream.read(*args, **kwargs) - if self._should_update: + if not self._reuse_checksum: self._checksum.update(value) if not value: self._set_part_checksum() return value def _set_part_checksum(self): - if self._should_update: + if not self._reuse_checksum: value = self._checksum.int_crc else: value = self._stream.checksum.int_crc self._full_object_checksum.set_part_checksum( - self._starting_index, value, + self._starting_index, + value, ) @@ -167,7 +168,7 @@ def gf2_matrix_square(square, mat): } -_CRC_CHECKSUM_CLS = { +CRC_CHECKSUM_CLS = { "ChecksumCRC64NVME": CrtCrc64NvmeChecksum, "ChecksumCRC32C": CrtCrc32cChecksum, "ChecksumCRC32": CrtCrc32Checksum, diff --git a/awscli/s3transfer/download.py b/awscli/s3transfer/download.py index 6f180615e6e7..1185bfd97f34 100644 --- a/awscli/s3transfer/download.py +++ b/awscli/s3transfer/download.py @@ -17,8 +17,8 @@ from botocore.exceptions import ClientError from s3transfer.checksums import ( CRC_CHECKSUMS, - PartStreamingChecksumBody, FullObjectChecksum, + PartStreamingChecksumBody, ) from s3transfer.compat import seekable from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError @@ -601,12 +601,8 @@ def _provide_checksum_to_meta(self, response, transfer_meta): return for crc_checksum in CRC_CHECKSUMS: if checksum_value := response.get(crc_checksum): - transfer_meta.provide_checksum_algorithm( - crc_checksum - ) - transfer_meta.provide_stored_checksum( - checksum_value - ) + transfer_meta.provide_checksum_algorithm(crc_checksum) + transfer_meta.provide_stored_checksum(checksum_value) return @@ -651,15 +647,16 @@ def _main( response = client.get_object( Bucket=bucket, Key=key, **extra_args ) - streaming_body = StreamReaderProgress( - response['Body'], callbacks - ) if full_object_checksum: streaming_body = PartStreamingChecksumBody( - 
streaming_body, + response['Body'], start_index, full_object_checksum, ) + else: + streaming_body = StreamReaderProgress( + response['Body'], callbacks + ) if bandwidth_limiter: streaming_body = ( bandwidth_limiter.get_bandwith_limited_stream( diff --git a/awscli/s3transfer/manager.py b/awscli/s3transfer/manager.py index baaafe1d37cb..7f1ea4555a81 100644 --- a/awscli/s3transfer/manager.py +++ b/awscli/s3transfer/manager.py @@ -43,6 +43,7 @@ TaskSemaphore, get_callbacks, set_default_checksum_algorithm, + set_default_checksum_type, signal_not_transferring, signal_transferring, ) @@ -521,6 +522,7 @@ def _add_operation_defaults(self, extra_args): == "when_supported" ): set_default_checksum_algorithm(extra_args) + set_default_checksum_type(extra_args) def _submit_transfer( self, call_args, submission_task_cls, extra_main_kwargs=None diff --git a/awscli/s3transfer/tasks.py b/awscli/s3transfer/tasks.py index 7da35a6d9359..97ce4089bc2c 100644 --- a/awscli/s3transfer/tasks.py +++ b/awscli/s3transfer/tasks.py @@ -361,7 +361,16 @@ def _main(self, client, bucket, key, extra_args): class CompleteMultipartUploadTask(Task): """Task to complete a multipart upload""" - def _main(self, client, bucket, key, upload_id, parts, extra_args): + def _main( + self, + client, + bucket, + key, + upload_id, + parts, + extra_args, + full_object_checksum, + ): """ :param client: The client to use when calling CompleteMultipartUpload :param bucket: The name of the bucket to upload to @@ -376,6 +385,11 @@ def _main(self, client, bucket, key, upload_id, parts, extra_args): :param extra_args: A dictionary of any extra arguments that may be used in completing the multipart transfer. 
""" + if full_object_checksum: + extra_args['ChecksumType'] = "FULL_OBJECT" + extra_args[full_object_checksum.checksum_algorithm] = ( + full_object_checksum.calculated_checksum + ) client.complete_multipart_upload( Bucket=bucket, Key=key, diff --git a/awscli/s3transfer/upload.py b/awscli/s3transfer/upload.py index a3db8f899e2b..ac3d188634a8 100644 --- a/awscli/s3transfer/upload.py +++ b/awscli/s3transfer/upload.py @@ -13,6 +13,11 @@ import math from io import BytesIO +from s3transfer.checksums import ( + CRC_CHECKSUMS, + FullObjectChecksum, + PartStreamingChecksumBody, +) from s3transfer.compat import readable, seekable from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS from s3transfer.futures import IN_MEMORY_UPLOAD_TAG @@ -272,7 +277,9 @@ def get_put_object_body(self, transfer_future): close_callbacks=close_callbacks, ) - def yield_upload_part_bodies(self, transfer_future, chunksize): + def yield_upload_part_bodies( + self, transfer_future, chunksize, full_object_checksum + ): full_file_size = transfer_future.meta.size num_parts = self._get_num_parts(transfer_future, chunksize) for part_number in range(1, num_parts + 1): @@ -300,6 +307,7 @@ def yield_upload_part_bodies(self, transfer_future, chunksize): full_file_size=full_size, callbacks=callbacks, close_callbacks=close_callbacks, + full_object_checksum=full_object_checksum, ) yield part_number, read_file_chunk @@ -676,6 +684,16 @@ def _submit_multipart_request( "Checksum", "" ) + full_object_checksum = None + if ( + f"Checksum{call_args.extra_args['ChecksumAlgorithm']}" + in CRC_CHECKSUMS + ): + full_object_checksum = FullObjectChecksum( + f"Checksum{call_args.extra_args['ChecksumAlgorithm']}", + transfer_future.meta.size, + ) + create_multipart_extra_args = self._extra_create_multipart_args( call_args.extra_args ) @@ -708,7 +726,9 @@ def _submit_multipart_request( adjuster = ChunksizeAdjuster() chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size) part_iterator = 
upload_input_manager.yield_upload_part_bodies( - transfer_future, chunksize + transfer_future, + chunksize, + full_object_checksum, ) for part_number, fileobj in part_iterator: @@ -724,6 +744,7 @@ def _submit_multipart_request( 'key': call_args.key, 'part_number': part_number, 'extra_args': extra_part_args, + # 'full_object_checksum': full_object_checksum, }, pending_main_kwargs={ 'upload_id': create_multipart_future @@ -746,6 +767,7 @@ def _submit_multipart_request( 'bucket': call_args.bucket, 'key': call_args.key, 'extra_args': complete_multipart_extra_args, + 'full_object_checksum': full_object_checksum, }, pending_main_kwargs={ 'upload_id': create_multipart_future, @@ -800,7 +822,14 @@ class UploadPartTask(Task): """Task to upload a part in a multipart upload""" def _main( - self, client, fileobj, bucket, key, upload_id, part_number, extra_args + self, + client, + fileobj, + bucket, + key, + upload_id, + part_number, + extra_args, ): """ :param client: The client to use when calling PutObject diff --git a/awscli/s3transfer/utils.py b/awscli/s3transfer/utils.py index 43dfc8db6c4b..ef9ca40f38f4 100644 --- a/awscli/s3transfer/utils.py +++ b/awscli/s3transfer/utils.py @@ -24,6 +24,7 @@ from botocore.exceptions import IncompleteReadError, ReadTimeoutError from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper from botocore.utils import is_s3express_bucket +from s3transfer.checksums import CRC_CHECKSUM_CLS from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS @@ -267,6 +268,7 @@ def open_file_chunk_reader_from_fileobj( full_file_size, callbacks, close_callbacks=None, + full_object_checksum=None, ): return ReadFileChunk( fileobj, @@ -275,6 +277,7 @@ def open_file_chunk_reader_from_fileobj( callbacks=callbacks, enable_callbacks=False, close_callbacks=close_callbacks, + full_object_checksum=full_object_checksum, ) def open(self, filename, mode): @@ -413,6 +416,7 @@ def 
__init__( callbacks=None, enable_callbacks=True, close_callbacks=None, + full_object_checksum=None, ): """ @@ -465,6 +469,10 @@ def __init__( self._close_callbacks = close_callbacks if close_callbacks is None: self._close_callbacks = close_callbacks + self._full_object_checksum = full_object_checksum + self._checksum = None + if full_object_checksum is not None: + self._checksum = CRC_CHECKSUM_CLS[full_object_checksum.checksum_algorithm]() @classmethod def from_filename( @@ -505,6 +513,10 @@ def from_filename( file_size = os.fstat(f.fileno()).st_size return cls(f, chunk_size, file_size, callbacks, enable_callbacks) + @property + def checksum(self): + return self._checksum + def _calculate_file_size( self, fileobj, requested_size, start_byte, actual_file_size ): @@ -519,6 +531,13 @@ def read(self, amount=None): amount_to_read = min(amount_left, amount) data = self._fileobj.read(amount_to_read) self._amount_read += len(data) + if self._checksum: + self._checksum.update(data) + if self._amount_read == self._size: + self._full_object_checksum.set_part_checksum( + self._start_byte, + self._checksum.int_crc, + ) if self._callbacks is not None and self._callbacks_enabled: invoke_progress_callbacks(self._callbacks, len(data)) return data @@ -825,3 +844,7 @@ def set_default_checksum_algorithm(extra_args): if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS): return extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM) + + +def set_default_checksum_type(extra_args): + extra_args.setdefault("ChecksumType", "FULL_OBJECT")