From 47c0847b7bd56e4f17a107ce8255223fc6908fa2 Mon Sep 17 00:00:00 2001 From: Chris310005 Date: Tue, 16 Jan 2024 13:49:26 +0100 Subject: [PATCH] [dbs-leipzig#1570] Added interval calculation and changed tests to check for intervals, changed result values to float --- .../metric/DegreeRangeEvolution.java | 14 ++-- .../GroupDegreeTreesToDegreeRange.java | 12 ++-- .../functions/MapDegreesToInterval.java | 67 +++++++++++++++++++ .../metric/DegreeRangeEvolutionTest.java | 40 +++++++++-- 4 files changed, 112 insertions(+), 21 deletions(-) create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java index 4912df020b3e..1c0db95f5211 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -16,15 +16,12 @@ package org.gradoop.temporal.model.impl.operators.metric; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToDegreeRange; -import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; -import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; -import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; +import org.gradoop.temporal.model.impl.operators.metric.functions.*; import java.util.Objects; @@ -32,7 +29,7 @@ * Operator that calculates the degree range evolution of a temporal graph for the * whole lifetime of the graph. */ -public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator>> { +public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator>> { /** * The time dimension that will be considered. */ @@ -55,7 +52,7 @@ public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { } @Override - public DataSet> execute(TemporalGraph graph) { + public DataSet> execute(TemporalGraph graph) { return graph.getEdges() // 1) Extract vertex id(s) and corresponding time intervals .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) @@ -65,6 +62,7 @@ public DataSet> execute(TemporalGraph graph) { .reduceGroup(new BuildTemporalDegreeTree()) // 4) Transform each tree to aggregated evolution .map(new TransformDeltaToAbsoluteDegreeTree()) - .reduceGroup(new GroupDegreeTreesToDegreeRange()); + .reduceGroup(new GroupDegreeTreesToDegreeRange()) + .mapPartition(new MapDegreesToInterval()); } } \ No newline at end of file diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java index 3aed673d9a34..60287837fe13 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java @@ -31,7 +31,7 @@ * that represents the aggregated degree value for the whole graph at the given time. */ public class GroupDegreeTreesToDegreeRange - implements GroupReduceFunction>, Tuple2> { + implements GroupReduceFunction>, Tuple2> { /** * Creates an instance of this group reduce function. @@ -43,7 +43,7 @@ public GroupDegreeTreesToDegreeRange() { @Override public void reduce(Iterable>> iterable, - Collector> collector) throws Exception { + Collector> collector) throws Exception { // init necessary maps and set HashMap> degreeTrees = new HashMap<>(); @@ -61,9 +61,9 @@ public void reduce(Iterable>> iterable, for (Long timePoint : timePoints) { // skip last default time - if (Long.MAX_VALUE == timePoint) { + /*if (Long.MAX_VALUE == timePoint) { continue; - } + }*/ // Iterate over all vertices for (Map.Entry> entry : degreeTrees.entrySet()) { // Make sure the vertex is registered in the current vertexDegrees capture @@ -83,8 +83,8 @@ public void reduce(Iterable>> iterable, } // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. - int maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0); - int minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0); + float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue(); + float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue(); collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree)); } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java new file mode 100644 index 000000000000..63ffde1b44d1 --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java @@ -0,0 +1,67 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Collector; + +/** + * A map partition function that calculates the intervals in which the values stay the same + * by checking if the current value is the same as the previous one + * + */ + +public class MapDegreesToInterval implements MapPartitionFunction, Tuple3> { + @Override + public void mapPartition(Iterable> values, Collector> out) { + + //set starting values to null + Long startTimestamp = null; + Long endTimestamp = null; + Float value = null; + Boolean collected = false; + + //loop through each tuple + for (Tuple2 tuple : values) { + if (startTimestamp == null) { + // First element in the group + startTimestamp = tuple.f0; + endTimestamp = tuple.f0; + value = tuple.f1; + } else { + if (!tuple.f1.equals(value)) { + // Value changed, emit the current interval and start a new one + out.collect(new Tuple3<>(startTimestamp, tuple.f0, value)); + startTimestamp = tuple.f0; + endTimestamp = tuple.f0; + value = tuple.f1; + collected = true; + } else { + // Extend the current interval + endTimestamp = tuple.f0; + collected = false; + } + } + } + //check if the latest interval was collected, if not, collect it + //this happens when the last interval has the value 0 + if (!collected) { + out.collect(new Tuple3<>(startTimestamp, endTimestamp, value)); + } + } +} \ No newline at end of file diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java index a4da00e16de6..5d9c60e63cb4 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; @@ -42,42 +43,67 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { /** * The expected in-degrees for each vertex label. */ - private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); /** * The expected out-degrees for each vertex label. */ - private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); /** * The expected degrees for each vertex label. */ - private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); static { + + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); + + /* EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + */ // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); + + /* EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); - + */ // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(5L, 6L, 1f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(6L, 7L, 2f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 1f)); + + /* EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + */ } + /** * The degree type to test. */ @@ -88,7 +114,7 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { * The expected degree range evolution for the given type. */ @Parameterized.Parameter(1) - public List> expectedDegrees; + public List> expectedDegrees; /** * The temporal graph to test the operator. @@ -130,9 +156,9 @@ public void setUp() throws Exception { */ @Test public void testDegreeRange() throws Exception { - Collection> resultCollection = new ArrayList<>(); + Collection> resultCollection = new ArrayList<>(); - final DataSet> resultDataSet = testGraph + final DataSet> resultDataSet = testGraph .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));