diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegree.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegree.java new file mode 100644 index 000000000000..bfd67161719b --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegree.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.statistics; + +import org.apache.flink.api.java.DataSet; +import org.gradoop.common.model.impl.pojo.EPGMGraphHead; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator; +import org.gradoop.flink.model.impl.epgm.LogicalGraph; +import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction; + +/** + * Max degree operator calculates the maximum degree of all vertices of a graph and writes it to the graph + * head as a new property named {@link MaxDegree#PROPERTY_MAX_DEGREE}. + */ +public class MaxDegree implements UnaryBaseGraphToBaseGraphOperator { + + /** + * The name of the property that holds the max degree after the calculation. + */ + public static final String PROPERTY_MAX_DEGREE = "maxDegree"; + + @Override + public LogicalGraph execute(LogicalGraph graph) { + + DataSet newGraphHead = new VertexDegrees().execute(graph) + .max(1) + .crossWithTiny(graph.getGraphHead().first(1)) + .with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MAX_DEGREE)); + + return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges()); + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MinDegree.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MinDegree.java new file mode 100644 index 000000000000..0c912ca0d230 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/MinDegree.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.statistics; + +import org.apache.flink.api.java.DataSet; +import org.gradoop.common.model.impl.pojo.EPGMGraphHead; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator; +import org.gradoop.flink.model.impl.epgm.LogicalGraph; +import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction; + +/** + * Min degree operator calculates the minimum degree of all vertices of a graph and writes it to the graph + * head as a new property named {@link MinDegree#PROPERTY_MIN_DEGREE}. + */ +public class MinDegree implements UnaryBaseGraphToBaseGraphOperator { + + /** + * The name of the property that holds the min degree after the calculation. + */ + public static final String PROPERTY_MIN_DEGREE = "minDegree"; + + @Override + public LogicalGraph execute(LogicalGraph graph) { + DataSet newGraphHead = new VertexDegrees().execute(graph) + .min(1) + .crossWithTiny(graph.getGraphHead().first(1)) + .with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MIN_DEGREE)); + + return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges()); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficientTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageClusteringCoefficientTest.java similarity index 96% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficientTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageClusteringCoefficientTest.java index 919d5b90562b..304d886faedd 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficientTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageClusteringCoefficientTest.java @@ -13,12 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.gradoop.common.model.impl.pojo.EPGMGraphHead; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.AverageClusteringCoefficient; import org.gradoop.flink.util.FlinkAsciiGraphLoader; import org.junit.Before; import org.junit.Test; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegreeTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageDegreeTest.java similarity index 92% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegreeTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageDegreeTest.java index 77e546eaf7d3..fad6839a68f1 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegreeTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageDegreeTest.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.AverageDegree; import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants; import org.junit.Test; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegreeTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageIncomingDegreeTest.java similarity index 92% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegreeTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageIncomingDegreeTest.java index 1a798e3c897e..68470a32a7e9 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegreeTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageIncomingDegreeTest.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.AverageIncomingDegree; import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants; import org.junit.Test; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegreeTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageOutgoingDegreeTest.java similarity index 92% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegreeTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageOutgoingDegreeTest.java index 5e2c41487eab..d6cb949da592 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegreeTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/AverageOutgoingDegreeTest.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.AverageOutgoingDegree; import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants; import org.junit.Test; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ConnectedComponentsDistributionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ConnectedComponentsDistributionTest.java similarity index 97% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ConnectedComponentsDistributionTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ConnectedComponentsDistributionTest.java index e8ec8ab4e303..4cb9b03b40fb 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ConnectedComponentsDistributionTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ConnectedComponentsDistributionTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import com.google.common.collect.Lists; import org.apache.flink.api.java.DataSet; @@ -21,7 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistribution; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/DegreeCentralityTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/DegreeCentralityTest.java similarity index 95% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/DegreeCentralityTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/DegreeCentralityTest.java index 83169280b710..33bd5f28540e 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/DegreeCentralityTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/DegreeCentralityTest.java @@ -13,12 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.apache.flink.api.java.DataSet; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.DegreeCentrality; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/GraphDensityTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/GraphDensityTest.java similarity index 88% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/GraphDensityTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/GraphDensityTest.java index 708ed8185d48..7777fedcf710 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/GraphDensityTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/GraphDensityTest.java @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.GraphDensity; import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -46,6 +46,6 @@ public void testGraphDensity() throws Exception { // density should not be 0 assertTrue("Graph density is 0", density > 0.); // density for social network graph should be (24 / 11 * 10) = 0.21818... - assertTrue("Computed graph density is incorrect", density == (24d / 110d)); + assertEquals("Computed graph density is incorrect", 24d / 110d, density, 0.0); } } diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegreeTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegreeTest.java new file mode 100644 index 000000000000..2c468f98ebf4 --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MaxDegreeTest.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.statistics; + +import org.gradoop.flink.model.GradoopFlinkTestBase; +import org.gradoop.flink.model.impl.epgm.LogicalGraph; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Integration test class of {@link MaxDegree}. + */ +public class MaxDegreeTest extends GradoopFlinkTestBase { + + /** + * Tests the computation of the maximum degree of a logical graph. + * + * @throws Exception If loading of the example-graph fails + */ + @Test + public void testMaxDegree() throws Exception { + LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph(); + + long maxDegree = graph.callForGraph(new MaxDegree()) + .getGraphHead() + .collect() + .get(0) + .getPropertyValue(MaxDegree.PROPERTY_MAX_DEGREE).getLong(); + + assertEquals(6L, maxDegree); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MinDegreeTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MinDegreeTest.java new file mode 100644 index 000000000000..ff5238e7db5a --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/MinDegreeTest.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.statistics; + +import org.gradoop.flink.model.GradoopFlinkTestBase; +import org.gradoop.flink.model.impl.epgm.LogicalGraph; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Integration test class of {@link MinDegree}. + */ +public class MinDegreeTest extends GradoopFlinkTestBase { + + /** + * Tests the computation of the minimum degree of a logical graph. + * + * @throws Exception If loading of the example-graph fails + */ + @Test + public void testMinDegree() throws Exception { + LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph(); + + long minDegree = graph.callForGraph(new MinDegree()) + .getGraphHead() + .collect() + .get(0) + .getPropertyValue(MinDegree.PROPERTY_MIN_DEGREE).getLong(); + + assertEquals(2L, minDegree); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ValueConnectedComponentsTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ValueConnectedComponentsTest.java similarity index 95% rename from gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ValueConnectedComponentsTest.java rename to gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ValueConnectedComponentsTest.java index 74ddf8a150ad..1ff6309f3943 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/ValueConnectedComponentsTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/statistics/ValueConnectedComponentsTest.java @@ -13,14 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.gradoop.flink.model.impl.operators.sampling.statistics; +package org.gradoop.flink.model.impl.operators.statistics; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.gradoop.flink.model.GradoopFlinkTestBase; import org.gradoop.flink.model.impl.epgm.LogicalGraph; -import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistributionAsValues; import org.gradoop.flink.util.FlinkAsciiGraphLoader; import org.junit.Test; diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java new file mode 100644 index 000000000000..722e6cb699ba --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; +import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; +import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; + +import java.util.Objects; + +/** + * Operator that calculates the average degree evolution of all vertices of a temporal graph for the + * whole lifetime of the graph. The average value is rounded up to the next integer. + */ +public class AvgDegreeEvolution + implements UnaryBaseGraphToValueOperator>> { + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; + + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; + + /** + * Creates an instance of this average degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } + + @Override + public DataSet> execute(TemporalGraph graph) { + return graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()) + // 5) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG)); + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolution.java new file mode 100644 index 000000000000..ed17add7d87b --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolution.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; +import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; +import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; + +import java.util.Objects; + +/** + * Operator that calculates the maximum degree evolution of all vertices of a temporal graph for the + * whole lifetime of the graph. + */ +public class MaxDegreeEvolution + implements UnaryBaseGraphToValueOperator>> { + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; + + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; + + /** + * Creates an instance of this maximum degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } + + @Override + public DataSet> execute(TemporalGraph graph) { + return graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()) + // 5) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MAX)); + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolution.java new file mode 100644 index 000000000000..53683575f895 --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolution.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; +import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; +import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; + +import java.util.Objects; + +/** + * Operator that calculates the minimum degree evolution of all vertices of a temporal graph for the + * whole lifetime of the graph. + */ +public class MinDegreeEvolution + implements UnaryBaseGraphToValueOperator>> { + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; + + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; + + /** + * Creates an instance of this minimum degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } + + @Override + public DataSet> execute(TemporalGraph graph) { + return graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()) + // 5) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MIN)); + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java new file mode 100644 index 000000000000..803758d747fe --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +/** + * Enum for defining an aggregate type. + */ +public enum AggregateType { + /** + * Minimum aggregation. + */ + MIN, + /** + * Maximum aggregation. + */ + MAX, + /** + * Average aggregation. + */ + AVG +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java new file mode 100644 index 000000000000..c6bcad9ef6e9 --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java @@ -0,0 +1,115 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree) + * that represents the aggregated degree value for the whole graph at the given time. + */ +public class GroupDegreeTreesToAggregateDegrees + implements GroupReduceFunction>, Tuple2> { + + /** + * The aggregate type to use (min,max,avg). + */ + private final AggregateType aggregateType; + + /** + * Creates an instance of this group reduce function. + * + * @param aggregateType the aggregate type to use (min,max,avg). + */ + public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType) { + this.aggregateType = aggregateType; + } + + @Override + public void reduce(Iterable>> iterable, + Collector> collector) throws Exception { + + // init necessary maps and set + HashMap> degreeTrees = new HashMap<>(); + HashMap vertexDegrees = new HashMap<>(); + SortedSet timePoints = new TreeSet<>(); + + // convert the iterables to a hashmap and remember all possible timestamps + for (Tuple2> tuple : iterable) { + degreeTrees.put(tuple.f0, tuple.f1); + timePoints.addAll(tuple.f1.keySet()); + } + + int numberOfVertices = degreeTrees.size(); + + // Add default times + timePoints.add(Long.MIN_VALUE); + + for (Long timePoint : timePoints) { + // skip last default time + if (Long.MAX_VALUE == timePoint) { + continue; + } + // Iterate over all vertices + for (Map.Entry> entry : degreeTrees.entrySet()) { + // Make sure the vertex is registered in the current vertexDegrees capture + if (!vertexDegrees.containsKey(entry.getKey())) { + vertexDegrees.put(entry.getKey(), 0); + } + + // Check if timestamp is in tree, if not, take the lower key + if (entry.getValue().containsKey(timePoint)) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); + } else { + Long lowerKey = entry.getValue().lowerKey(timePoint); + if (lowerKey != null) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); + } + } + } + + // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. + Optional opt; + switch (aggregateType) { + case MIN: + opt = vertexDegrees.values().stream().reduce(Math::min); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case MAX: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case AVG: + opt = vertexDegrees.values().stream().reduce(Math::addExact); + opt.ifPresent(integer -> collector.collect( + new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); + break; + default: + throw new IllegalArgumentException("Aggregate type not specified."); + } + } + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java new file mode 100644 index 000000000000..9292044ce16e --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Replaces the degree tree, that just stores the degree changes for each time, with a degree tree that + * stores the actual degree of the vertex at that time. + */ +@FunctionAnnotation.ForwardedFields("f0") +public class TransformDeltaToAbsoluteDegreeTree + implements MapFunction>, + Tuple2>> { + + /** + * To reduce object instantiations. + */ + private TreeMap absoluteDegreeTree; + + @Override + public Tuple2> map( + Tuple2> vIdTreeMapTuple) throws Exception { + // init the degree and the temporal tree + int degree = 0; + absoluteDegreeTree = new TreeMap<>(); + + // aggregate the degrees + for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { + degree += entry.getValue(); + absoluteDegreeTree.put(entry.getKey(), degree); + } + vIdTreeMapTuple.f1 = absoluteDegreeTree; + return vIdTreeMapTuple; + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java new file mode 100644 index 000000000000..3877bb55d3fb --- /dev/null +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java @@ -0,0 +1,144 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.util.TemporalGradoopTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class AvgDegreeEvolutionTest extends TemporalGradoopTestBase { + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + } + + /** + * The degree type to test. + */ + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree evolution fo the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph an create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the avg degree evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testAvgDegree() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new AvgDegreeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolutionTest.java new file mode 100644 index 000000000000..9373649e89cd --- /dev/null +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MaxDegreeEvolutionTest.java @@ -0,0 +1,144 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.util.TemporalGradoopTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class MaxDegreeEvolutionTest extends TemporalGradoopTestBase { + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 3)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + } + + /** + * The degree type to test. + */ + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree evolution fo the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph an create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the max degree evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testMaxDegree() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new MaxDegreeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolutionTest.java new file mode 100644 index 000000000000..56e36bf2cb05 --- /dev/null +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/MinDegreeEvolutionTest.java @@ -0,0 +1,144 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.util.TemporalGradoopTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class MinDegreeEvolutionTest extends TemporalGradoopTestBase { + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 0)); + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 0)); + + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 0)); + } + + /** + * The degree type to test. + */ + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree evolution fo the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph an create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the min degree evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testMinDegree() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new MinDegreeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } +}