diff --git a/analysis/tests/cross_partition_combiners_test.py b/analysis/tests/cross_partition_combiners_test.py
index abe88fa4..22e0ddb6 100644
--- a/analysis/tests/cross_partition_combiners_test.py
+++ b/analysis/tests/cross_partition_combiners_test.py
@@ -378,8 +378,8 @@ def test_create_report_partition_size_is_used_as_weight_wo_mocks(self):
         _, _, weight = combiner.create_accumulator(per_partition_metrics)
         self.assertEqual(weight, 5.0)

-    @patch(
-        "analysis.cross_partition_combiners._per_partition_to_utility_report")
+    @patch("analysis.cross_partition_combiners._per_partition_to_utility_report"
+          )
     def test_create_report_with_mocks(self,
                                       mock_per_partition_to_utility_report):
         dp_metrics = [pipeline_dp.Metrics.COUNT]
diff --git a/examples/restaurant_visits/run_on_dataframes.py b/examples/restaurant_visits/run_on_dataframes.py
new file mode 100644
index 00000000..cea9423c
--- /dev/null
+++ b/examples/restaurant_visits/run_on_dataframes.py
@@ -0,0 +1,136 @@
+# Copyright 2022 OpenMined.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Demo of running PipelineDP on (Pandas, Spark, Beam) DataFrames
+
+1. Install Python and run on the command line `pip install pipeline-dp absl-py`
+2. Run python run_on_dataframes.py --input_file= --output_file=<...> --dataframes=pandas
+"""
+
+from absl import app
+from absl import flags
+import os
+import shutil
+import pandas as pd
+
+from pyspark.sql import SparkSession
+import pyspark
+
+import apache_beam as beam
+from apache_beam.runners.portability import fn_api_runner
+from apache_beam.dataframe.io import read_csv
+
+import pipeline_dp
+from pipeline_dp import dataframes
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
+                    'The file with the restaurant visits data')
+flags.DEFINE_string('output_file', None, 'Output file')
+flags.DEFINE_enum('dataframes', 'pandas', ['pandas', 'spark', 'beam'],
+                  'Which dataframes to use.')
+
+
+def delete_if_exists(filename):
+    if os.path.exists(filename):
+        if os.path.isdir(filename):
+            shutil.rmtree(filename)
+        else:
+            os.remove(filename)
+
+
+def load_data_in_pandas_dataframe() -> pd.DataFrame:
+    df = pd.read_csv(FLAGS.input_file)
+    df.rename(inplace=True,
+              columns={
+                  'VisitorId': 'visitor_id',
+                  'Time entered': 'enter_time',
+                  'Time spent (minutes)': 'spent_minutes',
+                  'Money spent (euros)': 'spent_money',
+                  'Day': 'day'
+              })
+    return df
+
+
+def load_data_in_spark_dataframe(
+        spark: SparkSession) -> pyspark.sql.dataframe.DataFrame:
+    df = spark.read.csv(FLAGS.input_file, header=True, inferSchema=True)
+    return df.withColumnRenamed('VisitorId', 'visitor_id').withColumnRenamed(
+        'Time entered', 'enter_time').withColumnRenamed(
+            'Time spent (minutes)', 'spent_minutes').withColumnRenamed(
+                'Money spent (euros)',
+                'spent_money').withColumnRenamed('Day', 'day')
+
+
+def load_data_in_beam_dataframe(pipeline):
+    df = pipeline | read_csv(FLAGS.input_file)
+    df = df.rename(
+        columns={
+            'VisitorId': 'visitor_id',
+            'Time entered': 'enter_time',
+            'Time spent (minutes)': 'spent_minutes',
+            'Money spent (euros)': 'spent_money',
+            'Day': 'day'
+        })
+    return df
+
+
+def compute_private_result(df):
+    dp_query_builder = dataframes.QueryBuilder(df, 'visitor_id')
+    query = dp_query_builder.groupby('day', 3, 1).count().sum(
+        'spent_money', min_value=0, max_value=100).build_query()
+    result_df = query.run_query(dataframes.Budget(epsilon=5, delta=1e-10),
+                                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)
+    print(query.explain_computations())
+    return result_df
+
+
+def compute_on_pandas_dataframes() -> None:
+    df = load_data_in_pandas_dataframe()
+    result_df = compute_private_result(df)
+    result_df.to_csv(FLAGS.output_file)
+
+
+def compute_on_spark_dataframes() -> None:
+    spark = SparkSession.builder \
+        .master("local[1]") \
+        .appName("SparkByExamples.com") \
+        .getOrCreate()
+    df = load_data_in_spark_dataframe(spark)
+    df.printSchema()
+    result_df = compute_private_result(df)
+    result_df.printSchema()
+    delete_if_exists(FLAGS.output_file)
+    result_df.write.format("csv").option("header", True).save(FLAGS.output_file)
+
+
+def compute_on_beam_dataframes() -> None:
+    with beam.Pipeline(runner=fn_api_runner.FnApiRunner()) as pipeline:
+        df = load_data_in_beam_dataframe(pipeline)
+        result_df = compute_private_result(df)
+        result_df.to_csv(FLAGS.output_file)
+
+
+def main(unused_argv):
+    if FLAGS.dataframes == 'pandas':
+        compute_on_pandas_dataframes()
+    elif FLAGS.dataframes == 'spark':
+        compute_on_spark_dataframes()
+    elif FLAGS.dataframes == 'beam':
+        compute_on_beam_dataframes()
+    return 0
+
+
+if __name__ == '__main__':
+    flags.mark_flag_as_required("output_file")
+    app.run(main)
diff --git a/examples/restaurant_visits/run_without_frameworks.py b/examples/restaurant_visits/run_without_frameworks.py
index f2114af2..372555fa 100644
--- a/examples/restaurant_visits/run_without_frameworks.py
+++ b/examples/restaurant_visits/run_without_frameworks.py
@@ -14,7 +14,7 @@
 """ Demo of running PipelineDP locally, without any external data processing framework

 1. Install Python and run on the command line `pip install pipeline-dp absl-py`
-2. Run python python run_without_frameworks.py --input_file= --output_file=<...>
+2. Run python run_without_frameworks.py --input_file= --output_file=<...>
 """

 from absl import app
diff --git a/pipeline_dp/dataframes.py b/pipeline_dp/dataframes.py
new file mode 100644
index 00000000..379bc55e
--- /dev/null
+++ b/pipeline_dp/dataframes.py
@@ -0,0 +1,285 @@
+import abc
+from dataclasses import dataclass
+
+import apache_beam.dataframe.convert
+import pandas as pd
+
+import pipeline_dp
+from typing import Optional, List, Dict, Iterable
+import pyspark
+import apache_beam as beam
+
+
+@dataclass
+class _Columns:
+    privacy_key: str
+    partition_key: str
+    value: Optional[str]
+
+
+@dataclass
+class _ContributionBounds:
+    max_partitions_contributed: Optional[int] = None
+    max_contributions_per_partition: Optional[int] = None
+    min_value: Optional[float] = None
+    max_value: Optional[float] = None
+
+
+class DataFrameConvertor(abc.ABC):
+
+    @abc.abstractmethod
+    def dataframe_to_collection(df, columns: _Columns):
+        pass
+
+    @abc.abstractmethod
+    def collection_to_dataframe(col, partition_key_column: str):
+        pass
+
+
+class PandasConverter(DataFrameConvertor):
+
+    def dataframe_to_collection(self, df: pd.DataFrame,
+                                columns: _Columns) -> list:
+        assert isinstance(df,
+                          pd.DataFrame), "Only Pandas dataframes are supported"
+        columns_to_keep = [columns.privacy_key, columns.partition_key]
+        if columns.value is not None:
+            columns_to_keep.append(columns.value)
+        df = df[columns_to_keep]  # leave only needed columns.
+        if columns.value is None:
+            # For count the value is not needed, but for simplicity a value
+            # column is always provided.
+            df['value'] = 0
+
+        # name=None makes itertuples return plain tuples instead of namedtuples.
+        return list(df.itertuples(index=False, name=None))
+
+    def collection_to_dataframe(self, col: Iterable,
+                                partition_key_column: str) -> pd.DataFrame:
+        assert isinstance(col, Iterable), "Only local run is supported for now"
+        partition_keys, data = list(zip(*col))
+        df = pd.DataFrame(data=data)
+        df[partition_key_column] = partition_keys
+        columns = list(df.columns)
+        columns = [columns[-1]] + columns[:-1]
+        df = df.reindex(columns=columns).set_index(partition_key_column)
+        return df
+
+
+class SparkConverter(DataFrameConvertor):
+
+    def __init__(self, spark):
+        self._spark = spark
+
+    def dataframe_to_collection(self, df, columns: _Columns) -> pyspark.RDD:
+        columns_to_keep = [columns.privacy_key, columns.partition_key]
+        if columns.value is not None:
+            columns_to_keep.append(columns.value)
+        df = df[columns_to_keep]  # leave only needed columns.
+        rdd = df.rdd.map(lambda row: (row[0], row[1], row[2]))
+        return rdd
+
+    def collection_to_dataframe(
+            self, col: pyspark.RDD,
+            partition_key_column: str) -> pyspark.sql.dataframe.DataFrame:
+
+        def convert_to_dict(row):
+            partition_key, metrics = row
+            result = {partition_key_column: partition_key}
+            result.update(metrics._asdict())
+            return result
+
+        col = col.map(convert_to_dict)
+        df = self._spark.createDataFrame(col)
+        return df
+
+
+class BeamConverter(DataFrameConvertor):
+
+    def dataframe_to_collection(self, df, columns: _Columns) -> list:
+        columns_to_keep = [columns.privacy_key, columns.partition_key]
+        if columns.value is not None:
+            columns_to_keep.append(columns.value)
+        df = df[columns_to_keep]  # leave only needed columns.
+        # if columns.value is None:
+        #     # For count value is not needed, but for simplicity always provide
+        #     # value.
+        #     df['value'] = 0
+        col = beam.dataframe.convert.to_pcollection(df)
+
+        def row_to_tuple(row):
+            return (row[0], row[1], row[2])
+
+        col = col | "Map to tuple" >> beam.Map(row_to_tuple)
+        col | "res" >> beam.io.WriteToText("/tmp/beam/res.txt")
+        return col
+
+    def collection_to_dataframe(self, col,
+                                partition_key_column: str) -> pd.DataFrame:
+        col | "res2" >> beam.io.WriteToText("/tmp/beam/res2.txt")
+
+        def convert_to_beam_row(row):
+            # breakpoint()
+            partition_key, metrics = row
+            result_dict = {partition_key_column: partition_key}
+            result_dict.update(metrics._asdict())
+            result = beam.Row(partition_key_column=partition_key,
+                              count=metrics.count,
+                              sum=metrics.sum)
+            # result = beam.Row(**result_dict)
+            # Note: beam.Select could be an alternative for building Rows.
+            return result
+
+        col = col | "To Row" >> beam.Map(convert_to_beam_row)
+        df = beam.dataframe.convert.to_dataframe(col)
+        return df.rename(columns={
+            "partition_key_column": partition_key_column
+        }).set_index(partition_key_column)
+
+
+def create_backend_for_dataframe(
+        df) -> pipeline_dp.pipeline_backend.PipelineBackend:
+    if isinstance(df, pd.DataFrame):
+        return pipeline_dp.LocalBackend()
+    if isinstance(df, pyspark.sql.dataframe.DataFrame):
+        return pipeline_dp.SparkRDDBackend(df.sparkSession.sparkContext)
+    if isinstance(df, beam.dataframe.frames.DeferredDataFrame):
+        return pipeline_dp.BeamBackend()
+    raise NotImplementedError(
+        f"Dataframes of type {type(df)} not yet supported")
+
+
+def create_dataframe_converter(df) -> DataFrameConvertor:
+    if isinstance(df, pd.DataFrame):
+        return PandasConverter()
+    if isinstance(df, pyspark.sql.dataframe.DataFrame):
+        return SparkConverter(df.sparkSession)
+    if isinstance(df, beam.dataframe.frames.DeferredDataFrame):
+        return BeamConverter()
+    raise NotImplementedError(
+        f"Dataframes of type {type(df)} not yet supported")
+
+
+@dataclass
+class Budget:
+    epsilon: float
+    delta: float = 0
+
+    # TODO: validate budget.
+
+
+class Query:
+
+    def __init__(self, df, columns: _Columns, metrics: Dict[pipeline_dp.Metric,
+                                                            List[str]],
+                 contribution_bounds: _ContributionBounds, public_keys):
+        self._df = df
+        self._columns = columns
+        self._metrics = metrics
+        self._contribution_bounds = contribution_bounds
+        self._public_partitions = public_keys
+        self._explain_computation_report = None
+
+    def run_query(self,
+                  budget: Budget,
+                  noise_kind: Optional[pipeline_dp.NoiseKind] = None):
+        converter = create_dataframe_converter(self._df)
+        col = converter.dataframe_to_collection(self._df, self._columns)
+        backend = create_backend_for_dataframe(self._df)
+        budget_accountant = pipeline_dp.NaiveBudgetAccountant(
+            total_epsilon=budget.epsilon, total_delta=budget.delta)
+
+        dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
+        params = pipeline_dp.AggregateParams(
+            noise_kind=noise_kind,
+            metrics=self._metrics,
+            max_partitions_contributed=self._contribution_bounds.
+            max_partitions_contributed,
+            max_contributions_per_partition=self._contribution_bounds.
+            max_contributions_per_partition,
+            min_value=self._contribution_bounds.min_value,
+            max_value=self._contribution_bounds.max_value)
+
+        data_extractors = pipeline_dp.DataExtractors(
+            privacy_id_extractor=lambda row: row[0],
+            partition_extractor=lambda row: row[1],
+            value_extractor=lambda row: row[2])
+
+        explain_computation_report = pipeline_dp.ExplainComputationReport()
+
+        dp_result = dp_engine.aggregate(
+            col,
+            params,
+            data_extractors,
+            public_partitions=self._public_partitions,
+            out_explain_computation_report=explain_computation_report)
+        budget_accountant.compute_budgets()
+        self._explain_computation_report = explain_computation_report.text()
+        return converter.collection_to_dataframe(dp_result,
+                                                 self._columns.partition_key)
+
+    def explain_computations(self):
+        if self._explain_computation_report is None:
+            raise ValueError("Query is not run yet. Call run_query first")
+        return self._explain_computation_report
+
+
+class QueryBuilder:
+
+    def __init__(self, df, privacy_key_column: str):
+        self._df = df
+        self._privacy_key_column = privacy_key_column  # TODO: check df schema
+        self._groupby_column = None
+        self._value_column = None
+        self._metrics = {}
+        self._contribution_bounds = _ContributionBounds()
+        self._public_keys = None
+
+    def groupby(self,
+                column: str,
+                max_partitions_contributed: int,
+                max_contributions_per_partition: int,
+                public_keys=None) -> 'QueryBuilder':
+        if self._groupby_column is not None:
+            raise ValueError("groupby can be called only once.")
+        self._groupby_column = column
+        self._contribution_bounds.max_partitions_contributed = max_partitions_contributed
+        self._contribution_bounds.max_contributions_per_partition = max_contributions_per_partition
+        self._public_keys = public_keys
+        return self
+
+    def count(self, name: str = None) -> 'QueryBuilder':
+        if self._groupby_column is None:
+            raise ValueError(
+                "Global aggregations are not supported. Use groupby.")
+        if pipeline_dp.Metrics.COUNT in self._metrics:
+            raise ValueError("count can be computed only once.")
+        self._metrics[pipeline_dp.Metrics.COUNT] = name
+        return self
+
+    def sum(self,
+            column: str,
+            min_value: float,
+            max_value: float,
+            name: str = None) -> 'QueryBuilder':
+        if self._groupby_column is None:
+            raise ValueError(
+                "Global aggregations are not supported. Use groupby.")
+        if pipeline_dp.Metrics.SUM in self._metrics:
+            raise ValueError("sum can be computed only once.")
+        self._metrics[pipeline_dp.Metrics.SUM] = name
+        self._value_column = column
+        self._contribution_bounds.min_value = min_value
+        self._contribution_bounds.max_value = max_value
+        return self
+
+    def build_query(self) -> Query:
+        if self._groupby_column is None:
+            raise NotImplementedError(
+                "Global aggregations are not implemented yet.")
+        metrics = list(self._metrics.keys())
+        return Query(
+            self._df,
+            _Columns(self._privacy_key_column, self._groupby_column,
+                     self._value_column), metrics, self._contribution_bounds,
+            self._public_keys)
diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py
index b7612886..656dc104 100644
--- a/tests/dp_engine_test.py
+++ b/tests/dp_engine_test.py
@@ -556,7 +556,8 @@ def test_aggregate_computation_graph_per_partition_bounding(
                                           unittest.mock.ANY,
                                           unittest.mock.ANY)

-    @patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
+    @patch(
+        'pipeline_dp.dp_engine.DPEngine._drop_partitions',)
     def test_aggregate_no_partition_filtering_public_partitions(
             self, mock_drop_partitions):
         # Arrange
@@ -1093,9 +1094,9 @@ def run_e2e_private_partition_selection_large_budget(self, col, backend):

         return col

-    @unittest.skipIf(
-        sys.version_info.major == 3 and sys.version_info.minor <= 8,
-        "dp_accounting library only support python >=3.9")
+    @unittest.skipIf(sys.version_info.major == 3 and
+                     sys.version_info.minor <= 8,
+                     "dp_accounting library only support python >=3.9")
     @parameterized.parameters(False, True)
     def test_run_e2e_count_public_partition_local(self, pld_accounting):
         Accountant = pipeline_dp.PLDBudgetAccountant if pld_accounting else pipeline_dp.NaiveBudgetAccountant
@@ -1224,9 +1225,9 @@ def test_min_max_sum_per_partition(self):
         self.assertLen(output, 1)
         self.assertAlmostEqual(output[0][1].sum, -3, delta=0.1)

-    @unittest.skipIf(
-        sys.version_info.major == 3 and sys.version_info.minor <= 8,
-        "dp_accounting library only support python >=3.9")
+    @unittest.skipIf(sys.version_info.major == 3 and
+                     sys.version_info.minor <= 8,
+                     "dp_accounting library only support python >=3.9")
     def test_pld_not_supported_metrics(self):
         with self.assertRaisesRegex(
                 NotImplementedError,
@@ -1240,9 +1241,9 @@ def test_pld_not_supported_metrics(self):
             engine.aggregate([1], aggregate_params,
                              self._get_default_extractors(), public_partitions)

-    @unittest.skipIf(
-        sys.version_info.major == 3 and sys.version_info.minor <= 8,
-        "dp_accounting library only support python >=3.9")
+    @unittest.skipIf(sys.version_info.major == 3 and
+                     sys.version_info.minor <= 8,
+                     "dp_accounting library only support python >=3.9")
     def test_pld_not_support_private_partition_selection(self):
         with self.assertRaisesRegex(
                 NotImplementedError,
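For reference, here is a minimal usage sketch of the QueryBuilder API introduced in pipeline_dp/dataframes.py above, applied to an in-memory Pandas DataFrame. The toy data, column values, public partition list and budget below are illustrative assumptions mirroring the restaurant-visits demo, not part of the patch.

import pandas as pd

import pipeline_dp
from pipeline_dp import dataframes

# Toy input (assumed data): one row per visit, 'visitor_id' is the privacy unit.
df = pd.DataFrame({
    'visitor_id': [1, 1, 2, 3],
    'day': ['monday', 'tuesday', 'monday', 'monday'],
    'spent_money': [10.0, 5.0, 7.5, 12.0],
})

# Per-day DP count and DP sum of 'spent_money', bounding each visitor to
# 3 days and 1 contribution per day (the same bounds as the demo above).
# public_keys makes the days public partitions, so none are dropped by
# DP partition selection on such a tiny dataset.
query = (dataframes.QueryBuilder(df, 'visitor_id')
         .groupby('day',
                  max_partitions_contributed=3,
                  max_contributions_per_partition=1,
                  public_keys=['monday', 'tuesday'])
         .count()
         .sum('spent_money', min_value=0, max_value=100)
         .build_query())

result_df = query.run_query(dataframes.Budget(epsilon=5, delta=1e-10),
                            noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)
print(result_df)  # Pandas DataFrame indexed by 'day' with noisy count and sum.
print(query.explain_computations())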