diff --git a/openfl-workspace/torch/mnist/plan/cols.yaml b/openfl-workspace/torch/mnist/plan/cols.yaml
index 2f4993ebbb..b60b50e5a8 100644
--- a/openfl-workspace/torch/mnist/plan/cols.yaml
+++ b/openfl-workspace/torch/mnist/plan/cols.yaml
@@ -2,3 +2,5 @@
 # Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
 
 collaborators:
+- collaborator1
+- collaborator2
\ No newline at end of file
diff --git a/openfl-workspace/torch/mnist/plan/plan.yaml b/openfl-workspace/torch/mnist/plan/plan.yaml
index af5b6e8656..95c671172d 100644
--- a/openfl-workspace/torch/mnist/plan/plan.yaml
+++ b/openfl-workspace/torch/mnist/plan/plan.yaml
@@ -24,7 +24,7 @@ compression_pipeline:
   template: openfl.pipelines.NoCompressionPipeline
 data_loader:
   settings:
-    batch_size: 64
+    batch_size: 1
     collaborator_count: 2
   template: src.dataloader.PyTorchMNISTInMemory
 network:
@@ -39,12 +39,14 @@ tasks:
       apply: global
       metrics:
       - acc
+      use_tqdm: 1
   locally_tuned_model_validation:
     function: validate_task
     kwargs:
       apply: local
       metrics:
       - acc
+      use_tqdm: 1
   settings: {}
   train:
     function: train_task
@@ -52,3 +54,4 @@
       epochs: 1
       metrics:
       - loss
+      use_tqdm: 1
diff --git a/openfl-workspace/torch/mnist/src/cnn_model.py b/openfl-workspace/torch/mnist/src/cnn_model.py
index aebad53904..037ac81a6b 100644
--- a/openfl-workspace/torch/mnist/src/cnn_model.py
+++ b/openfl-workspace/torch/mnist/src/cnn_model.py
@@ -5,6 +5,7 @@
 import torch
 import torch.nn as nn
 import torch.nn.functional as F
+import torchvision.models as models
 
 class DigitRecognizerCNN(nn.Module):
     """
@@ -44,10 +45,8 @@ def __init__(self, **kwargs):
             fc2 (nn.Linear): Second fully connected layer with 500 input features and 10 output features.
         """
         super(DigitRecognizerCNN, self).__init__(**kwargs)
-        self.conv1 = nn.Conv2d(1, 20, 2, 1)
-        self.conv2 = nn.Conv2d(20, 50, 5, 1)
-        self.fc1 = nn.Linear(800, 500)
-        self.fc2 = nn.Linear(500, 10)
+        self.model = models.resnet50(pretrained=True)
+        self.fc2 = nn.Linear(1000, 10)  # Update the number of output features to 10
 
     def forward(self, x):
         """
@@ -63,12 +62,7 @@ def forward(self, x):
         Returns:
             torch.Tensor: Output tensor after passing through the CNN layers.
         """
-        x = F.relu(self.conv1(x))
-        x = F.max_pool2d(x, 2, 2)
-        x = F.relu(self.conv2(x))
-        x = F.max_pool2d(x, 2, 2)
-        x = x.view(-1, 800)
-        x = F.relu(self.fc1(x))
+        x = self.model(x)
         x = self.fc2(x)
 
         return x
diff --git a/openfl-workspace/torch/mnist/src/dataloader.py b/openfl-workspace/torch/mnist/src/dataloader.py
index 3f3eeeb0bb..81a127f710 100644
--- a/openfl-workspace/torch/mnist/src/dataloader.py
+++ b/openfl-workspace/torch/mnist/src/dataloader.py
@@ -6,8 +6,21 @@
 from openfl.federated import PyTorchDataLoader
 from torchvision import datasets
 from torchvision import transforms
+import torch
 import numpy as np
 from logging import getLogger
+from torchvision import transforms
+from torch.utils.data import DataLoader
+
+# Define the preprocessing transformations
+preprocess = transforms.Compose(
+    [
+        transforms.Resize(64),  # Resize the shorter side to 64
+        transforms.CenterCrop(64),  # Crop the center to a 64x64 square
+        transforms.ToTensor(),  # Convert to a PyTorch tensor
+        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
+    ]
+)
 
 
 logger = getLogger(__name__)
@@ -30,20 +43,24 @@ def __init__(self, data_path, batch_size, **kwargs):
             int(data_path)
         except:
             raise ValueError(
-                "Expected `%s` to be representable as `int`, as it refers to the data shard " +
-                "number used by the collaborator.",
-                data_path
+                "Expected `%s` to be representable as `int`, as it refers to the data shard "
+                + "number used by the collaborator.",
+                data_path,
             )
 
         num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard(
             shard_num=int(data_path), **kwargs
         )
-        self.X_train = X_train
-        self.y_train = y_train
+        t = torch.from_numpy
+
+        number = 1
+        self.X_train = t(np.random.random([number, 3, 64, 64])).float()
+        self.y_train = t(np.random.randint(0, 9, [number]))
+
         self.train_loader = self.get_train_loader()
 
-        self.X_valid = X_valid
-        self.y_valid = y_valid
+        self.X_valid = t(np.random.random([number, 3, 64, 64])).float()
+        self.y_valid = t(np.random.randint(0, 9, [number]))
         self.val_loader = self.get_valid_loader()
 
         self.num_classes = num_classes
@@ -76,7 +93,7 @@
     num_classes = 10
 
     (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards(
-        shard_num, collaborator_count, transform=transforms.ToTensor()
+        shard_num, collaborator_count, transform=preprocess
    )
 
     logger.info(f"MNIST > X_train Shape : {X_train.shape}")
@@ -121,7 +138,9 @@ def _load_raw_datashards(shard_num, collaborator_count, transform=None):
         2 tuples: (image, label) of the training, validation dataset
     """
     train_data, val_data = (
-        datasets.MNIST("data", train=train, download=True, transform=transform)
+        datasets.MNIST(
+            "~/workspace/giant_data", train=train, download=True, transform=transform
+        )
         for train in (True, False)
     )
     X_train_tot, y_train_tot = train_data.train_data, train_data.train_labels
diff --git a/openfl/callbacks/callback_list.py b/openfl/callbacks/callback_list.py
index fbf7ef22c7..6dffb04edd 100644
--- a/openfl/callbacks/callback_list.py
+++ b/openfl/callbacks/callback_list.py
@@ -3,6 +3,7 @@
 from openfl.callbacks.callback import Callback
 from openfl.callbacks.memory_profiler import MemoryProfiler
 from openfl.callbacks.metric_writer import MetricWriter
+from tictoc import bench_dict, timer
 
 
 class CallbackList(Callback):
@@ -68,6 +69,16 @@ def _add_default_callbacks(self, add_memory_profiler, add_metric_writer):
             self.callbacks.append(self._metric_writer)
 
     def on_round_begin(self, round_num: int, logs=None):
+        if logs == 'agg':
+            if round_num > 0:
+                elapsed_time = timer.toc()
+                timer.tic()
+                with open(f"elapsed_time.txt", "a") as file:
+                    file.write(str(elapsed_time) + "\n")
+            else:
+                timer.tic()
+
+        bench_dict['global'].gstep()
         for callback in self.callbacks:
             callback.on_round_begin(round_num, logs)
diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index af457c00a7..7b9028108d 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -14,12 +14,13 @@
 import openfl.callbacks as callbacks_module
 from openfl.component.aggregator.straggler_handling import StragglerPolicy, WaitForAllPolicy
-from openfl.databases import PersistentTensorDB, TensorDB
+from openfl.databases import PersistentTensorDB, TensorDB, TRY_CHANGE
 from openfl.interface.aggregation_functions import SecureWeightedAverage, WeightedAverage
 from openfl.pipelines import NoCompressionPipeline, TensorCodec
 from openfl.protocols import base_pb2, utils
 from openfl.protocols.base_pb2 import NamedTensor
 from openfl.utilities import TaskResultKey, TensorKey, change_tags
+from tictoc import bench_dict
 
 logger = logging.getLogger(__name__)
@@ -231,7 +232,7 @@
         # TODO: Aggregator has no concrete notion of round_begin.
         # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537
         self.callbacks.on_experiment_begin()
-        self.callbacks.on_round_begin(self.round_number)
+        self.callbacks.on_round_begin(self.round_number, 'agg')
 
     def _recover(self):
         """Populates the aggregator state to the state it was prior a restart"""
@@ -476,6 +477,7 @@ def get_tasks(self, collaborator_name):
             sleep_time (int): Sleep time.
             time_to_quit (bool): Whether it's time to quit.
         """
+        bench_dict['global'].step('wait get tasks')
         logger.debug(
             f"Aggregator GetTasks function reached from collaborator {collaborator_name}..."
         )
@@ -543,6 +545,7 @@
         # Start straggler handling policy for timer based callback is required
         # for %age based policy callback is not required
         self.straggler_handling_policy.start_policy(callback=self._straggler_cutoff_time_elapsed)
+        bench_dict['global'].step('get_tasks')
 
         return tasks, self.round_number, sleep_time, time_to_quit
@@ -607,6 +610,9 @@ def get_aggregated_tensor(
         Raises:
             ValueError: if Aggregator does not have an aggregated tensor for {tensor_key}.
         """
+        bench_dict['global'].step('wait get aggregated tensor')
+        bench_dict['get_aggregate_tensor'].gstep()
+
         if "compressed" in tags or require_lossless:
             compress_lossless = True
         else:
@@ -624,16 +630,23 @@
             tags = change_tags(tags, remove_field="compressed")
         if "lossy_compressed" in tags:
             tags = change_tags(tags, remove_field="lossy_compressed")
+
+        bench_dict['get_aggregate_tensor'].step('change tag')
 
         tensor_key = TensorKey(tensor_name, self.uuid, round_number, report, tags)
         tensor_name, origin, round_number, report, tags = tensor_key
+
+        bench_dict['get_aggregate_tensor'].step('get tensorkey')
 
         if "aggregated" in tags and "delta" in tags and round_number != 0:
             agg_tensor_key = TensorKey(tensor_name, origin, round_number, report, ("aggregated",))
         else:
             agg_tensor_key = tensor_key
+
+        bench_dict['get_aggregate_tensor'].step('tensorkey if')
 
         nparray = self.tensor_db.get_tensor_from_cache(agg_tensor_key)
+        bench_dict['get_aggregate_tensor'].step('tensor from cache')
 
         start_retrieving_time = time.time()
         while nparray is None:
@@ -642,6 +655,7 @@
                 nparray = self.tensor_db.get_tensor_from_cache(agg_tensor_key)
                 if (time.time() - start_retrieving_time) > 60:
                     break
+        bench_dict['get_aggregate_tensor'].step('wait for tensorkey')
 
         if nparray is None:
             raise ValueError(f"Aggregator does not have an aggregated tensor for {tensor_key}")
@@ -652,6 +666,9 @@
         named_tensor = self._nparray_to_named_tensor(
             agg_tensor_key, nparray, send_model_deltas=True, compress_lossless=compress_lossless
         )
+        bench_dict['get_aggregate_tensor'].step('_nparray_to_named_tensor')
+        bench_dict['get_aggregate_tensor'].gstop()
+        bench_dict['global'].step('get_aggregate_tensor')
 
         return named_tensor
@@ -773,6 +790,7 @@ def send_local_task_results(
         Returns:
             None
         """
+        bench_dict['global'].step('wait send local task')
         # Check if secure aggregation is enabled.
         if self._secure_aggregation_enabled:
             secagg_setup = self.secagg.process_secagg_setup_tensors(named_tensors)
@@ -794,7 +812,7 @@
             f"Collaborator {collaborator_name} is sending task results "
             f"for {task_name}, round {round_number}"
         )
-
+        bench_dict['global'].step('send task results')
         self.process_task_results(
             collaborator_name, round_number, task_name, data_size, named_tensors
         )
@@ -869,6 +887,7 @@
         # Check if collaborator or round is done.
         self._is_collaborator_done(collaborator_name, round_number)
+        bench_dict['global'].step('process task')
 
         self._end_of_round_with_stragglers_check()
 
     def _end_of_round_with_stragglers_check(self):
@@ -1191,6 +1210,9 @@
         for task_name in self.assigner.get_all_tasks_for_round(self.round_number):
             logs.update(self._compute_validation_related_task_metrics(task_name))
 
+        # End of round callbacks.
+        self.callbacks.on_round_end(self.round_number, logs)
+
         # Once all of the task results have been processed
         self._end_of_round_check_done[self.round_number] = True
@@ -1211,7 +1233,20 @@
         # End of round callbacks.
         # todo handle case when aggregator restarted before callback was successful
+
+        bench_dict['global'].step('save model')
+        self.callbacks.on_round_end(self.round_number, logs)
+        bench_dict['global'].step('on round end')
+        if self.round_number % 10 == 0:
+            bench_dict.save()
+        bench_dict['global'].step('save tictoc')
+
+        if self.round_number % 3 == 0:
+            self.tensor_db.tensor_db.to_pickle(f'tensor_db_{str(self.round_number).zfill(2)}.pkl')
+            if TRY_CHANGE:
+                self.tensor_db.secondary_db.to_pickle(f'secondary_tensor_db_{str(self.round_number).zfill(2)}.pkl')
+        bench_dict['global'].step('save_db')
 
         self.round_number += 1
@@ -1227,15 +1262,58 @@
             logger.info("Experiment Completed. Cleaning up...")
             # End of experiment callbacks.
             self.callbacks.on_experiment_end()
+            bench_dict.save()
         else:
             logger.info("Starting round %s...", self.round_number)
 
             # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537
-            self.callbacks.on_round_begin(self.round_number)
+            bench_dict['global'].step('other')
+            bench_dict['global'].gstop()
+            self.callbacks.on_round_begin(self.round_number, 'agg')
 
             # Cleaning tensor db
             self.tensor_db.clean_up(self.db_store_rounds)
+            bench_dict['global'].step('Cleaning tensor db')
 
             # Reset straggler handling policy for the next round.
             self.straggler_handling_policy.reset_policy_for_round()
+            bench_dict['global'].step('reset straggler')
+
+    def _has_analytics_results(self):
+        """
+        Check if the current round has analytics results.
+
+        Returns:
+            bool: True if the current round has analytics results, False otherwise.
+        """
+        analytics_result = self.tensor_db.get_tensors_by_round_and_tags(
+            self.round_number, ("analytics",)
+        )
+        return len(analytics_result) > 0
+
+    def save_analytics_result(self):
+        """
+        Save analytics results to a JSON file.
+        This method retrieves tensors tagged with "analytics" for the current round
+        from the tensor database and saves them as a JSON file at the path specified
+        by `self.last_state_path`. The tensor values are converted to lists if they
+        are NumPy arrays.
+        The saved JSON file contains a dictionary where the keys are tensor names
+        and the values are the corresponding tensor data.
+        Logs the saved analytics result for reference.
+        Returns:
+            None
+        """
+        analytics_result = self.tensor_db.get_tensors_by_round_and_tags(
+            self.round_number, ("analytics",)
+        )
+        if len(analytics_result) > 0 and self.last_state_path:
+            with open(self.last_state_path, "w") as jsonfile:
+                analytics_result_json = {}
+                for tensorkey, values in analytics_result.items():
+                    if isinstance(values, np.ndarray):
+                        values = values.tolist()
+                    analytics_result_json[tensorkey.tensor_name] = values
+                json.dump(analytics_result_json, jsonfile, indent=4)
+            logger.debug(f"Analytics result: {analytics_result_json}")
 
     def _has_analytics_results(self):
         """
diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py
index 805825d5e1..b7b78fd2b2 100644
--- a/openfl/component/collaborator/collaborator.py
+++ b/openfl/component/collaborator/collaborator.py
@@ -158,23 +158,30 @@ def ping(self):
         self.client.ping()
 
     def run(self):
         """Run the collaborator."""
+        from tictoc import bench_dict, timer
         # Experiment begin
+        timer.tic()
         self.callbacks.on_experiment_begin()
-
+        timer.pttoc('experiment starts')
         while True:
+            bench_dict[self.collaborator_name].gstep()
             tasks, round_num, sleep_time, time_to_quit = self.client.get_tasks()
-
+            bench_dict[self.collaborator_name].step('wait for tasks')
             if time_to_quit:
+                bench_dict[self.collaborator_name].gstop()
                 break
 
             if not tasks:
                 sleep(sleep_time)
+                bench_dict[self.collaborator_name].step('sleep time')
+                bench_dict[self.collaborator_name].gstop()
                 continue
-
+            bench_dict[self.collaborator_name].step('sleep time')
             # Round begin
             logger.info("Round: %d Received Tasks: %s", round_num, tasks)
             self.callbacks.on_round_begin(round_num)
+            bench_dict[self.collaborator_name].step('on round begin')
 
             # Run tasks
             logs = {}
@@ -182,12 +189,22 @@ def run(self):
                 metrics = self.do_task(task, round_num)
                 logs.update(metrics)
 
+                if isinstance(task, str):
+                    task_name = task
+                else:
+                    task_name = task.name
+                bench_dict[self.collaborator_name].step(f'do task {task_name}')
             # Round end
             self.tensor_db.clean_up(self.db_store_rounds)
+            bench_dict[self.collaborator_name].step('clean_up')
             self.callbacks.on_round_end(round_num, logs)
+            bench_dict[self.collaborator_name].step('on_round_end')
+            bench_dict[self.collaborator_name].gstop()
+            bench_dict.save()
 
         # Experiment end
         self.callbacks.on_experiment_end()
+        bench_dict.save()
         logger.info("Received shutdown signal. Exiting...")
 
     def do_task(self, task, round_number) -> dict:
diff --git a/openfl/databases/__init__.py b/openfl/databases/__init__.py
index 0e64082d5f..bfe76aed65 100644
--- a/openfl/databases/__init__.py
+++ b/openfl/databases/__init__.py
@@ -3,4 +3,4 @@
 
 
 from openfl.databases.persistent_db import PersistentTensorDB
-from openfl.databases.tensor_db import TensorDB
+from openfl.databases.tensor_db import TensorDB, TRY_CHANGE
diff --git a/openfl/databases/tensor_db.py b/openfl/databases/tensor_db.py
index e7bb6d76b9..b0330824d1 100644
--- a/openfl/databases/tensor_db.py
+++ b/openfl/databases/tensor_db.py
@@ -14,7 +14,18 @@
 from openfl.databases.utilities import ROUND_PLACEHOLDER, _retrieve, _search, _store
 from openfl.interface.aggregation_functions import AggregationFunction
 from openfl.utilities import LocalTensor, TensorKey, change_tags
-
+import os
+import sys
+from tictoc import bench_dict
+try_change = os.getenv('TRY_CHANGE', 'False')
+try_change = try_change.lower() in ['true', '1', 't', 'y', 'yes']
+if try_change:
+    print('using TRY_CHANGE')
+    TRY_CHANGE=True
+else:
+    TRY_CHANGE=False
+
+member_name = sys.argv[4] if len(sys.argv) > 4 else 'aggregator'
 
 class TensorDB:
     """The TensorDB stores a tensor key and the data that it corresponds to.
@@ -43,6 +54,8 @@ def __init__(self) -> None:
         self.tensor_db = pd.DataFrame(
             {col: pd.Series(dtype=dtype) for col, dtype in types_dict.items()}
         )
+        if TRY_CHANGE:
+            self.secondary_db = self.tensor_db
         self._bind_convenience_methods()
 
         self.mutex = Lock()
@@ -86,6 +99,7 @@ def clean_up(self, remove_older_than: int = 1) -> None:
         if remove_older_than < 0:
             # Getting a negative argument calls off cleaning
             return
+        bench_dict['clean_up' + member_name].gstep()
         current_round = self.tensor_db["round"].astype(int).max()
         if current_round == ROUND_PLACEHOLDER:
             current_round = np.sort(self.tensor_db["round"].astype(int).unique())[-2]
@@ -93,6 +107,17 @@
             (self.tensor_db["round"].astype(int) > current_round - remove_older_than)
             | self.tensor_db["report"]
         ].reset_index(drop=True)
+        bench_dict['clean_up' + member_name].step('normal')
+        if TRY_CHANGE:
+            self.secondary_db = self.tensor_db[
+                ~self.tensor_db["tags"].apply(
+                    lambda x: any(
+                        keyword in item for item in x for keyword in ["collaborator", "metric"]
+                    )
+                )
+            ].reset_index(drop=True)
+            bench_dict['clean_up' + member_name].step('extra')
+        bench_dict['clean_up' + member_name].gstop()
 
     def cache_tensor(self, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None:
         """Insert a tensor into TensorDB (dataframe).
@@ -105,26 +130,40 @@ def cache_tensor(self, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None:
             None
         """
         entries_to_add = []
+
         with self.mutex:
+            bench_dict['cache_tensor' + member_name].gstep()
             for tensor_key, nparray in tensor_key_dict.items():
                 tensor_name, origin, fl_round, report, tags = tensor_key
                 entries_to_add.append(
-                    pd.DataFrame(
-                        [
-                            [
-                                tensor_name,
-                                origin,
-                                fl_round,
-                                report,
-                                tags,
-                                nparray,
-                            ]
-                        ],
-                        columns=list(self.tensor_db.columns),
-                    )
-                )
-
-            self.tensor_db = pd.concat([self.tensor_db, *entries_to_add], ignore_index=True)
+                    {
+                        "tensor_name": tensor_name,
+                        "origin": origin,
+                        "round": fl_round,
+                        "report": report,
+                        "tags": tags,
+                        "nparray": nparray,
+                    })
+            bench_dict['cache_tensor' + member_name].step('for')
+
+            if len(entries_to_add)>0:
+                new_data = pd.DataFrame(entries_to_add)
+                self.tensor_db = pd.concat([self.tensor_db, new_data], ignore_index=True)
+            bench_dict['cache_tensor' + member_name].step('normal')
+            if TRY_CHANGE:
+                filtered_new_data = new_data[
+                    ~new_data["tags"].apply(
+                        lambda x: any(
+                            keyword in item for item in x for keyword in ["collaborator", "metric"]
+                        )
+                    )
+                ].reset_index(drop=True)
+                bench_dict['cache_tensor' + member_name].step('extra new data')
+                if len(filtered_new_data) > 0:
+                    self.secondary_db = pd.concat([self.secondary_db, filtered_new_data], ignore_index=True)
+                bench_dict['cache_tensor' + member_name].step('extra append')
+
+            bench_dict['cache_tensor' + member_name].gstop()
 
     def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]:
         """Perform a lookup of the tensor_key in the TensorDB.
@@ -139,13 +178,51 @@ def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]:
         tensor_name, origin, fl_round, report, tags = tensor_key
 
         # TODO come up with easy way to ignore compression
-        df = self.tensor_db[
-            (self.tensor_db["tensor_name"] == tensor_name)
-            & (self.tensor_db["origin"] == origin)
-            & (self.tensor_db["round"] == fl_round)
-            & (self.tensor_db["report"] == report)
-            & (self.tensor_db["tags"] == tags)
-        ]
+        bench_dict['get_cache_tensor' + member_name].gstep()
+        if any(keyword in item for item in tags for keyword in ["collaborator", "metric"]) or not TRY_CHANGE:
+            df = self.tensor_db[
+                (self.tensor_db["tensor_name"] == tensor_name)
+                & (self.tensor_db["origin"] == origin)
+                & (self.tensor_db["round"] == fl_round)
+                & (self.tensor_db["report"] == report)
+                & (self.tensor_db["tags"] == tags)
+            ]
+            bench_dict['get_cache_tensor' + member_name].step('normal')
+            bench_dict['get_cache_tensor' + member_name].gstop()
+        else:
+            df = self.secondary_db[
+                (self.secondary_db["tensor_name"] == tensor_name)
+                & (self.secondary_db["origin"] == origin)
+                & (self.secondary_db["round"] == fl_round)
+                & (self.secondary_db["report"] == report)
+                & (self.secondary_db["tags"] == tags)
+            ]
+            bench_dict['get_cache_tensor' + member_name].step('new')
+            bench_dict['get_cache_tensor' + member_name].gstop()
+            if len(df) == 0 and False:
+                self.secondary_db = self.tensor_db[
+                    ~self.tensor_db["tags"].apply(
+                        lambda x: any(
+                            keyword in item for item in x for keyword in ["collaborator", "metric"]
+                        )
+                    )
+                ].reset_index(drop=True)
+                print("NOT FOUND")
+                print(tags)
+                print("UPDATING")
+
+                if self.secondary_db.empty:
+                    self.secondary_db = pd.DataFrame(columns=self.tensor_db.columns)
+
+                df = self.secondary_db[
+                    (self.secondary_db["tensor_name"] == tensor_name)
+                    & (self.secondary_db["origin"] == origin)
+                    & (self.secondary_db["round"] == fl_round)
+                    & (self.secondary_db["report"] == report)
+                    & (self.secondary_db["tags"] == tags)
+                ]
+                if len(df) == 0:
+                    print('one of those')
 
         if len(df) == 0:
             return None