diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index bd67f57753..18f6a68d21 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -868,6 +868,7 @@ def _process_named_tensor(self, named_tensor, collaborator_name): "int_to_float": proto.int_to_float, "int_list": proto.int_list, "bool_list": proto.bool_list, + "dtype": proto.dtype, } for proto in named_tensor.transformer_metadata ] diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index f0d27e85f4..f0e2a9df8f 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -514,6 +514,7 @@ def named_tensor_to_nparray(self, named_tensor): "int_to_float": proto.int_to_float, "int_list": proto.int_list, "bool_list": proto.bool_list, + "dtype": proto.dtype, } for proto in named_tensor.transformer_metadata ] diff --git a/openfl/pipelines/eden_pipeline.py b/openfl/pipelines/eden_pipeline.py index fe05bd3af5..245f17e473 100644 --- a/openfl/pipelines/eden_pipeline.py +++ b/openfl/pipelines/eden_pipeline.py @@ -43,7 +43,7 @@ import numpy as np import torch -from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline, Transformer +from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline, Transformer class Eden: @@ -731,7 +731,7 @@ class EdenTransformer(Transformer): with dimensions less than this threshold are not compressed. device (str): The device to be used for quantization ('cpu' or 'cuda'). eden (Eden): The Eden object for quantization. - no_comp (Float32NumpyArrayToBytes): The transformer for data that are + no_comp (NumpyArrayToBytes): The transformer for data that are not compressed. 
""" @@ -756,7 +756,7 @@ def __init__(self, n_bits=8, dim_threshold=100, device="cpu"): ) self.dim_threshold = dim_threshold - self.no_comp = Float32NumpyArrayToBytes() + self.no_comp = NumpyArrayToBytes() def forward(self, data, **kwargs): """Quantize data. diff --git a/openfl/pipelines/no_compression_pipeline.py b/openfl/pipelines/no_compression_pipeline.py index cbb2317724..070616b852 100644 --- a/openfl/pipelines/no_compression_pipeline.py +++ b/openfl/pipelines/no_compression_pipeline.py @@ -4,7 +4,7 @@ """NoCompressionPipeline module.""" -from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline +from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline class NoCompressionPipeline(TransformationPipeline): @@ -12,4 +12,4 @@ class NoCompressionPipeline(TransformationPipeline): def __init__(self, **kwargs): """Initialize.""" - super().__init__(transformers=[Float32NumpyArrayToBytes()], **kwargs) + super().__init__(transformers=[NumpyArrayToBytes()], **kwargs) diff --git a/openfl/pipelines/pipeline.py b/openfl/pipelines/pipeline.py index e83771b79e..8f71155bed 100644 --- a/openfl/pipelines/pipeline.py +++ b/openfl/pipelines/pipeline.py @@ -48,48 +48,41 @@ def backward(self, data, metadata, **kwargs): raise NotImplementedError -class Float32NumpyArrayToBytes(Transformer): - """Transformer class for converting float32 Numpy arrays to bytes - arrays.""" +class NumpyArrayToBytes(Transformer): + """Transformer for converting generic Numpy arrays to bytes.""" def __init__(self): - """Initialize Float32NumpyArrayToBytes.""" self.lossy = False def forward(self, data: np.ndarray, **kwargs): - """Convert a float32 Numpy array to bytes. + """Convert a Numpy array to bytes. Args: - data: The float32 Numpy array to be converted. + data: The Numpy array to be converted. **kwargs: Additional keyword arguments for the conversion. Returns: data_bytes: The data converted to bytes. metadata: The metadata for the conversion. 
""" - # TODO: Warn when this casting is being performed. - if data.dtype != np.float32: - data = data.astype(np.float32) array_shape = data.shape - # Better call it array_shape? - metadata = {"int_list": list(array_shape)} + metadata = {"int_list": list(array_shape), "dtype": str(data.dtype)} data_bytes = data.tobytes(order="C") return data_bytes, metadata def backward(self, data, metadata, **kwargs): - """Convert bytes back to a float32 Numpy array. + """Convert bytes back to a Numpy array. Args: data: The data in bytes. metadata: The metadata for the conversion. Returns: - The data converted back to a float32 Numpy array. + The data converted back to a Numpy array. """ array_shape = tuple(metadata["int_list"]) - flat_array = np.frombuffer(data, dtype=np.float32) - # For integer parameters we probably should unpack arrays - # with shape (1,) + dtype = np.dtype(metadata["dtype"]) + flat_array = np.frombuffer(data, dtype=dtype) return np.reshape(flat_array, newshape=array_shape, order="C") diff --git a/openfl/pipelines/random_shift_pipeline.py b/openfl/pipelines/random_shift_pipeline.py index 770cc72053..2d058907bc 100644 --- a/openfl/pipelines/random_shift_pipeline.py +++ b/openfl/pipelines/random_shift_pipeline.py @@ -6,7 +6,7 @@ import numpy as np -from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline, Transformer +from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline, Transformer class RandomShiftTransformer(Transformer): @@ -73,5 +73,5 @@ class RandomShiftPipeline(TransformationPipeline): def __init__(self, **kwargs): """Initialize.""" - transformers = [RandomShiftTransformer(), Float32NumpyArrayToBytes()] + transformers = [RandomShiftTransformer(), NumpyArrayToBytes()] super().__init__(transformers=transformers) diff --git a/openfl/pipelines/tensor_codec.py b/openfl/pipelines/tensor_codec.py index 15edde0965..357cf0fde9 100644 --- a/openfl/pipelines/tensor_codec.py +++ 
b/openfl/pipelines/tensor_codec.py @@ -69,9 +69,9 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs): metadata: metadata associated with compressed tensor. """ if require_lossless: - compressed_nparray, metadata = self.lossless_pipeline.forward(data, **kwargs) + data, metadata = self.lossless_pipeline.forward(data, **kwargs) else: - compressed_nparray, metadata = self.compression_pipeline.forward(data, **kwargs) + data, metadata = self.compression_pipeline.forward(data, **kwargs) # Define the compressed tensorkey that should be # returned ('trained.delta'->'trained.delta.lossy_compressed') tensor_name, origin, round_number, report, tags = tensor_key @@ -80,7 +80,7 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs): else: new_tags = change_tags(tags, add_field="lossy_compressed") compressed_tensor_key = TensorKey(tensor_name, origin, round_number, report, new_tags) - return compressed_tensor_key, compressed_nparray, metadata + return compressed_tensor_key, data, metadata def decompress( self, @@ -121,13 +121,9 @@ def decompress( assert "compressed" in tags, "Cannot losslessly decompress lossy tensor" if require_lossless or "compressed" in tags: - decompressed_nparray = self.lossless_pipeline.backward( - data, transformer_metadata, **kwargs - ) + data = self.lossless_pipeline.backward(data, transformer_metadata, **kwargs) else: - decompressed_nparray = self.compression_pipeline.backward( - data, transformer_metadata, **kwargs - ) + data = self.compression_pipeline.backward(data, transformer_metadata, **kwargs) # Define the decompressed tensorkey that should be returned if "lossy_compressed" in tags: new_tags = change_tags( @@ -144,7 +140,7 @@ def decompress( else: raise NotImplementedError("Decompression is only supported on compressed data") - return decompressed_tensor_key, decompressed_nparray + return decompressed_tensor_key, data @staticmethod def generate_delta(tensor_key, nparray, base_model_nparray): diff --git 
a/openfl/protocols/base.proto b/openfl/protocols/base.proto index 16b4351fb9..0c1340ae8e 100644 --- a/openfl/protocols/base.proto +++ b/openfl/protocols/base.proto @@ -22,6 +22,7 @@ message MetadataProto { map<int32, float> int_to_float = 1; repeated int32 int_list = 2; repeated bool bool_list = 3; + string dtype = 4; } // handles large size data diff --git a/openfl/protocols/utils.py b/openfl/protocols/utils.py index d19cd594f0..fd61495580 100644 --- a/openfl/protocols/utils.py +++ b/openfl/protocols/utils.py @@ -34,6 +34,7 @@ def model_proto_to_bytes_and_metadata(model_proto): "int_to_float": proto.int_to_float, "int_list": proto.int_list, "bool_list": proto.bool_list, + "dtype": proto.dtype, } for proto in tensor_proto.transformer_metadata ] @@ -115,25 +116,16 @@ def construct_named_tensor(tensor_key, nparray, transformer_metadata, lossless): """ metadata_protos = [] for metadata in transformer_metadata: - if metadata.get("int_to_float") is not None: - int_to_float = metadata.get("int_to_float") - else: - int_to_float = {} - - if metadata.get("int_list") is not None: - int_list = metadata.get("int_list") - else: - int_list = [] - - if metadata.get("bool_list") is not None: - bool_list = metadata.get("bool_list") - else: - bool_list = [] + int_to_float = metadata.get("int_to_float", {}) + int_list = metadata.get("int_list", []) + bool_list = metadata.get("bool_list", []) + dtype = metadata.get("dtype", "") metadata_protos.append( base_pb2.MetadataProto( int_to_float=int_to_float, int_list=int_list, bool_list=bool_list, + dtype=dtype, ) ) diff --git a/tests/openfl/component/aggregator/test_aggregator.py b/tests/openfl/component/aggregator/test_aggregator.py index 342a218fe3..a9ff983992 100644 --- a/tests/openfl/component/aggregator/test_aggregator.py +++ b/tests/openfl/component/aggregator/test_aggregator.py @@ -26,6 +26,7 @@ def model(): metadata.int_to_float[1] = 1. 
metadata.int_list.extend([1, 8]) metadata.bool_list.append(True) + metadata.dtype = "float32" tensor.data_bytes = 32 * b'1' return model diff --git a/tests/openfl/component/collaborator/test_collaborator.py b/tests/openfl/component/collaborator/test_collaborator.py index 49288c2ffe..280101a751 100644 --- a/tests/openfl/component/collaborator/test_collaborator.py +++ b/tests/openfl/component/collaborator/test_collaborator.py @@ -37,6 +37,7 @@ def named_tensor(): metadata.int_to_float[1] = 1. metadata.int_list.extend([1, 8]) metadata.bool_list.append(True) + metadata.dtype = "float32" return tensor diff --git a/tests/openfl/pipelines/test_pipeline.py b/tests/openfl/pipelines/test_pipeline.py index d2c7eb9795..68204e367e 100644 --- a/tests/openfl/pipelines/test_pipeline.py +++ b/tests/openfl/pipelines/test_pipeline.py @@ -5,7 +5,7 @@ import numpy as np import pytest -from openfl.pipelines.pipeline import Float32NumpyArrayToBytes +from openfl.pipelines.pipeline import NumpyArrayToBytes from openfl.pipelines.pipeline import TransformationPipeline from openfl.pipelines.pipeline import Transformer from openfl.protocols import base_pb2 @@ -46,14 +46,14 @@ def test_transformer_backward(): def test_f32natb_is_lossy(): - """Test that Float32NumpyArrayToBytes object creates with lossy = False.""" - t = Float32NumpyArrayToBytes() + """Test that NumpyArrayToBytes object creates with lossy = False.""" + t = NumpyArrayToBytes() assert t.lossy is False def test_f32natb_forward(named_tensor): - """Test that Float32NumpyArrayToBytes.forward works correctly.""" - t = Float32NumpyArrayToBytes() + """Test that NumpyArrayToBytes.forward works correctly.""" + t = NumpyArrayToBytes() proto = named_tensor.transformer_metadata.pop() metadata = {'int_to_float': proto.int_to_float, 'int_list': proto.int_list, @@ -69,8 +69,8 @@ def test_f32natb_forward(named_tensor): def test_f32natb_backward(named_tensor): - """Test that Float32NumpyArrayToBytes.backward works correctly.""" - t = 
Float32NumpyArrayToBytes() + """Test that NumpyArrayToBytes.backward works correctly.""" + t = NumpyArrayToBytes() proto = named_tensor.transformer_metadata.pop() metadata = {'int_to_float': proto.int_to_float, 'int_list': proto.int_list, @@ -88,7 +88,7 @@ def test_f32natb_backward(named_tensor): def test_transformation_pipeline_forward(named_tensor): """Test that TransformationPipeline.forward works correctly.""" - transformer = Float32NumpyArrayToBytes() + transformer = NumpyArrayToBytes() tp = TransformationPipeline([transformer]) proto = named_tensor.transformer_metadata.pop() metadata = {'int_to_float': proto.int_to_float, @@ -108,7 +108,7 @@ def test_transformation_pipeline_forward(named_tensor): def test_transformation_pipeline_backward(named_tensor): """Test that TransformationPipeline.backward works correctly.""" - transformer = Float32NumpyArrayToBytes() + transformer = NumpyArrayToBytes() tp = TransformationPipeline([transformer]) proto = named_tensor.transformer_metadata.pop() metadata = {'int_to_float': proto.int_to_float, @@ -127,7 +127,7 @@ def test_transformation_pipeline_backward(named_tensor): def test_transformation_pipeline_is_lossy_false(named_tensor): """Test that TransformationPipeline.is_lossy returns False if all transformers is not lossy.""" - transformer = Float32NumpyArrayToBytes() + transformer = NumpyArrayToBytes() tp = TransformationPipeline([transformer]) is_lossy = tp.is_lossy() @@ -137,8 +137,8 @@ def test_transformation_pipeline_is_lossy_false(named_tensor): def test_transformation_pipeline_is_lossy(named_tensor): """Test that TransformationPipeline.is_lossy returns False if any transformer is lossy.""" - transformer1 = Float32NumpyArrayToBytes() - transformer2 = Float32NumpyArrayToBytes() + transformer1 = NumpyArrayToBytes() + transformer2 = NumpyArrayToBytes() transformer2.lossy = True tp = TransformationPipeline([transformer1, transformer2]) diff --git a/tests/openfl/pipelines/test_tensor_codec.py 
b/tests/openfl/pipelines/test_tensor_codec.py index 5ff1b6c704..9fad6f6d60 100644 --- a/tests/openfl/pipelines/test_tensor_codec.py +++ b/tests/openfl/pipelines/test_tensor_codec.py @@ -29,6 +29,7 @@ def named_tensor(): metadata.int_to_float[1] = 1. metadata.int_list.extend([1, 8]) metadata.bool_list.append(True) + metadata.dtype = "float32" return tensor @@ -51,7 +52,8 @@ def test_compress(tensor_key, named_tensor): tensor_codec = TensorCodec(NoCompressionPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -71,7 +73,8 @@ def test_compress_lossless(tensor_key, named_tensor): tensor_codec = TensorCodec(NoCompressionPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -91,7 +94,8 @@ def test_compress_not_lossy_lossless(tensor_key, named_tensor): tensor_codec = TensorCodec(SKCPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -111,7 +115,8 @@ def test_compress_not_require_lossless(tensor_key, named_tensor): tensor_codec = TensorCodec(SKCPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, 
} for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -142,7 +147,8 @@ def test_decompress_no_tags(tensor_key, named_tensor): tensor_codec = TensorCodec(NoCompressionPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] with pytest.raises(AssertionError): tensor_codec.decompress( @@ -159,7 +165,8 @@ def test_decompress_require_lossless_no_compressed_in_tags(tensor_key, named_ten ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] with pytest.raises(AssertionError): tensor_codec.decompress( @@ -176,7 +183,8 @@ def test_decompress_call_lossless_pipeline_with_require_lossless(tensor_key, nam ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] tensor_codec.lossless_pipeline = mock.Mock() tensor_codec.decompress( @@ -195,7 +203,8 @@ def test_decompress_call_compression_pipeline(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] tensor_codec.compression_pipeline = mock.Mock() tensor_codec.decompress( @@ -214,7 +223,8 @@ def test_decompress_lossy_compressed_in_tags(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in 
named_tensor.transformer_metadata] decompressed_tensor_key, decompressed_nparray = tensor_codec.decompress( tensor_key, named_tensor.data_bytes, metadata @@ -231,7 +241,8 @@ def test_decompress_compressed_in_tags(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] decompressed_tensor_key, decompressed_nparray = tensor_codec.decompress( tensor_key, named_tensor.data_bytes, metadata @@ -244,7 +255,8 @@ def test_generate(tensor_key, named_tensor): tensor_codec = TensorCodec(NoCompressionPipeline()) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -266,7 +278,8 @@ def test_generate_delta_assert_model_in_tags(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -286,7 +299,8 @@ def test_apply_delta_agg(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) @@ -309,7 +323,8 @@ def test_apply_delta_col(tensor_key, named_tensor): ) metadata = [{'int_to_float': proto.int_to_float, 'int_list': proto.int_list, - 
'bool_list': proto.bool_list + 'bool_list': proto.bool_list, + 'dtype': proto.dtype, } for proto in named_tensor.transformer_metadata] array_shape = tuple(metadata[0]['int_list']) flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32)