diff --git a/pipeline_dp/private_beam.py b/pipeline_dp/private_beam.py
index 0433273c..b961f945 100644
--- a/pipeline_dp/private_beam.py
+++ b/pipeline_dp/private_beam.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import abc
+import collections
 import dataclasses
 import typing
 from apache_beam.transforms import ptransform
@@ -481,3 +482,96 @@ def expand(self, pcol: pvalue.PCollection):
 
         # dp_result : (partition_key, result)
         return dp_result
+
+
+# Cache for namedtuple types. It should be used only in
+# '_get_or_create_named_tuple()' function.
+_agg_named_tuple_cache = {}
+
+
+def _get_or_create_named_tuple(type_name: str,
+                               field_names: tuple) -> 'MetricsTuple':
+    """Creates namedtuple type with a custom serializer."""
+
+    # The custom serializer is required for supporting serialization of
+    # namedtuples in Apache Beam.
+    cache_key = (type_name, field_names)
+    named_tuple = _agg_named_tuple_cache.get(cache_key)
+    if named_tuple is None:
+        named_tuple = collections.namedtuple(type_name, field_names)
+        named_tuple.__reduce__ = lambda self: (_create_named_tuple_instance,
+                                               (type_name, field_names,
+                                                tuple(self)))
+        _agg_named_tuple_cache[cache_key] = named_tuple
+    return named_tuple
+
+
+def _create_named_tuple_instance(type_name: str, field_names: tuple, values):
+    return _get_or_create_named_tuple(type_name, field_names)(*values)
+
+
+class Aggregate(PrivatePTransform):
+    """Transform class for performing multiple aggregations on a PrivatePCollection."""
+
+    def __init__(self, label=None):
+        super().__init__(return_anonymized=True, label=label)
+
+    def aggregate_value(self, *args, col_name: str,
+                        agg_type: pipeline_dp.Metrics):
+        """Returns _Aggregate transform corresponding to the agg_type
+
+        Args:
+            args: args for Aggregate Transforms like SumParams.
+            col_name: name of the column for the resulting aggregate value.
+            agg_type: type of pipeline_dp.Metrics identifying the aggregate
+                to calculate."""
+        return _Aggregate([args], col_name=[col_name], agg_type=[agg_type])
+
+
+class _Aggregate(PrivatePTransform):
+
+    def __init__(self,
+                 *args,
+                 col_name: str,
+                 agg_type: pipeline_dp.Metrics,
+                 label: Optional[str] = None):
+        super().__init__(return_anonymized=True, label=label)
+        self.args = args
+        self.col_name = col_name
+        self.agg_type = agg_type
+
+    def aggregate_value(self, *args, col_name: str,
+                        agg_type: pipeline_dp.Metrics):
+        return _Aggregate(list(*self.args) + [args],
+                          col_name=list(self.col_name) + [col_name],
+                          agg_type=list(self.agg_type) + [agg_type])
+
+    def expand(self, pcol: pvalue.PCollection):
+        columns = {
+            self.col_name[i]: pcol | "agg " + str(i) >> self._getTransform(
+                self.agg_type[i], *self.args[0][i])
+            for i in range(len(self.col_name))
+        }
+        return columns | 'LeftJoiner: Combine' >> beam.CoGroupByKey(
+        ) | beam.Map(lambda x: _create_named_tuple_instance(
+            'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]),
+            tuple([x[0]] + [x[1][k][0] for k in x[1]])))
+
+    def _getTransform(self, agg_type: pipeline_dp.Metrics, *args):
+        """Gets the correct transform corresponding to agg_type."""
+        transform = None
+        if agg_type == pipeline_dp.Metrics.MEAN:
+            transform = Mean(*args)
+        elif agg_type == pipeline_dp.Metrics.SUM:
+            transform = Sum(*args)
+        elif agg_type == pipeline_dp.Metrics.COUNT:
+            transform = Count(*args)
+        elif agg_type == pipeline_dp.Metrics.PRIVACY_ID_COUNT:
+            transform = PrivacyIdCount(*args)
+        else:
+            raise NotImplementedError(
+                "Transform for agg_type: %s is not "
+                "implemented." % agg_type)
+        transform.set_additional_parameters(
+            budget_accountant=self._budget_accountant)
+        return transform
diff --git a/tests/private_beam_test.py b/tests/private_beam_test.py
index 00cdbc1b..6812a807 100644
--- a/tests/private_beam_test.py
+++ b/tests/private_beam_test.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import collections
 import unittest
 import apache_beam as beam
 from apache_beam.runners.portability import fn_api_runner
@@ -41,6 +42,14 @@ def value_per_key_within_tolerance(expected, actual, tolerance):
         return actual[0] == expected[0] and abs(actual[1] -
                                                 expected[1]) <= tolerance
 
+    @staticmethod
+    def value_per_key_within_tolerance_named_tuple(expected, actual, tolerance):
+        expected_dict = expected._asdict()
+        actual_dict = actual._asdict()
+        return all([(actual_dict[k] == expected_dict[k]) or
+                    (abs(actual_dict[k] - expected_dict[k]) <= tolerance)
+                    for k in actual_dict])
+
     def test_make_private_transform_succeeds(self):
         runner = fn_api_runner.FnApiRunner()
         with beam.Pipeline(runner=runner) as pipeline:
@@ -769,6 +778,65 @@ def test_combine_per_returns_sensible_result(self):
                 equals_fn=lambda e, a: PrivateBeamTest.
                 value_per_key_within_tolerance(e, a, 10.0)))
 
+    def test_multiple_aggregates(self):
+        with TestPipeline() as pipeline:
+            # Arrange
+            col = [(u, "pk1", 100) for u in range(30)]
+            col += [(f"{u + 20}", "pk2", 100) for u in range(30)]
+            col += [(f"{u + 30}", "pk1", -100.0) for u in range(30)]
+            pcol = pipeline | 'Create produce' >> beam.Create(col)
+            # Use very high epsilon and delta to minimize noise and test
+            # flakiness.
+            budget_accountant = budget_accounting.NaiveBudgetAccountant(
+                total_epsilon=800, total_delta=0.999)
+            private_collection = (
+                pcol | 'Create private collection' >> private_beam.MakePrivate(
+                    budget_accountant=budget_accountant,
+                    privacy_id_extractor=lambda x: x[0]))
+
+            privacy_id_count_params = aggregate_params.PrivacyIdCountParams(
+                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+                max_partitions_contributed=2,
+                budget_weight=1,
+                partition_extractor=lambda x: x[1])
+            sum_params = aggregate_params.SumParams(
+                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+                max_partitions_contributed=2,
+                max_contributions_per_partition=3,
+                min_value=1.55,
+                max_value=2.7889,
+                budget_weight=1,
+                partition_extractor=lambda x: x[1],
+                value_extractor=lambda x: x[2])
+
+            # Act
+            result = private_collection | private_beam.Aggregate(
+            ).aggregate_value(
+                privacy_id_count_params,
+                col_name='privacy_id_count',
+                agg_type=pipeline_dp.Metrics.PRIVACY_ID_COUNT).aggregate_value(
+                    sum_params,
+                    col_name='sum',
+                    agg_type=pipeline_dp.Metrics.SUM)
+            budget_accountant.compute_budgets()
+
+            # Assert
+            # This is a health check to validate that the result is sensible.
+            # Hence, we use a very large tolerance to reduce test flakiness.
+            beam_util.assert_that(
+                result,
+                beam_util.equal_to(
+                    [
+                        collections.namedtuple(
+                            "AggregatesTuple",
+                            ['pid', 'privacy_id_count', 'sum'])('pk1', 60, 130),
+                        collections.namedtuple(
+                            "AggregatesTuple",
+                            ['pid', 'privacy_id_count', 'sum'])('pk2', 30, 83)
+                    ],
+                    equals_fn=lambda e, a: PrivateBeamTest.
+                    value_per_key_within_tolerance_named_tuple(e, a, 10)))
+
 
 class SumCombineFn(private_beam.PrivateCombineFn):
     """Test-only, not private combine_fn."""