diff --git a/awscli/botocore/httpchecksum.py b/awscli/botocore/httpchecksum.py index 02fb105384c6..79d0a8a401a3 100644 --- a/awscli/botocore/httpchecksum.py +++ b/awscli/botocore/httpchecksum.py @@ -75,6 +75,10 @@ def update(self, chunk): def digest(self): return self._int_crc32.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32 + class CrtCrc32Checksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -88,6 +92,10 @@ def update(self, chunk): def digest(self): return self._int_crc32.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32 + class CrtCrc32cChecksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -101,6 +109,10 @@ def update(self, chunk): def digest(self): return self._int_crc32c.to_bytes(4, byteorder="big") + @property + def int_crc(self): + return self._int_crc32c + class CrtCrc64NvmeChecksum(BaseChecksum): # Note: This class is only used if the CRT is available @@ -114,6 +126,10 @@ def update(self, chunk): def digest(self): return self._int_crc64nvme.to_bytes(8, byteorder="big") + @property + def int_crc(self): + return self._int_crc64nvme + class Sha1Checksum(BaseChecksum): def __init__(self): @@ -150,6 +166,7 @@ def __init__( self._raw = raw self._checksum_name = checksum_name self._checksum_cls = checksum_cls + self._reuse_checksum = hasattr(self._raw, 'checksum') self._reset() if chunk_size is None: @@ -160,8 +177,10 @@ def _reset(self): self._remaining = b"" self._complete = False self._checksum = None - if self._checksum_cls: + if self._checksum_cls and not self._reuse_checksum: self._checksum = self._checksum_cls() + if self._reuse_checksum: + self._checksum = self._raw.checksum def seek(self, offset, whence=0): if offset != 0 or whence != 0: @@ -204,7 +223,7 @@ def _make_chunk(self): hex_len = hex(len(raw_chunk))[2:].encode("ascii") self._complete = not raw_chunk - if self._checksum: + if self._checksum and not 
self._reuse_checksum:
             self._checksum.update(raw_chunk)
 
         if self._checksum and self._complete:
@@ -237,6 +256,10 @@ def _validate_checksum(self):
             error_msg = f"Expected checksum {self._expected} did not match calculated checksum: {self._checksum.b64digest()}"
             raise FlexibleChecksumError(error_msg=error_msg)
 
+    @property
+    def checksum(self):
+        return self._checksum
+
 
 def resolve_checksum_context(request, operation_model, params):
     resolve_request_checksum_algorithm(request, operation_model, params)
diff --git a/awscli/s3transfer/checksums.py b/awscli/s3transfer/checksums.py
new file mode 100644
index 000000000000..ac7dfceb6d61
--- /dev/null
+++ b/awscli/s3transfer/checksums.py
@@ -0,0 +1,224 @@
+import base64
+
+from botocore.httpchecksum import (
+    CrtCrc32cChecksum,
+    CrtCrc32Checksum,
+    CrtCrc64NvmeChecksum,
+)
+
+
+class PartStreamingChecksumBody:
+    """Stream wrapper that records a part's CRC once it is fully read.
+
+    The CRC is handed to ``full_object_checksum`` keyed by
+    ``starting_index`` when a read returns no data.
+    """
+
+    def __init__(self, stream, starting_index, full_object_checksum):
+        self._stream = stream
+        self._starting_index = starting_index
+        self._checksum = CRC_CHECKSUM_CLS[
+            full_object_checksum.checksum_algorithm
+        ]()
+        self._full_object_checksum = full_object_checksum
+        # If the underlying stream already has a checksum object
+        # it's updating (eg `botocore.httpchecksum.StreamingChecksumBody`),
+        # reuse its calculated value. Requiring `int_crc` means a missing
+        # (None) or non-CRC checksum falls back to our own calculation.
+        self._reuse_checksum = hasattr(
+            getattr(self._stream, 'checksum', None), 'int_crc'
+        )
+
+    def read(self, *args, **kwargs):
+        """Read from the wrapped stream, tracking the part's CRC."""
+        value = self._stream.read(*args, **kwargs)
+        if not self._reuse_checksum:
+            self._checksum.update(value)
+        if not value:
+            # An empty read is treated as the end of the part.
+            self._set_part_checksum()
+        return value
+
+    def _set_part_checksum(self):
+        if self._reuse_checksum:
+            value = self._stream.checksum.int_crc
+        else:
+            value = self._checksum.int_crc
+        self._full_object_checksum.set_part_checksum(
+            self._starting_index,
+            value,
+        )
+
+
+class FullObjectChecksum:
+    """Rebuild a full-object CRC from per-part CRCs and validate it.
+
+    :param checksum_algorithm: A key of ``CRC_CHECKSUM_CLS``, e.g.
+        ``"ChecksumCRC32"``.
+    :param content_length: Total object size in bytes; used as the end
+        offset of the last part.
+    """
+
+    def __init__(self, checksum_algorithm, content_length):
+        self.checksum_algorithm = checksum_algorithm
+        self._content_length = content_length
+        self._combine_function = _CRC_CHECKSUM_TO_COMBINE_FUNCTION[
+            self.checksum_algorithm
+        ]
+        self._digest_size = _CRC_CHECKSUM_TO_DIGEST_SIZE[
+            self.checksum_algorithm
+        ]
+        self._stored_checksum = None
+        # Created eagerly so that concurrent set_part_checksum() calls
+        # cannot race on lazy creation and drop an entry.
+        self._part_checksums = {}
+        self._calculated_checksum = None
+
+    @property
+    def calculated_checksum(self):
+        """Base64 full-object checksum, or ``None`` if no part is recorded.
+
+        The value is only memoized once it exists, so reading this early
+        does not pin ``None``.
+        """
+        if self._calculated_checksum is None:
+            self._combine_part_checksums()
+        return self._calculated_checksum
+
+    def set_stored_checksum(self, stored_checksum):
+        self._stored_checksum = stored_checksum
+
+    def set_part_checksum(self, offset, checksum):
+        """Record the integer CRC of the part starting at ``offset``."""
+        self._part_checksums[offset] = checksum
+
+    def _combine_part_checksums(self):
+        if not self._part_checksums:
+            return
+        offsets = sorted(self._part_checksums)
+        # A part ends where the next one starts; the last part ends at
+        # the end of the object.
+        ends = offsets[1:] + [self._content_length]
+        combined = self._part_checksums[offsets[0]]
+        for offset, end in zip(offsets[1:], ends[1:]):
+            combined = self._combine_function(
+                combined, self._part_checksums[offset], end - offset
+            )
+        self._calculated_checksum = base64.b64encode(
+            combined.to_bytes(self._digest_size, byteorder='big')
+        ).decode('ascii')
+
+    def validate(self):
+        """Raise ``ValueError`` if calculated and stored checksums differ."""
+        if self.calculated_checksum != self._stored_checksum:
+            raise ValueError(
+                f"Calculated checksum {self.calculated_checksum} does not match "
+                f"stored checksum {self._stored_checksum}"
+            )
+
+
+def _combine_crc(crc1, crc2, len2, poly, width):
+    """Combine two reflected CRCs of ``width`` bits with polynomial ``poly``.
+
+    Follows zlib's ``crc32_combine``: ``crc1`` is advanced through ``len2``
+    zero bytes by GF(2) matrix squaring and then XORed with ``crc2``. This
+    holds for CRCs whose initial value equals their final XOR value.
+
+    :raises ValueError: If ``len2`` is negative.
+    """
+    if len2 < 0:
+        # A negative length never shifts down to zero, so the loop below
+        # would not terminate.
+        raise ValueError(f"len2 must be non-negative, got {len2}")
+    mask = (1 << width) - 1
+
+    def gf2_matrix_times(mat, vec):
+        """Multiply matrix by vector over GF(2)"""
+        result = 0
+        for i in range(width):
+            if vec & (1 << i):
+                result ^= mat[i]
+        return result
+
+    def gf2_matrix_square(mat):
+        """Square matrix over GF(2)"""
+        return [gf2_matrix_times(mat, row) for row in mat]
+
+    # Operator that advances a CRC by one zero bit.
+    odd = [poly] + [1 << n for n in range(width - 1)]
+    # Square to get the two-bit operator, then the four-bit operator.
+    even = gf2_matrix_square(odd)
+    odd = gf2_matrix_square(even)
+
+    crc1 &= mask
+    # Walk the bits of len2. Every squaring doubles the number of zero
+    # bits an operator covers; the first one below covers a whole byte.
+    while len2:
+        even = gf2_matrix_square(odd)
+        if len2 & 1:
+            crc1 = gf2_matrix_times(even, crc1)
+        len2 >>= 1
+        if not len2:
+            break
+        odd = gf2_matrix_square(even)
+        if len2 & 1:
+            crc1 = gf2_matrix_times(odd, crc1)
+        len2 >>= 1
+
+    return (crc1 ^ crc2) & mask
+
+
+def combine_crc32(crc1, crc2, len2):
+    """
+    Combine two CRC32 checksums computed with binascii.crc32.
+
+    This implementation follows the algorithm used in zlib's crc32_combine.
+
+    Args:
+        crc1: CRC32 checksum of the first data block (from binascii.crc32)
+        crc2: CRC32 checksum of the second data block (from binascii.crc32)
+        len2: Length in bytes of the second data block
+
+    Returns:
+        Combined CRC32 checksum as if the two blocks were concatenated
+
+    Raises:
+        ValueError: If len2 is negative
+    """
+    # CRC-32 polynomial in reversed bit order
+    return _combine_crc(crc1, crc2, len2, 0xEDB88320, 32)
+
+
+def combine_crc32c(crc1, crc2, len2):
+    """Combine two CRC32C (Castagnoli) checksums; see ``combine_crc32``."""
+    return _combine_crc(crc1, crc2, len2, 0x82F63B78, 32)
+
+
+def combine_crc64nvme(crc1, crc2, len2):
+    """Combine two CRC64NVME checksums; see ``combine_crc32``."""
+    return _combine_crc(crc1, crc2, len2, 0x9A6C9329AC4BC9B5, 64)
+
+
+_CRC_CHECKSUM_TO_COMBINE_FUNCTION = {
+    "ChecksumCRC64NVME": combine_crc64nvme,
+    "ChecksumCRC32C": combine_crc32c,
+    "ChecksumCRC32": combine_crc32,
+}
+
+
+# Digest width in bytes for each algorithm.
+_CRC_CHECKSUM_TO_DIGEST_SIZE = {
+    "ChecksumCRC64NVME": 8,
+    "ChecksumCRC32C": 4,
+    "ChecksumCRC32": 4,
+}
+
+
+CRC_CHECKSUM_CLS = {
+    "ChecksumCRC64NVME": CrtCrc64NvmeChecksum,
+    "ChecksumCRC32C": CrtCrc32cChecksum,
+    "ChecksumCRC32": CrtCrc32Checksum,
+}
+
+
+CRC_CHECKSUMS = _CRC_CHECKSUM_TO_COMBINE_FUNCTION.keys()
diff --git a/awscli/s3transfer/download.py b/awscli/s3transfer/download.py
index 1cb0a65c2320..1185bfd97f34 100644
--- a/awscli/s3transfer/download.py
+++ b/awscli/s3transfer/download.py
@@ -15,6 +15,11 @@
 import threading
 
 from botocore.exceptions import ClientError
+from s3transfer.checksums import (
+    CRC_CHECKSUMS,
+    FullObjectChecksum,
+    PartStreamingChecksumBody,
+)
 from s3transfer.compat import seekable
 from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError
 from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
@@ -137,6 +142,14 @@ def get_final_io_task(self):
         """
         raise NotImplementedError('must implement get_final_io_task()')
 
+    def get_validate_checksum_task(self, full_object_checksum):
+        return ValidateChecksumTask(
+            transfer_coordinator=self._transfer_coordinator,
+            main_kwargs={
+                'full_object_checksum': full_object_checksum,
+            },
+        )
+
     def _get_fileobj_from_filename(self, filename):
         f = DeferredOpenFile(
             filename, mode='wb', open_function=self._osutil.open
@@ -350,10 +363,12 @@ def _submit(
         if (
             transfer_future.meta.size is None
             or transfer_future.meta.etag is None
+            or transfer_future.meta.stored_checksum is None
         ):
             response = client.head_object(
                 Bucket=transfer_future.meta.call_args.bucket,
                 Key=transfer_future.meta.call_args.key,
+                ChecksumMode="ENABLED",
                 **transfer_future.meta.call_args.extra_args,
             )
             # If a size was not provided figure out the size for the
@@ -364,6 +379,8 @@ def _submit(
             # Provide an etag to ensure a stored object is not modified
             # during a multipart download.
transfer_future.meta.provide_object_etag(response.get('ETag'))
+            # Provide checksum
+            self._provide_checksum_to_meta(response, transfer_future.meta)
 
         download_output_manager = self._get_download_output_manager_cls(
             transfer_future, osutil
@@ -480,6 +497,23 @@ def _submit_ranged_download_request(
                 download_output_manager, io_executor
             )
         )
+        # Validate against the stored full-object checksum when one exists.
+        full_object_checksum = None
+        if transfer_future.meta.stored_checksum:
+            full_object_checksum = FullObjectChecksum(
+                transfer_future.meta.checksum_algorithm,
+                transfer_future.meta.size,
+            )
+            full_object_checksum.set_stored_checksum(
+                transfer_future.meta.stored_checksum,
+            )
+        validate_checksum_invoker = CountCallbackInvoker(
+            self._get_validate_checksum_task(
+                download_output_manager,
+                io_executor,
+                full_object_checksum,
+            )
+        )
         for i in range(num_parts):
             # Calculate the range parameter
             range_parameter = calculate_range_parameter(
@@ -494,6 +528,7 @@ def _submit_ranged_download_request(
                 extra_args['IfMatch'] = transfer_future.meta.etag
             extra_args.update(call_args.extra_args)
             finalize_download_invoker.increment()
+            validate_checksum_invoker.increment()
             # Submit the ranged downloads
             self._transfer_coordinator.submit(
                 request_executor,
@@ -511,13 +546,37 @@ def _submit_ranged_download_request(
                         'download_output_manager': download_output_manager,
                         'io_chunksize': config.io_chunksize,
                         'bandwidth_limiter': bandwidth_limiter,
+                        'full_object_checksum': full_object_checksum,
                     },
-                    done_callbacks=[finalize_download_invoker.decrement],
+                    done_callbacks=[
+                        validate_checksum_invoker.decrement,
+                        finalize_download_invoker.decrement,
+                    ],
                 ),
                 tag=get_object_tag,
             )
+        validate_checksum_invoker.finalize()
         finalize_download_invoker.finalize()
 
+    def _get_validate_checksum_task(
+        self,
+        download_manager,
+        io_executor,
+        full_object_checksum,
+    ):
+        if full_object_checksum is None:
+            task = CompleteDownloadNOOPTask(
+                transfer_coordinator=self._transfer_coordinator,
+                is_final=False,
+            )
+        else:
+            task = download_manager.get_validate_checksum_task(
+                full_object_checksum,
+            )
+        return FunctionContainer(
+            self._transfer_coordinator.submit, io_executor, task
+        )
+
     def _get_final_io_task_submission_callback(
         self, download_manager, io_executor
     ):
@@ -536,6 +595,17 @@ def _calculate_range_param(self, part_size, part_index, num_parts):
         range_param = f'bytes={start_range}-{end_range}'
         return range_param
 
+    def _provide_checksum_to_meta(self, response, transfer_meta):
+        """Record the object's full-object CRC checksum, if it has one."""
+        # Only full-object checksums can be rebuilt by combining part CRCs.
+        if response.get("ChecksumType") != "FULL_OBJECT":
+            return
+        for crc_checksum in CRC_CHECKSUMS:
+            if checksum_value := response.get(crc_checksum):
+                transfer_meta.provide_checksum_algorithm(crc_checksum)
+                transfer_meta.provide_stored_checksum(checksum_value)
+                return
+
 
 class GetObjectTask(Task):
     def _main(
@@ -551,6 +621,7 @@ def _main(
         io_chunksize,
         start_index=0,
         bandwidth_limiter=None,
+        full_object_checksum=None,
     ):
         """Downloads an object and places content into io queue
 
@@ -577,9 +648,16 @@ def _main(
                 response = client.get_object(
                     Bucket=bucket, Key=key, **extra_args
                 )
                 streaming_body = StreamReaderProgress(
                     response['Body'], callbacks
                 )
+                if full_object_checksum:
+                    # Wrap (not replace) the progress reader to keep callbacks.
+                    streaming_body = PartStreamingChecksumBody(
+                        streaming_body,
+                        start_index,
+                        full_object_checksum,
+                    )
                 if bandwidth_limiter:
                     streaming_body = (
                         bandwidth_limiter.get_bandwith_limited_stream(
@@ -831,3 +909,8 @@ def request_writes(self, offset, data):
             del self._pending_offsets[next_write_offset]
             self._next_offset += len(next_write)
         return writes
+
+
+class ValidateChecksumTask(Task):
+    def _main(self, full_object_checksum):
+        full_object_checksum.validate()
diff --git a/awscli/s3transfer/futures.py b/awscli/s3transfer/futures.py
index 6222a42baba8..776a682d48da 100644
--- a/awscli/s3transfer/futures.py
+++ b/awscli/s3transfer/futures.py
@@ -128,6 +128,8 @@ def __init__(self, call_args=None,
transfer_id=None): self._size = None self._user_context = {} self._etag = None + self._stored_checksum = None + self._checksum_algorithm = None @property def call_args(self): @@ -154,6 +156,20 @@ def etag(self): """The etag of the stored object for validating multipart downloads""" return self._etag + @property + def stored_checksum(self): + return self._stored_checksum + + @property + def checksum_algorithm(self): + return self._checksum_algorithm + + def provide_stored_checksum(self, checksum): + self._stored_checksum = checksum + + def provide_checksum_algorithm(self, algorithm): + self._checksum_algorithm = algorithm + def provide_transfer_size(self, size): """A method to provide the size of a transfer request diff --git a/awscli/s3transfer/manager.py b/awscli/s3transfer/manager.py index baaafe1d37cb..7f1ea4555a81 100644 --- a/awscli/s3transfer/manager.py +++ b/awscli/s3transfer/manager.py @@ -43,6 +43,7 @@ TaskSemaphore, get_callbacks, set_default_checksum_algorithm, + set_default_checksum_type, signal_not_transferring, signal_transferring, ) @@ -521,6 +522,7 @@ def _add_operation_defaults(self, extra_args): == "when_supported" ): set_default_checksum_algorithm(extra_args) + set_default_checksum_type(extra_args) def _submit_transfer( self, call_args, submission_task_cls, extra_main_kwargs=None diff --git a/awscli/s3transfer/tasks.py b/awscli/s3transfer/tasks.py index 7da35a6d9359..97ce4089bc2c 100644 --- a/awscli/s3transfer/tasks.py +++ b/awscli/s3transfer/tasks.py @@ -361,7 +361,16 @@ def _main(self, client, bucket, key, extra_args): class CompleteMultipartUploadTask(Task): """Task to complete a multipart upload""" - def _main(self, client, bucket, key, upload_id, parts, extra_args): + def _main( + self, + client, + bucket, + key, + upload_id, + parts, + extra_args, + full_object_checksum, + ): """ :param client: The client to use when calling CompleteMultipartUpload :param bucket: The name of the bucket to upload to @@ -376,6 +385,11 @@ def _main(self, 
client, bucket, key, upload_id, parts, extra_args):
         :param extra_args:  A dictionary of any extra arguments that may be
             used in completing the multipart transfer.
         """
+        if full_object_checksum:
+            extra_args['ChecksumType'] = "FULL_OBJECT"
+            extra_args[full_object_checksum.checksum_algorithm] = (
+                full_object_checksum.calculated_checksum
+            )
         client.complete_multipart_upload(
             Bucket=bucket,
             Key=key,
diff --git a/awscli/s3transfer/upload.py b/awscli/s3transfer/upload.py
index a3db8f899e2b..ac3d188634a8 100644
--- a/awscli/s3transfer/upload.py
+++ b/awscli/s3transfer/upload.py
@@ -13,6 +13,11 @@
 import math
 from io import BytesIO
 
+from s3transfer.checksums import (
+    CRC_CHECKSUMS,
+    FullObjectChecksum,
+    PartStreamingChecksumBody,
+)
 from s3transfer.compat import readable, seekable
 from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
 from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
@@ -272,7 +277,9 @@ def get_put_object_body(self, transfer_future):
             close_callbacks=close_callbacks,
         )
 
-    def yield_upload_part_bodies(self, transfer_future, chunksize):
+    def yield_upload_part_bodies(
+        self, transfer_future, chunksize, full_object_checksum=None
+    ):
         full_file_size = transfer_future.meta.size
         num_parts = self._get_num_parts(transfer_future, chunksize)
         for part_number in range(1, num_parts + 1):
@@ -300,6 +307,7 @@ def yield_upload_part_bodies(self, transfer_future, chunksize):
                 full_file_size=full_size,
                 callbacks=callbacks,
                 close_callbacks=close_callbacks,
+                full_object_checksum=full_object_checksum,
             )
             yield part_number, read_file_chunk
 
@@ -676,6 +684,16 @@ def _submit_multipart_request(
                     "Checksum", ""
                 )
 
+        full_object_checksum = None
+        # 'ChecksumAlgorithm' is optional in extra_args, so avoid a KeyError.
+        checksum_algorithm = call_args.extra_args.get('ChecksumAlgorithm')
+        checksum_arg = f"Checksum{checksum_algorithm}"
+        if checksum_algorithm and checksum_arg in CRC_CHECKSUMS:
+            full_object_checksum = FullObjectChecksum(
+                checksum_arg,
+                transfer_future.meta.size,
+            )
+
         create_multipart_extra_args = self._extra_create_multipart_args(
             call_args.extra_args
         )
@@ -708,7 +726,9 @@ def
_submit_multipart_request( adjuster = ChunksizeAdjuster() chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size) part_iterator = upload_input_manager.yield_upload_part_bodies( - transfer_future, chunksize + transfer_future, + chunksize, + full_object_checksum, ) for part_number, fileobj in part_iterator: @@ -724,6 +744,7 @@ def _submit_multipart_request( 'key': call_args.key, 'part_number': part_number, 'extra_args': extra_part_args, + # 'full_object_checksum': full_object_checksum, }, pending_main_kwargs={ 'upload_id': create_multipart_future @@ -746,6 +767,7 @@ def _submit_multipart_request( 'bucket': call_args.bucket, 'key': call_args.key, 'extra_args': complete_multipart_extra_args, + 'full_object_checksum': full_object_checksum, }, pending_main_kwargs={ 'upload_id': create_multipart_future, @@ -800,7 +822,14 @@ class UploadPartTask(Task): """Task to upload a part in a multipart upload""" def _main( - self, client, fileobj, bucket, key, upload_id, part_number, extra_args + self, + client, + fileobj, + bucket, + key, + upload_id, + part_number, + extra_args, ): """ :param client: The client to use when calling PutObject diff --git a/awscli/s3transfer/utils.py b/awscli/s3transfer/utils.py index 43dfc8db6c4b..ef9ca40f38f4 100644 --- a/awscli/s3transfer/utils.py +++ b/awscli/s3transfer/utils.py @@ -24,6 +24,7 @@ from botocore.exceptions import IncompleteReadError, ReadTimeoutError from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper from botocore.utils import is_s3express_bucket +from s3transfer.checksums import CRC_CHECKSUM_CLS from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS @@ -267,6 +268,7 @@ def open_file_chunk_reader_from_fileobj( full_file_size, callbacks, close_callbacks=None, + full_object_checksum=None, ): return ReadFileChunk( fileobj, @@ -275,6 +277,7 @@ def open_file_chunk_reader_from_fileobj( callbacks=callbacks, 
enable_callbacks=False,
             close_callbacks=close_callbacks,
+            full_object_checksum=full_object_checksum,
         )
 
     def open(self, filename, mode):
@@ -413,6 +416,7 @@ def __init__(
         callbacks=None,
         enable_callbacks=True,
         close_callbacks=None,
+        full_object_checksum=None,
     ):
         """
 
@@ -465,6 +469,13 @@ def __init__(
         self._close_callbacks = close_callbacks
         if close_callbacks is None:
             self._close_callbacks = close_callbacks
+        self._full_object_checksum = full_object_checksum
+        # Only track a CRC when a full-object checksum was requested.
+        self._checksum = None
+        if full_object_checksum is not None:
+            self._checksum = CRC_CHECKSUM_CLS[
+                full_object_checksum.checksum_algorithm
+            ]()
 
     @classmethod
     def from_filename(
@@ -505,6 +516,18 @@ def from_filename(
         file_size = os.fstat(f.fileno()).st_size
         return cls(f, chunk_size, file_size, callbacks, enable_callbacks)
 
+    @property
+    def checksum(self):
+        """The running CRC object for this chunk.
+
+        Raises AttributeError when no checksum is tracked, so that
+        ``hasattr(chunk, 'checksum')`` is only true for chunks that
+        actually maintain one.
+        """
+        if self._checksum is None:
+            raise AttributeError('checksum')
+        return self._checksum
+
     def _calculate_file_size(
         self, fileobj, requested_size, start_byte, actual_file_size
     ):
@@ -519,6 +542,13 @@ def read(self, amount=None):
             amount_to_read = min(amount_left, amount)
         data = self._fileobj.read(amount_to_read)
         self._amount_read += len(data)
+        if self._checksum is not None:
+            self._checksum.update(data)
+            if self._amount_read == self._size:
+                self._full_object_checksum.set_part_checksum(
+                    self._start_byte,
+                    self._checksum.int_crc,
+                )
         if self._callbacks is not None and self._callbacks_enabled:
             invoke_progress_callbacks(self._callbacks, len(data))
         return data
@@ -825,3 +855,13 @@ def set_default_checksum_algorithm(extra_args):
     if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS):
         return
     extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)
+
+
+def set_default_checksum_type(extra_args):
+    """Default ``ChecksumType`` to ``FULL_OBJECT`` unless it cannot apply."""
+    algorithm = extra_args.get("ChecksumAlgorithm")
+    # S3 only supports full-object checksums for CRC-based algorithms.
+    checksum_arg = f"Checksum{algorithm}"
+    if algorithm is not None and checksum_arg not in CRC_CHECKSUM_CLS:
+        return
+    extra_args.setdefault("ChecksumType", "FULL_OBJECT")