From 80e73ed0004cceb47a450c79aa4faa598502fa45 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Mon, 13 Jan 2014 22:56:57 -0800 Subject: [PATCH] Adding minimal additional functionality to EdgeRDD --- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index c0a23d1986754..832b7816fe833 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -57,6 +57,23 @@ class EdgeRDD[@specialized ED: ClassTag]( }, preservesPartitioning = true)) } + /** + * Map the values in an edge partitioning preserving the structure but changing the values. + * + * @tparam ED2 the new edge value type + * @param f the function from an edge to a new edge value + * @return a new EdgeRDD containing the new edge values + */ + def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] = + mapEdgePartitions((pid, part) => part.map(f)) + + /** + * Reverse all the edges in this RDD. + * + * @return a new EdgeRDD containing all the edges reversed + */ + def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse) + /** * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same * [[PartitionStrategy]].