Commit

Merge pull request apache#401 from andrewor14/master
External sorting - Add number of bytes spilled to Web UI

Additionally, update test suite for external sorting to induce spilling.
pwendell committed Jan 14, 2014
2 parents 08b9fec + 8399341 commit 0ca0d4d
Showing 11 changed files with 151 additions and 47 deletions.
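The updated external sorting test suite is among the 11 changed files but is not expanded on this page. As a rough sketch of how spilling can be induced (the configuration values below are illustrative, not taken from the suite), shrinking the shuffle memory budget forces the external map to spill even on a small job, which populates the new spill metrics shown in the diffs that follow:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair RDD implicits (reduceByKey)

// Sketch only: a tiny shuffle memory budget makes even a small reduceByKey spill,
// so the new "Shuffle Spill (Memory)" / "Shuffle Spill (Disk)" columns become non-zero.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spill-demo")
  .set("spark.shuffle.externalSorting", "true")  // already the default; shown for clarity
  .set("spark.shuffle.memoryFraction", "0.001")  // illustrative value
val sc = new SparkContext(conf)

val counts = sc.parallelize(1 to 100000, 4)
  .map(i => (i % 1000, 1))
  .reduceByKey(_ + _)
counts.count()  // runs the shuffle; spill totals appear on the stage page of the Web UI
sc.stop()
```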
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,7 +34,8 @@ case class Aggregator[K, V, C] (
private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -47,17 +48,18 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
val combiners =
new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}

def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -75,6 +77,8 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}
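Both combiners now require the caller to pass in the TaskContext, and they write their spill totals onto context.taskMetrics once the input iterator has been drained. A minimal sketch of the new entry points, with concrete types chosen for illustration (a real TaskContext is supplied by the scheduler, and Aggregator reads SparkEnv at construction, so this only runs where a SparkEnv exists):

```scala
import org.apache.spark.{Aggregator, TaskContext}

// Word-count style combiner functions; K = String, V = Int, C = Int.
val agg = new Aggregator[String, Int, Int](
  (v: Int) => v,                  // createCombiner
  (c: Int, v: Int) => c + v,      // mergeValue
  (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners

def combine(records: Iterator[(String, Int)], context: TaskContext): Iterator[(String, Int)] =
  agg.combineValuesByKey(records, context)
// combineValuesByKey drains `records` before returning, so by the time the call returns,
// context.taskMetrics.memoryBytesSpilled and .diskBytesSpilled have been set.
```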
@@ -279,7 +279,7 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ShuffleMapTask
// TODO: Unregister shuffle memory only for ResultTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -48,6 +48,16 @@ class TaskMetrics extends Serializable {
*/
var resultSerializationTime: Long = _

/**
* The number of in-memory bytes spilled by this task
*/
var memoryBytesSpilled: Long = _

/**
* The number of on-disk bytes spilled by this task
*/
var diskBytesSpilled: Long = _

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
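With memoryBytesSpilled and diskBytesSpilled now part of TaskMetrics, any SparkListener can observe them at task completion; this is the same path the Web UI takes via JobProgressListener further down in this commit. A minimal sketch (the class and field names here are invented for illustration):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener that sums the new spill counters across finished tasks.
class SpillTotals extends SparkListener {
  var memoryBytesSpilled = 0L
  var diskBytesSpilled = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // taskMetrics may be null for failed tasks, hence the Option wrapper
    Option(taskEnd.taskMetrics).foreach { m =>
      memoryBytesSpilled += m.memoryBytesSpilled
      diskBytesSpilled += m.diskBytesSpilled
    }
  }
}

// Registration on an existing SparkContext:
//   val totals = new SpillTotals
//   sc.addSparkListener(totals)
```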
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,6 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)

override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {

val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
@@ -150,6 +151,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
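Each branch above now threads the TaskContext into the combiners through mapPartitionsWithContext, which is what lets them publish their spill counters. As a standalone illustration of that operator (the RDD and the field read here are chosen only for the example):

```scala
import org.apache.spark.rdd.RDD

// Sketch: mapPartitionsWithContext hands the closure the TaskContext of the running task,
// the same handle the combiners above use to record their spill totals.
def tagWithStage(rdd: RDD[(String, Int)]): RDD[(String, Int, Int)] =
  rdd.mapPartitionsWithContext { (context, iter) =>
    iter.map { case (k, v) => (k, v, context.stageId) }
  }
```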
@@ -24,4 +24,6 @@ private[spark] class ExecutorSummary {
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
}
@@ -48,6 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
</thead>
<tbody>
{createExecutorTable()}
@@ -80,6 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
<td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
}
}
@@ -52,6 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -78,6 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
stageIdToDiskBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
@@ -149,6 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
y.diskBytesSpilled += taskMetrics.diskBytesSpilled
}
}
case _ => {}
@@ -184,6 +190,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite

stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled

stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
stageIdToDiskBytesSpilled(sid) += diskBytesSpilled

val taskList = stageIdToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
53 changes: 49 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,6 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0)

var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -81,6 +84,16 @@ private[spark] class StagePage(parent: JobProgressUI) {
{Utils.bytesToString(shuffleWriteBytes)}
</li>
}
{if (hasBytesSpilled)
<li>
<strong>Shuffle spill (memory): </strong>
{Utils.bytesToString(memoryBytesSpilled)}
</li>
<li>
<strong>Shuffle spill (disk): </strong>
{Utils.bytesToString(diskBytesSpilled)}
</li>
}
</ul>
</div>

@@ -89,9 +102,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")

val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)

// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -153,13 +167,29 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)

val memoryBytesSpilledSizes = validTasks.map {
case(info, metrics, exception) =>
metrics.get.memoryBytesSpilled.toDouble
}
val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +:
getQuantileCols(memoryBytesSpilledSizes)

val diskBytesSpilledSizes = validTasks.map {
case(info, metrics, exception) =>
metrics.get.diskBytesSpilled.toDouble
}
val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +:
getQuantileCols(diskBytesSpilledSizes)

val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil)

val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -178,8 +208,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
}


def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
@@ -205,6 +234,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")

val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")

val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")

<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
@@ -234,6 +271,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
{shuffleWriteReadable}
</td>
}}
{if (bytesSpilled) {
<td sorttable_customkey={memoryBytesSpilledSortable}>
{memoryBytesSpilledReadable}
</td>
<td sorttable_customkey={diskBytesSpilledSortable}>
{diskBytesSpilledReadable}
</td>
}}
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>
@@ -77,14 +77,18 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}

// Number of pairs in the in-memory map
private var numPairsInMemory = 0
private var numPairsInMemory = 0L

// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000

// How many times we have spilled so far
private var spillCount = 0

// Number of bytes spilled in total
private var _memoryBytesSpilled = 0L
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
@@ -150,6 +154,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
_diskBytesSpilled += writer.bytesWritten
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -161,8 +166,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
_memoryBytesSpilled += mapSize
}

def memoryBytesSpilled: Long = _memoryBytesSpilled
def diskBytesSpilled: Long = _diskBytesSpilled

/**
* Return an iterator that merges the in-memory map with the spilled maps.
* If no spill has occurred, simply return the in-memory map's iterator.
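Each spill thus adds the in-memory map's estimated size to _memoryBytesSpilled and the disk writer's actual output to _diskBytesSpilled, and the two accessors expose the running totals that Aggregator and CoGroupedRDD copy into TaskMetrics above. A rough internal sketch of that flow (ExternalAppendOnlyMap is private[spark] and needs a live SparkEnv, so this is not user-facing code; the numbers are arbitrary):

```scala
// Word-count style combiners; enough inserts under a small memory budget will spill.
val map = new ExternalAppendOnlyMap[Int, Int, Int](
  (v: Int) => v,                  // createCombiner
  (c: Int, v: Int) => c + v,      // mergeValue
  (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners

var i = 0
while (i < 1000000) {
  map.insert(i % 1000, 1)
  i += 1
}

val merged = map.iterator.toArray  // merges in-memory data with any spilled files
println("memory bytes spilled: " + map.memoryBytesSpilled)
println("disk bytes spilled:   " + map.diskBytesSpilled)
```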