Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexedRDD for accelerated joins #848

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings

implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
rdd.pairRDDFunctions

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
Expand Down
78 changes: 65 additions & 13 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, partitioner))
}

/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
Expand All @@ -275,6 +278,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
Expand All @@ -287,6 +292,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
Expand Down Expand Up @@ -325,16 +332,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other))
}

/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, numPartitions))
}

/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
Expand All @@ -343,6 +356,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
Expand All @@ -354,6 +369,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
Expand All @@ -365,6 +382,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
Expand All @@ -376,6 +395,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
Expand Down Expand Up @@ -412,55 +433,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W])] =
: JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
}
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
}

/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
}

/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
}

/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
Expand Down
Loading