Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openfl/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ def _process_named_tensor(self, named_tensor, collaborator_name):
"int_to_float": proto.int_to_float,
"int_list": proto.int_list,
"bool_list": proto.bool_list,
"dtype": proto.dtype,
}
for proto in named_tensor.transformer_metadata
]
Expand Down
1 change: 1 addition & 0 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def named_tensor_to_nparray(self, named_tensor):
"int_to_float": proto.int_to_float,
"int_list": proto.int_list,
"bool_list": proto.bool_list,
"dtype": proto.dtype,
}
for proto in named_tensor.transformer_metadata
]
Expand Down
6 changes: 3 additions & 3 deletions openfl/pipelines/eden_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import numpy as np
import torch

from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline, Transformer
from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline, Transformer


class Eden:
Expand Down Expand Up @@ -731,7 +731,7 @@ class EdenTransformer(Transformer):
with dimensions less than this threshold are not compressed.
device (str): The device to be used for quantization ('cpu' or 'cuda').
eden (Eden): The Eden object for quantization.
no_comp (Float32NumpyArrayToBytes): The transformer for data that are
no_comp (NumpyArrayToBytes): The transformer for data that are
not compressed.
"""

Expand All @@ -756,7 +756,7 @@ def __init__(self, n_bits=8, dim_threshold=100, device="cpu"):
)

self.dim_threshold = dim_threshold
self.no_comp = Float32NumpyArrayToBytes()
self.no_comp = NumpyArrayToBytes()

def forward(self, data, **kwargs):
"""Quantize data.
Expand Down
4 changes: 2 additions & 2 deletions openfl/pipelines/no_compression_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

"""NoCompressionPipeline module."""

from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline
from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline


class NoCompressionPipeline(TransformationPipeline):
"""The data pipeline without any compression."""

def __init__(self, **kwargs):
"""Initialize."""
super().__init__(transformers=[Float32NumpyArrayToBytes()], **kwargs)
super().__init__(transformers=[NumpyArrayToBytes()], **kwargs)
25 changes: 9 additions & 16 deletions openfl/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,48 +48,41 @@ def backward(self, data, metadata, **kwargs):
raise NotImplementedError


class Float32NumpyArrayToBytes(Transformer):
"""Transformer class for converting float32 Numpy arrays to bytes
arrays."""
class NumpyArrayToBytes(Transformer):
"""Transformer for converting generic Numpy arrays to bytes."""

def __init__(self):
"""Initialize Float32NumpyArrayToBytes."""
self.lossy = False

def forward(self, data: np.ndarray, **kwargs):
"""Convert a float32 Numpy array to bytes.
"""Convert a Numpy array to bytes.

Args:
data: The float32 Numpy array to be converted.
data: The Numpy array to be converted.
**kwargs: Additional keyword arguments for the conversion.

Returns:
data_bytes: The data converted to bytes.
metadata: The metadata for the conversion.
"""
# TODO: Warn when this casting is being performed.
if data.dtype != np.float32:
data = data.astype(np.float32)
array_shape = data.shape
# Better call it array_shape?
metadata = {"int_list": list(array_shape)}
metadata = {"int_list": list(array_shape), "dtype": str(data.dtype)}
data_bytes = data.tobytes(order="C")
return data_bytes, metadata

def backward(self, data, metadata, **kwargs):
"""Convert bytes back to a float32 Numpy array.
"""Convert bytes back to a Numpy array.

Args:
data: The data in bytes.
metadata: The metadata for the conversion.

Returns:
The data converted back to a float32 Numpy array.
The data converted back to a Numpy array.
"""
array_shape = tuple(metadata["int_list"])
flat_array = np.frombuffer(data, dtype=np.float32)
# For integer parameters we probably should unpack arrays
# with shape (1,)
dtype = np.dtype(metadata["dtype"])
flat_array = np.frombuffer(data, dtype=dtype)
return np.reshape(flat_array, newshape=array_shape, order="C")


Expand Down
4 changes: 2 additions & 2 deletions openfl/pipelines/random_shift_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import numpy as np

from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline, Transformer
from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline, Transformer


class RandomShiftTransformer(Transformer):
Expand Down Expand Up @@ -73,5 +73,5 @@ class RandomShiftPipeline(TransformationPipeline):

def __init__(self, **kwargs):
"""Initialize."""
transformers = [RandomShiftTransformer(), Float32NumpyArrayToBytes()]
transformers = [RandomShiftTransformer(), NumpyArrayToBytes()]
super().__init__(transformers=transformers)
16 changes: 6 additions & 10 deletions openfl/pipelines/tensor_codec.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer(s): Changes in this file are unrelated to the PR. Variables are renamed for readability and python GC action

Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs):
metadata: metadata associated with compressed tensor.
"""
if require_lossless:
compressed_nparray, metadata = self.lossless_pipeline.forward(data, **kwargs)
data, metadata = self.lossless_pipeline.forward(data, **kwargs)
else:
compressed_nparray, metadata = self.compression_pipeline.forward(data, **kwargs)
data, metadata = self.compression_pipeline.forward(data, **kwargs)
# Define the compressed tensorkey that should be
# returned ('trained.delta'->'trained.delta.lossy_compressed')
tensor_name, origin, round_number, report, tags = tensor_key
Expand All @@ -80,7 +80,7 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs):
else:
new_tags = change_tags(tags, add_field="lossy_compressed")
compressed_tensor_key = TensorKey(tensor_name, origin, round_number, report, new_tags)
return compressed_tensor_key, compressed_nparray, metadata
return compressed_tensor_key, data, metadata

def decompress(
self,
Expand Down Expand Up @@ -121,13 +121,9 @@ def decompress(
assert "compressed" in tags, "Cannot losslessly decompress lossy tensor"

if require_lossless or "compressed" in tags:
decompressed_nparray = self.lossless_pipeline.backward(
data, transformer_metadata, **kwargs
)
data = self.lossless_pipeline.backward(data, transformer_metadata, **kwargs)
else:
decompressed_nparray = self.compression_pipeline.backward(
data, transformer_metadata, **kwargs
)
data = self.compression_pipeline.backward(data, transformer_metadata, **kwargs)
# Define the decompressed tensorkey that should be returned
if "lossy_compressed" in tags:
new_tags = change_tags(
Expand All @@ -144,7 +140,7 @@ def decompress(
else:
raise NotImplementedError("Decompression is only supported on compressed data")

return decompressed_tensor_key, decompressed_nparray
return decompressed_tensor_key, data

@staticmethod
def generate_delta(tensor_key, nparray, base_model_nparray):
Expand Down
1 change: 1 addition & 0 deletions openfl/protocols/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message MetadataProto {
map<int32, float> int_to_float = 1;
repeated int32 int_list = 2;
repeated bool bool_list = 3;
string dtype = 4;
}

// handles large size data
Expand Down
20 changes: 6 additions & 14 deletions openfl/protocols/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def model_proto_to_bytes_and_metadata(model_proto):
"int_to_float": proto.int_to_float,
"int_list": proto.int_list,
"bool_list": proto.bool_list,
"dtype": proto.dtype,
}
for proto in tensor_proto.transformer_metadata
]
Expand Down Expand Up @@ -115,25 +116,16 @@ def construct_named_tensor(tensor_key, nparray, transformer_metadata, lossless):
"""
metadata_protos = []
for metadata in transformer_metadata:
if metadata.get("int_to_float") is not None:
int_to_float = metadata.get("int_to_float")
else:
int_to_float = {}

if metadata.get("int_list") is not None:
int_list = metadata.get("int_list")
else:
int_list = []

if metadata.get("bool_list") is not None:
bool_list = metadata.get("bool_list")
else:
bool_list = []
int_to_float = metadata.get("int_to_float", {})
int_list = metadata.get("int_list", [])
bool_list = metadata.get("bool_list", [])
dtype = metadata.get("dtype", "")
metadata_protos.append(
base_pb2.MetadataProto(
int_to_float=int_to_float,
int_list=int_list,
bool_list=bool_list,
dtype=dtype,
)
)

Expand Down
1 change: 1 addition & 0 deletions tests/openfl/component/aggregator/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def model():
metadata.int_to_float[1] = 1.
metadata.int_list.extend([1, 8])
metadata.bool_list.append(True)
metadata.dtype = "float32"
tensor.data_bytes = 32 * b'1'

return model
Expand Down
1 change: 1 addition & 0 deletions tests/openfl/component/collaborator/test_collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def named_tensor():
metadata.int_to_float[1] = 1.
metadata.int_list.extend([1, 8])
metadata.bool_list.append(True)
metadata.dtype = "float32"

return tensor

Expand Down
24 changes: 12 additions & 12 deletions tests/openfl/pipelines/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import pytest

from openfl.pipelines.pipeline import Float32NumpyArrayToBytes
from openfl.pipelines.pipeline import NumpyArrayToBytes
from openfl.pipelines.pipeline import TransformationPipeline
from openfl.pipelines.pipeline import Transformer
from openfl.protocols import base_pb2
Expand Down Expand Up @@ -46,14 +46,14 @@ def test_transformer_backward():


def test_f32natb_is_lossy():
"""Test that Float32NumpyArrayToBytes object creates with lossy = False."""
t = Float32NumpyArrayToBytes()
"""Test that NumpyArrayToBytes object creates with lossy = False."""
t = NumpyArrayToBytes()
assert t.lossy is False


def test_f32natb_forward(named_tensor):
"""Test that Float32NumpyArrayToBytes.forward works correctly."""
t = Float32NumpyArrayToBytes()
"""Test that NumpyArrayToBytes.forward works correctly."""
t = NumpyArrayToBytes()
proto = named_tensor.transformer_metadata.pop()
metadata = {'int_to_float': proto.int_to_float,
'int_list': proto.int_list,
Expand All @@ -69,8 +69,8 @@ def test_f32natb_forward(named_tensor):


def test_f32natb_backward(named_tensor):
"""Test that Float32NumpyArrayToBytes.backward works correctly."""
t = Float32NumpyArrayToBytes()
"""Test that NumpyArrayToBytes.backward works correctly."""
t = NumpyArrayToBytes()
proto = named_tensor.transformer_metadata.pop()
metadata = {'int_to_float': proto.int_to_float,
'int_list': proto.int_list,
Expand All @@ -88,7 +88,7 @@ def test_f32natb_backward(named_tensor):

def test_transformation_pipeline_forward(named_tensor):
"""Test that TransformationPipeline.forward works correctly."""
transformer = Float32NumpyArrayToBytes()
transformer = NumpyArrayToBytes()
tp = TransformationPipeline([transformer])
proto = named_tensor.transformer_metadata.pop()
metadata = {'int_to_float': proto.int_to_float,
Expand All @@ -108,7 +108,7 @@ def test_transformation_pipeline_forward(named_tensor):

def test_transformation_pipeline_backward(named_tensor):
"""Test that TransformationPipeline.backward works correctly."""
transformer = Float32NumpyArrayToBytes()
transformer = NumpyArrayToBytes()
tp = TransformationPipeline([transformer])
proto = named_tensor.transformer_metadata.pop()
metadata = {'int_to_float': proto.int_to_float,
Expand All @@ -127,7 +127,7 @@ def test_transformation_pipeline_backward(named_tensor):

def test_transformation_pipeline_is_lossy_false(named_tensor):
"""Test that TransformationPipeline.is_lossy returns False if all transformers is not lossy."""
transformer = Float32NumpyArrayToBytes()
transformer = NumpyArrayToBytes()
tp = TransformationPipeline([transformer])

is_lossy = tp.is_lossy()
Expand All @@ -137,8 +137,8 @@ def test_transformation_pipeline_is_lossy_false(named_tensor):

def test_transformation_pipeline_is_lossy(named_tensor):
"""Test that TransformationPipeline.is_lossy returns False if any transformer is lossy."""
transformer1 = Float32NumpyArrayToBytes()
transformer2 = Float32NumpyArrayToBytes()
transformer1 = NumpyArrayToBytes()
transformer2 = NumpyArrayToBytes()
transformer2.lossy = True
tp = TransformationPipeline([transformer1, transformer2])

Expand Down
Loading
Loading