diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 0c823104215aa..278969655de48 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -39,6 +39,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"

   DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
@@ -59,6 +60,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
   CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes"
   CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
 fi
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index dd25d0c6ed5f4..4148581f527fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -288,7 +288,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner)
+    if (self.partitioner == Some(partitioner)) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index edd4f381db50c..cd90a1561a975 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -548,6 +548,11 @@ abstract class RDD[T: ClassTag](
    * *same number of partitions*, but does *not* require them to have the same number
    * of elements in each partition.
    */
+  def zipPartitions[B: ClassTag, V: ClassTag]
+      (rdd2: RDD[B], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
+
   def zipPartitions[B: ClassTag, V: ClassTag]
     (rdd2: RDD[B])
     (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index a1a452315d143..856eb772a1084 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -22,10 +22,72 @@ package org.apache.spark.util.collection
  * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
  * safety/bound checking.
  */
-class BitSet(numBits: Int) {
+class BitSet(numBits: Int) extends Serializable {

-  private[this] val words = new Array[Long](bit2words(numBits))
-  private[this] val numWords = words.length
+  private val words = new Array[Long](bit2words(numBits))
+  private val numWords = words.length
+
+  /**
+   * Compute the capacity (number of bits) that can be represented
+   * by this bitset.
+   */
+  def capacity: Int = numWords * 64
+
+  /**
+   * Set all the bits up to a given index
+   */
+  def setUntil(bitIndex: Int) {
+    val wordIndex = bitIndex >> 6 // divide by 64
+    var i = 0
+    while(i < wordIndex) { words(i) = -1; i += 1 }
+    if(wordIndex < words.size) {
+      // Set the remaining bits (note that the mask could still be zero)
+      val mask = ~(-1L << (bitIndex & 0x3f))
+      words(wordIndex) |= mask
+    }
+  }
+
+  /**
+   * Compute the bit-wise AND of the two sets returning the
+   * result.
+   */
+  def &(other: BitSet): BitSet = {
+    val newBS = new BitSet(math.max(capacity, other.capacity))
+    val smaller = math.min(numWords, other.numWords)
+    assert(newBS.numWords >= numWords)
+    assert(newBS.numWords >= other.numWords)
+    var ind = 0
+    while( ind < smaller ) {
+      newBS.words(ind) = words(ind) & other.words(ind)
+      ind += 1
+    }
+    newBS
+  }
+
+  /**
+   * Compute the bit-wise OR of the two sets returning the
+   * result.
+   */
+  def |(other: BitSet): BitSet = {
+    val newBS = new BitSet(math.max(capacity, other.capacity))
+    assert(newBS.numWords >= numWords)
+    assert(newBS.numWords >= other.numWords)
+    val smaller = math.min(numWords, other.numWords)
+    var ind = 0
+    while( ind < smaller ) {
+      newBS.words(ind) = words(ind) | other.words(ind)
+      ind += 1
+    }
+    while( ind < numWords ) {
+      newBS.words(ind) = words(ind)
+      ind += 1
+    }
+    while( ind < other.numWords ) {
+      newBS.words(ind) = other.words(ind)
+      ind += 1
+    }
+    newBS
+  }

   /**
    * Sets the bit at the specified index to true.
@@ -36,6 +98,11 @@ class BitSet(numBits: Int) {
     words(index >> 6) |= bitmask        // div by 64 and mask
   }

+  def unset(index: Int) {
+    val bitmask = 1L << (index & 0x3f)  // mod 64 and shift
+    words(index >> 6) &= ~bitmask       // div by 64 and mask
+  }
+
   /**
    * Return the value of the bit with the specified index. The value is true if the bit with
    * the index is currently set in this BitSet; otherwise, the result is false.
@@ -48,6 +115,20 @@ class BitSet(numBits: Int) {
     (words(index >> 6) & bitmask) != 0  // div by 64 and mask
   }

+  /**
+   * Get an iterator over the set bits.
+   */
+  def iterator = new Iterator[Int] {
+    var ind = nextSetBit(0)
+    override def hasNext: Boolean = ind >= 0
+    override def next() = {
+      val tmp = ind
+      ind = nextSetBit(ind+1)
+      tmp
+    }
+  }
+
+
   /** Return the number of bits set to true in this BitSet. */
   def cardinality(): Int = {
     var sum = 0
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 87e009a4de93d..5ded5d0b6da84 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -84,6 +84,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](

   protected var _bitset = new BitSet(_capacity)

+  def getBitSet = _bitset
+
   // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
   // specialization bug that would generate two arrays (one for Object and one for specialized T).
  protected var _data: Array[T] = _
@@ -161,7 +163,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   def getPos(k: T): Int = {
     var pos = hashcode(hasher.hash(k)) & _mask
     var i = 1
-    while (true) {
+    val maxProbe = _data.size
+    while (i < maxProbe) {
       if (!_bitset.get(pos)) {
         return INVALID_POS
       } else if (k == _data(pos)) {
@@ -179,6 +182,22 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   /** Return the value at the specified position. */
   def getValue(pos: Int): T = _data(pos)

+  def iterator = new Iterator[T] {
+    var pos = nextPos(0)
+    override def hasNext: Boolean = pos != INVALID_POS
+    override def next(): T = {
+      val tmp = getValue(pos)
+      pos = nextPos(pos+1)
+      tmp
+    }
+  }
+
+  /** Return the value at the specified position. */
+  def getValueSafe(pos: Int): T = {
+    assert(_bitset.get(pos))
+    _data(pos)
+  }
+
   /**
    * Return the next position with an element stored, starting from the given position inclusively.
    */
@@ -259,7 +278,7 @@ object OpenHashSet {
   * A set of specialized hash function implementation to avoid boxing hash code computation
   * in the specialized implementation of OpenHashSet.
   */
-  sealed class Hasher[@specialized(Long, Int) T] {
+  sealed class Hasher[@specialized(Long, Int) T] extends Serializable {
     def hash(o: T): Int = o.hashCode()
   }

diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index ad7969d012283..c529d89ffd192 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -21,7 +21,7 @@
-
+
@@ -68,9 +68,10 @@
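
The PairRDDFunctions and RDD.scala hunks above make partitionBy return the input RDD unchanged when it already uses an equal partitioner, and add a zipPartitions overload that can mark its output as preserving the inputs' partitioning. A minimal driver-side sketch of the intended behavior (not part of the patch; it assumes a live SparkContext named sc, and the variable names are illustrative):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD functions in this era

    val part = new HashPartitioner(4)
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part)

    // Re-partitioning by an equal partitioner is now a no-op: the same RDD
    // comes back instead of a second ShuffledRDD and a second shuffle.
    assert(pairs.partitionBy(part) eq pairs)

    // Zip two co-partitioned RDDs and declare that the result keeps their
    // partitioning, so downstream per-key operations can skip a shuffle.
    val other = sc.parallelize(Seq((1, 1.0), (2, 2.0))).partitionBy(part)
    val merged = pairs.zipPartitions(other, preservesPartitioning = true) {
      (as, bs) => as.map { case (k, s) => (k, s.length.toDouble) } ++ bs
    }
    assert(merged.partitioner == Some(part))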
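
The new BitSet operations (capacity, setUntil, unset, the & and | operators, and iterator) compose as in this sketch; the sizes and bit positions are made up for illustration:

    import org.apache.spark.util.collection.BitSet

    val a = new BitSet(128)
    val b = new BitSet(128)
    a.setUntil(8)                  // sets bits 0 through 7
    b.set(4)
    b.set(100)

    val and = a & b                // intersection: only bit 4 is set in both
    assert(and.get(4) && !and.get(100))

    val or = a | b                 // union, sized to max(a.capacity, b.capacity)
    assert(or.get(7) && or.get(100))

    a.unset(7)
    assert(!a.get(7))
    assert(a.iterator.toSeq == (0 to 6))   // iterator yields the set bit indices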
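
On the OpenHashSet side, getPos now bounds its probe loop at _data.size attempts instead of potentially spinning forever on a full table, and the new iterator and getValueSafe expose stored elements by position. A small usage sketch (again illustrative, not from the patch):

    import org.apache.spark.util.collection.OpenHashSet

    val set = new OpenHashSet[Int](64)
    set.add(10)
    set.add(20)
    set.add(30)

    // iterator walks positions via nextPos, yielding each element exactly once.
    assert(set.iterator.toSet == Set(10, 20, 30))

    // getValueSafe is getValue plus an assertion that the slot is occupied.
    val pos = set.getPos(20)
    assert(pos != OpenHashSet.INVALID_POS)
    assert(set.getValueSafe(pos) == 20)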