diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 8b30cd4bfe69d..cda3a95c84f22 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,7 +34,8 @@ case class Aggregator[K, V, C] (
   private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
+                         context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -47,17 +48,18 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      val combiners =
-        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
+      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
       combiners.iterator
     }
   }
 
-  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
+  def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kc: Product2[K, C] = null
@@ -75,6 +77,8 @@ case class Aggregator[K, V, C] (
       val (k, c) = iter.next()
       combiners.insert(k, c)
     }
+    context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+    context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
     combiners.iterator
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7f31d7e6f8aec..c1b57f74d7e9a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -279,7 +279,7 @@ private[spark] class Executor(
         //System.exit(1)
       }
     } finally {
-      // TODO: Unregister shuffle memory only for ShuffleMapTask
+      // TODO: Unregister shuffle memory only for ResultTask
       val shuffleMemoryMap = env.shuffleMemoryMap
       shuffleMemoryMap.synchronized {
         shuffleMemoryMap.remove(Thread.currentThread().getId)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bb1471d9ee16a..0c8f4662a5f3a 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -48,6 +48,16 @@ class TaskMetrics extends Serializable {
    */
   var resultSerializationTime: Long = _
 
+  /**
+   * The number of in-memory bytes spilled by this task
+   */
+  var memoryBytesSpilled: Long = _
+
+  /**
+   * The number of on-disk bytes spilled by this task
+   */
+  var diskBytesSpilled: Long = _
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a73714abcaf72..9c6b308804c77 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,6 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
 
@@ -150,6 +151,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         map.insert(kv._1, new CoGroupValue(kv._2, depNum))
       }
     }
+    context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
+    context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
    new InterruptibleIterator(context, map.iterator)
   }
 }
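Note: the pattern above is the core of the change. The external map keeps its own spill counters, and the task copies them into TaskMetrics once the input has been fully drained. A minimal, self-contained sketch of that hand-off, using simplified stand-in classes rather than Spark's real Aggregator, ExternalAppendOnlyMap, or TaskMetrics:

```scala
// Illustrative sketch only (simplified stand-ins, not Spark's classes).
object SpillMetricsPattern {

  // Stand-in for the two counters this patch adds to TaskMetrics.
  class SimpleTaskMetrics {
    var memoryBytesSpilled: Long = 0L
    var diskBytesSpilled: Long = 0L
  }

  // Stand-in for a spillable map: it only records how much it has "spilled" so far.
  class SpillableMap[K, V] {
    private val data = scala.collection.mutable.Map.empty[K, List[V]]
    private var inserts = 0L
    private var _memoryBytesSpilled = 0L
    private var _diskBytesSpilled = 0L

    def insert(k: K, v: V): Unit = {
      data(k) = v :: data.getOrElse(k, Nil)
      inserts += 1
      // A real map spills when its estimated size crosses a threshold; here we
      // fake one spill per 1000 inserts just to exercise the counters.
      if (inserts % 1000 == 0) {
        _memoryBytesSpilled += 1024 * 1024 // estimated in-memory size of the dumped map
        _diskBytesSpilled += 256 * 1024    // bytes the disk writer reported writing
      }
    }

    def memoryBytesSpilled: Long = _memoryBytesSpilled
    def diskBytesSpilled: Long = _diskBytesSpilled
    def iterator: Iterator[(K, List[V])] = data.iterator
  }

  // Mirrors the shape of combineValuesByKey above: drain the input, then copy the
  // map's spill counters onto the task's metrics before returning the iterator.
  def combine[K, V](iter: Iterator[(K, V)], metrics: SimpleTaskMetrics): Iterator[(K, List[V])] = {
    val map = new SpillableMap[K, V]
    iter.foreach { case (k, v) => map.insert(k, v) }
    metrics.memoryBytesSpilled = map.memoryBytesSpilled
    metrics.diskBytesSpilled = map.diskBytesSpilled
    map.iterator
  }

  def main(args: Array[String]): Unit = {
    val metrics = new SimpleTaskMetrics
    val groups = combine(Iterator.tabulate(5000)(i => (i % 100, i)), metrics).size
    println(s"groups=$groups memorySpilled=${metrics.memoryBytesSpilled} diskSpilled=${metrics.diskBytesSpilled}")
  }
}
```

As in the patch, the counters are assigned (not accumulated) after the insert loop, since they only settle once the last possible spill has happened.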
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1248409e3513a..dd25d0c6ed5f4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
       }, preservesPartitioning = true)
     } else if (mapSideCombine) {
-      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+      val combined = self.mapPartitionsWithContext((context, iter) => {
+        aggregator.combineValuesByKey(iter, context)
+      }, preservesPartitioning = true)
       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
       partitioned.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
       }, preservesPartitioning = true)
     } else {
       // Don't apply map-side combiner.
       val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
       values.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
       }, preservesPartitioning = true)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3c53e88380193..64e22a30b48f9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,4 +24,6 @@ private[spark] class ExecutorSummary {
   var succeededTasks : Int = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
+  var memoryBytesSpilled : Long = 0
+  var diskBytesSpilled : Long = 0
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0dd876480afa0..ab03eb5ce1ab4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,6 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
         <th>Succeeded Tasks</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
+        <th>Shuffle Spill (Memory)</th>
+        <th>Shuffle Spill (Disk)</th>
       </thead>
       <tbody>
         {createExecutorTable()}
@@ -80,6 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
         <td>{v.succeededTasks}</td>
         <td>{Utils.bytesToString(v.shuffleRead)}</td>
         <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+        <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
+        <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
       </tr>
     }
   }
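ExecutorSummary just grows two counters; the JobProgressListener hunks that follow add each finished task's values into per-stage and per-executor totals. A stripped-down sketch of that roll-up (ExecutorSummaryLite and TaskEndEvent are hypothetical stand-ins, not Spark's listener API):

```scala
// Illustrative only: per-executor accumulation of spill metrics on task-end events.
object ExecutorSpillRollup {
  class ExecutorSummaryLite {
    var shuffleRead: Long = 0L
    var shuffleWrite: Long = 0L
    var memoryBytesSpilled: Long = 0L
    var diskBytesSpilled: Long = 0L
  }

  // One record per finished task, carrying the task's spill counters.
  case class TaskEndEvent(execId: String, memoryBytesSpilled: Long, diskBytesSpilled: Long)

  private val summaries = scala.collection.mutable.HashMap.empty[String, ExecutorSummaryLite]

  // Mirrors the listener pattern: look up (or create) the executor's summary and
  // add this task's spill counters to the running totals.
  def onTaskEnd(event: TaskEndEvent): Unit = {
    val summary = summaries.getOrElseUpdate(event.execId, new ExecutorSummaryLite)
    summary.memoryBytesSpilled += event.memoryBytesSpilled
    summary.diskBytesSpilled += event.diskBytesSpilled
  }

  def main(args: Array[String]): Unit = {
    Seq(
      TaskEndEvent("exec-1", 64L * 1024 * 1024, 8L * 1024 * 1024),
      TaskEndEvent("exec-1", 32L * 1024 * 1024, 4L * 1024 * 1024),
      TaskEndEvent("exec-2", 0L, 0L)
    ).foreach(onTaskEnd)
    summaries.foreach { case (id, s) =>
      println(s"$id: spill mem=${s.memoryBytesSpilled} B, disk=${s.diskBytesSpilled} B")
    }
  }
}
```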
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bcd282445050d..858a10ce750ff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,6 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageIdToTime = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
+  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
+  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
   val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
   val stageIdToTasksComplete = HashMap[Int, Int]()
   val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -78,6 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       stageIdToTime.remove(s.stageId)
       stageIdToShuffleRead.remove(s.stageId)
       stageIdToShuffleWrite.remove(s.stageId)
+      stageIdToMemoryBytesSpilled.remove(s.stageId)
+      stageIdToDiskBytesSpilled.remove(s.stageId)
       stageIdToTasksActive.remove(s.stageId)
       stageIdToTasksComplete.remove(s.stageId)
       stageIdToTasksFailed.remove(s.stageId)
@@ -149,6 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
           Option(taskEnd.taskMetrics).foreach { taskMetrics =>
             taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
             taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+            y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
+            y.diskBytesSpilled += taskMetrics.diskBytesSpilled
           }
         }
         case _ => {}
@@ -184,6 +190,14 @@
         stageIdToShuffleWrite(sid) += shuffleWrite
         totalShuffleWrite += shuffleWrite
 
+        stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
+        val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
+        stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+
+        stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
+        val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
+        stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+
         val taskList = stageIdToTaskInfos.getOrElse(
           sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
         taskList -= ((taskEnd.taskInfo, None, None))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d1e58016beaac..cfaf121895ec2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,6 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val hasShuffleRead = shuffleReadBytes > 0
     val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
     val hasShuffleWrite = shuffleWriteBytes > 0
+    val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
+    val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
+    val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0)
 
     var activeTime = 0L
     listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -81,6 +84,16 @@ private[spark] class StagePage(parent: JobProgressUI) {
                 {Utils.bytesToString(shuffleWriteBytes)}
               </li>
             }
+            {if (hasBytesSpilled)
+              <li>
+                <strong>Shuffle spill (memory): </strong>
+                {Utils.bytesToString(memoryBytesSpilled)}
+              </li>
+              <li>
+                <strong>Shuffle spill (disk): </strong>
+                {Utils.bytesToString(diskBytesSpilled)}
+              </li>
+            }
           </ul>
         </div>
 
@@ -89,9 +102,10 @@
       Seq("Duration", "GC Time", "Result Ser Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
+      {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
       Seq("Errors")
 
-    val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
+    val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
 
     // Excludes tasks which failed and have incomplete metrics
     val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -153,13 +167,29 @@ private[spark] class StagePage(parent: JobProgressUI) {
       }
       val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
+      val memoryBytesSpilledSizes = validTasks.map {
+        case(info, metrics, exception) =>
+          metrics.get.memoryBytesSpilled.toDouble
+      }
+      val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +:
+        getQuantileCols(memoryBytesSpilledSizes)
+
+      val diskBytesSpilledSizes = validTasks.map {
+        case(info, metrics, exception) =>
+          metrics.get.diskBytesSpilled.toDouble
+      }
+      val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +:
+        getQuantileCols(diskBytesSpilledSizes)
+
       val listings: Seq[Seq[String]] = Seq(
         serializationQuantiles,
         serviceQuantiles,
         gettingResultQuantiles,
         schedulerDelayQuantiles,
         if (hasShuffleRead) shuffleReadQuantiles else Nil,
-        if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
+        if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
+        if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
+        if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil)
 
       val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile",
         "Max")
@@ -178,8 +208,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     }
   }
 
-
-  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
       (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
     def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
       trace.map(e => <span style="display:block;">{e.toString}</span>)
@@ -205,6 +234,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
       if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
 
+    val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
+    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
+    val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
+    val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
+    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
     <tr>
       <td>{info.index}</td>
       <td>{info.taskId}</td>
@@ -234,6 +271,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
           {shuffleWriteReadable}
         </td>
       }}
+      {if (bytesSpilled) {
+        <td sorttable_customkey={memoryBytesSpilledSortable}>
+          {memoryBytesSpilledReadable}
+        </td>
+        <td sorttable_customkey={diskBytesSpilledSortable}>
+          {diskBytesSpilledReadable}
+        </td>
+      }}
       <td>{exception.map(e =>
         <span>
           {e.className} ({e.description})<br/>
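As with shuffle read and write, the task table keeps two forms of each spill value: the raw byte count as the sortable key and a Utils.bytesToString rendering for display. A rough, self-contained approximation of that formatting (the real Utils.bytesToString also handles larger units and rounding details, so treat this as a sketch):

```scala
// Illustrative only: pairing a raw sortable value with a human-readable string.
object BytesFormatting {
  // Roughly what a bytesToString helper does: scale to the largest unit >= 1.
  def bytesToString(size: Long): String = {
    val (value, unit) =
      if (size >= 2L * 1024 * 1024 * 1024) (size.toDouble / (1024L * 1024 * 1024), "GB")
      else if (size >= 2L * 1024 * 1024) (size.toDouble / (1024 * 1024), "MB")
      else if (size >= 2L * 1024) (size.toDouble / 1024, "KB")
      else (size.toDouble, "B")
    "%.1f %s".format(value, unit)
  }

  def main(args: Array[String]): Unit = {
    val memoryBytesSpilled = 536870912L // hypothetical per-task value (512 MB)
    // Keep both forms, as the task row does: the raw number sorts correctly,
    // the formatted string is what gets rendered in the cell.
    val sortable = memoryBytesSpilled.toString
    val readable = bytesToString(memoryBytesSpilled)
    println(s"sortable=$sortable readable=$readable")
  }
}
```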
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index e3bcd895aa28f..c63f47cc45f51 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   }
 
   // Number of pairs in the in-memory map
-  private var numPairsInMemory = 0
+  private var numPairsInMemory = 0L
 
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
@@ -85,6 +85,10 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   // How many times we have spilled so far
   private var spillCount = 0
 
+  // Number of bytes spilled in total
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
   private val comparator = new KCComparator[K, C]
@@ -150,6 +154,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
+      _diskBytesSpilled += writer.bytesWritten
       writer.close()
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -161,8 +166,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
     numPairsInMemory = 0
+    _memoryBytesSpilled += mapSize
   }
 
+  def memoryBytesSpilled: Long = _memoryBytesSpilled
+  def diskBytesSpilled: Long = _diskBytesSpilled
+
   /**
    * Return an iterator that merges the in-memory map with the spilled maps.
    * If no spill has occurred, simply return the in-memory map's iterator.
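The two counters measure different things: _memoryBytesSpilled grows by the estimated in-memory size of the map being dumped (mapSize), while _diskBytesSpilled grows by what the writer actually put on disk, which is typically much smaller after serialization and compression. A toy spill routine illustrating that bookkeeping (not Spark's implementation; the file handling here is deliberately simplified):

```scala
// Illustrative only: tracking "memory spilled" vs "disk spilled" for one spill.
import java.io.{File, FileOutputStream, ObjectOutputStream}

object SpillBookkeeping {
  private var memoryBytesSpilled = 0L
  private var diskBytesSpilled = 0L

  def spill(map: scala.collection.mutable.Map[Int, Array[Byte]], estimatedSize: Long): File = {
    val file = File.createTempFile("spill", ".tmp")
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try {
      map.foreach { case (k, v) => out.writeObject((k, v)) }
    } finally {
      out.close()
    }
    memoryBytesSpilled += estimatedSize // what the map was worth in memory
    diskBytesSpilled += file.length()   // what actually landed on disk
    map.clear()
    file
  }

  def main(args: Array[String]): Unit = {
    val m = scala.collection.mutable.Map(
      (1 to 100).map(i => i -> Array.fill[Byte](1024)(1.toByte)): _*)
    val f = spill(m, estimatedSize = 100L * 1024)
    println(s"memoryBytesSpilled=$memoryBytesSpilled diskBytesSpilled=$diskBytesSpilled")
    f.delete()
  }
}
```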
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index ef957bb0e5d17..c3391f3e535bc 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -9,22 +9,19 @@ import org.apache.spark.SparkContext._
 
 class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
-  override def beforeEach() {
-    val conf = new SparkConf(false)
-    conf.set("spark.shuffle.externalSorting", "true")
-    sc = new SparkContext("local", "test", conf)
-  }
-
-  val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
-  val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
+  private val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
+  private val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
     buffer += i
   }
-  val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
+  private val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
     (buf1, buf2) => {
       buf1 ++= buf2
     }
 
   test("simple insert") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
+
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
 
@@ -48,6 +45,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("insert with collision") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
+
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
 
@@ -67,6 +67,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("ordering") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
+
     val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
     map1.insert(1, 10)
@@ -109,6 +112,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("null keys and values") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
+
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
     map.insert(1, 5)
@@ -147,6 +153,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("simple aggregator") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
+
     // reduceByKey
     val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
     val result1 = rdd.reduceByKey(_+_).collect()
@@ -159,6 +168,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("simple cogroup") {
+    val conf = new SparkConf(false)
+    sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
     val result = rdd1.cogroup(rdd2).collect()
@@ -175,56 +186,56 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
   }
 
   test("spilling") {
-    // TODO: Figure out correct memory parameters to actually induce spilling
-    // System.setProperty("spark.shuffle.buffer.mb", "1")
-    // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+    // TODO: Use SparkConf (which currently throws connection reset exception)
+    System.setProperty("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
 
-    // reduceByKey - should spill exactly 6 times
-    val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
+    // reduceByKey - should spill ~8 times
+    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
     val resultA = rddA.reduceByKey(math.max(_, _)).collect()
-    assert(resultA.length == 5000)
+    assert(resultA.length == 50000)
     resultA.foreach { case(k, v) =>
       k match {
         case 0 => assert(v == 1)
-        case 2500 => assert(v == 5001)
-        case 4999 => assert(v == 9999)
+        case 25000 => assert(v == 50001)
+        case 49999 => assert(v == 99999)
        case _ =>
       }
     }
 
-    // groupByKey - should spill exactly 11 times
-    val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
+    // groupByKey - should spill ~17 times
+    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
     val resultB = rddB.groupByKey().collect()
-    assert(resultB.length == 2500)
+    assert(resultB.length == 25000)
     resultB.foreach { case(i, seq) =>
       i match {
         case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
-        case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
-        case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
+        case 12500 => assert(seq.toSet == Set[Int](50000, 50001, 50002, 50003))
+        case 24999 => assert(seq.toSet == Set[Int](99996, 99997, 99998, 99999))
        case _ =>
       }
     }
 
-    // cogroup - should spill exactly 7 times
-    val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
-    val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
+    // cogroup - should spill ~7 times
+    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
+    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
     val resultC = rddC1.cogroup(rddC2).collect()
-    assert(resultC.length == 1000)
+    assert(resultC.length == 10000)
    resultC.foreach { case(i, (seq1, seq2)) =>
      i match {
        case 0 =>
          assert(seq1.toSet == Set[Int](0))
-          assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
-        case 500 =>
-          assert(seq1.toSet == Set[Int](500))
+          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+        case 5000 =>
+          assert(seq1.toSet == Set[Int](5000))
          assert(seq2.toSet == Set[Int]())
-        case 999 =>
-          assert(seq1.toSet == Set[Int](999))
+        case 9999 =>
+          assert(seq1.toSet == Set[Int](9999))
          assert(seq2.toSet == Set[Int]())
        case _ =>
      }
    }
-  }
 
-  // TODO: Test memory allocation for multiple concurrently running tasks
+    System.clearProperty("spark.shuffle.memoryFraction")
+  }
 }
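For anyone wanting to see the new columns populated outside the test suite, the test's trick of shrinking spark.shuffle.memoryFraction works interactively as well. A sketch mirroring the test setup (the job itself is arbitrary; the System-property workaround follows the TODO noted in the test):

```scala
// Sketch: force shuffle spilling in a small local-cluster job, as the test above does.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ForceSpillExample {
  def main(args: Array[String]): Unit = {
    // A tiny memory fraction makes even a modest reduceByKey spill to disk.
    System.setProperty("spark.shuffle.memoryFraction", "0.001")
    val sc = new SparkContext("local-cluster[1,1,512]", "force-spill-example")
    try {
      val counts = sc.parallelize(0 until 100000)
        .map(i => (i / 2, i))
        .reduceByKey(math.max(_, _))
        .collect()
      println(s"reduced to ${counts.length} keys; the stage page should now show 'Shuffle spill'")
    } finally {
      sc.stop()
      System.clearProperty("spark.shuffle.memoryFraction")
    }
  }
}
```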