Skip to content

Commit

Permalink
[dbs-leipzig#1570] Added interval calculation and changed tests to ch…
Browse files Browse the repository at this point in the history
…eck for intervals, changed result values to float
  • Loading branch information
ChristopherLausch committed Jan 16, 2024
1 parent 24b218f commit 47c0847
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToDegreeRange;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;

import java.util.Objects;

/**
* Operator that calculates the degree range evolution of a temporal graph for the
* whole lifetime of the graph.
*/
public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple3<Long, Long, Float>>> {
/**
* The time dimension that will be considered.
*/
Expand All @@ -55,7 +52,7 @@ public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) {
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
Expand All @@ -65,6 +62,7 @@ public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
.reduceGroup(new GroupDegreeTreesToDegreeRange());
.reduceGroup(new GroupDegreeTreesToDegreeRange())
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToDegreeRange
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Integer>> {
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Float>> {

/**
* Creates an instance of this group reduce function.
Expand All @@ -43,7 +43,7 @@ public GroupDegreeTreesToDegreeRange() {

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Integer>> collector) throws Exception {
Collector<Tuple2<Long, Float>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
Expand All @@ -61,9 +61,9 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
/*if (Long.MAX_VALUE == timePoint) {
continue;
}
}*/
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
Expand All @@ -83,8 +83,8 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
int maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0);
int minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0);
float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue();
float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue();
collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

/**
* A map partition function that calculates the intervals in which the values stay the same
* by checking if the current value is the same as the previous one
*
*/

public class MapDegreesToInterval implements MapPartitionFunction<Tuple2<Long, Float>, Tuple3<Long, Long, Float>> {
@Override
public void mapPartition(Iterable<Tuple2<Long, Float>> values, Collector<Tuple3<Long, Long, Float>> out) {

//set starting values to null
Long startTimestamp = null;
Long endTimestamp = null;
Float value = null;
Boolean collected = false;

//loop through each tuple
for (Tuple2<Long, Float> tuple : values) {
if (startTimestamp == null) {
// First element in the group
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
} else {
if (!tuple.f1.equals(value)) {
// Value changed, emit the current interval and start a new one
out.collect(new Tuple3<>(startTimestamp, tuple.f0, value));
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
collected = true;
} else {
// Extend the current interval
endTimestamp = tuple.f0;
collected = false;
}
}
}
//check if the latest interval was collected, if not, collect it
//this happens when the last interval has the value 0
if (!collected) {
out.collect(new Tuple3<>(startTimestamp, endTimestamp, value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
Expand All @@ -42,42 +43,67 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase {
/**
* The expected in-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_IN_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Float>> EXPECTED_IN_DEGREES = new ArrayList<>();
/**
* The expected out-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_OUT_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Float>> EXPECTED_OUT_DEGREES = new ArrayList<>();
/**
* The expected degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_BOTH_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Float>> EXPECTED_BOTH_DEGREES = new ArrayList<>();

static {


// IN DEGREES
EXPECTED_IN_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f));
EXPECTED_IN_DEGREES.add(new Tuple3<>(0L, 4L, 1f));
EXPECTED_IN_DEGREES.add(new Tuple3<>(4L, 5L, 2f));
EXPECTED_IN_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f));

/*
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1));
*/

// OUT DEGREES
EXPECTED_OUT_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(0L, 4L, 1f));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(4L, 5L, 2f));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f));

/*
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1));

*/
// DEGREES
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(0L, 4L, 1f));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(4L, 5L, 2f));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(5L, 6L, 1f));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(6L, 7L, 2f));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 1f));

/*
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1));
*/
}


/**
* The degree type to test.
*/
Expand All @@ -88,7 +114,7 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase {
* The expected degree range evolution for the given type.
*/
@Parameterized.Parameter(1)
public List<Tuple2<Long, Integer>> expectedDegrees;
public List<Tuple3<Long, Long, Float>> expectedDegrees;

/**
* The temporal graph to test the operator.
Expand Down Expand Up @@ -130,9 +156,9 @@ public void setUp() throws Exception {
*/
@Test
public void testDegreeRange() throws Exception {
Collection<Tuple2<Long, Integer>> resultCollection = new ArrayList<>();
Collection<Tuple3<Long, Long, Float>> resultCollection = new ArrayList<>();

final DataSet<Tuple2<Long, Integer>> resultDataSet = testGraph
final DataSet<Tuple3<Long, Long, Float>> resultDataSet = testGraph
.callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME));

resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));
Expand Down

0 comments on commit 47c0847

Please sign in to comment.