Added stageId <--> jobId mapping in DAGScheduler
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842
markhamstra committed Nov 22, 2013
1 parent 7b383c4 · commit af9e8ae
Showing 8 changed files with 271 additions and 76 deletions.
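
The heart of the change is bidirectional bookkeeping in the DAGScheduler between stages and the jobs that need them, so that when the last job depending on a stage finishes, the per-stage state can be dropped; the DAGScheduler.scala diff itself is not among the two file diffs rendered below (MapOutputTracker and ClusterScheduler). A minimal sketch of that idea, using illustrative names rather than the commit's actual identifiers:

  import scala.collection.mutable.{HashMap, HashSet}

  // Sketch only: a stage can be shared by several jobs and a job spans several
  // stages, so both directions are tracked as sets of ids.
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  val stageIdToJobIds = new HashMap[Int, HashSet[Int]]

  def registerStage(jobId: Int, stageId: Int): Unit = {
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stageId
    stageIdToJobIds.getOrElseUpdate(stageId, new HashSet[Int]) += jobId
  }

  // On job completion, detach the job from its stages; a stage with no
  // remaining jobs is the signal to prune the scheduler's other per-stage maps.
  def jobFinished(jobId: Int): Unit = {
    for (stageId <- jobIdToStageIds.remove(jobId).getOrElse(HashSet.empty[Int]);
         jobs <- stageIdToJobIds.get(stageId)) {
      jobs -= jobId
      if (jobs.isEmpty) {
        stageIdToJobIds -= stageId
        // ...remove the stage from the other per-stage data structures here...
      }
    }
  }
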
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -244,12 +244,12 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
         case Some(bytes) =>
           return bytes
         case None =>
-          statuses = mapStatuses(shuffleId)
+          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
           epochGotten = epoch
       }
     }
     // If we got here, we failed to find the serialized locations in the cache, so we pulled
-    // out a snapshot of the locations as "locs"; let's serialize and return that
+    // out a snapshot of the locations as "statuses"; let's serialize and return that
     val bytes = MapOutputTracker.serializeMapStatuses(statuses)
     logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
     // Add them into the table only if the epoch hasn't changed while we were working
@@ -274,6 +274,10 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
   override def updateEpoch(newEpoch: Long) {
     // This might be called on the MapOutputTrackerMaster if we're running in local mode.
   }
+
+  def has(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
+  }
 }
 
 private[spark] object MapOutputTracker {
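
Both pieces of the MapOutputTracker change above follow from that cleanup: once shuffle state can disappear when its job completes, the tracker may be asked about a shuffleId it no longer holds. The getOrElse turns that case into an empty snapshot instead of an exception, and the new has() lets callers check first. A rough caller-side illustration, where `tracker` stands in for a MapOutputTrackerMaster instance:

  // Sketch only: `tracker` is an assumed MapOutputTrackerMaster and shuffle 7
  // is assumed to have been cleaned up when its last job finished.
  val shuffleId = 7
  if (tracker.has(shuffleId)) {
    // Either a cached serialized copy or live map statuses still exist.
  } else {
    // Nothing registered: with the getOrElse above, serializing the statuses
    // for this shuffle now yields an empty Array[MapStatus] snapshot rather
    // than mapStatuses(shuffleId) throwing a NoSuchElementException.
  }
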
@@ -181,7 +181,9 @@ private[spark] class ClusterScheduler(
           backend.killTask(tid, execId)
         }
       }
-      tsm.error("Stage %d was cancelled".format(stageId))
+      logInfo("Stage %d was cancelled".format(stageId))
+      tsm.removeAllRunningTasks()
+      taskSetFinished(tsm)
     }
   }
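
In the ClusterScheduler hunk, cancelling a stage no longer goes through tsm.error(...); instead it logs the cancellation, clears the TaskSetManager's running tasks, and retires the task set via taskSetFinished(tsm), so the scheduler's per-task-set bookkeeping gets torn down much as it would for a normally completed set. In outline (a simplified sketch; identifiers not visible in the hunk above are assumptions about the surrounding class, not code from the commit):

  // Rough shape of the post-patch cancellation path. `activeTaskSets`,
  // `taskIdsForTaskSet`, and `taskIdToExecutorId` are hypothetical stand-ins
  // for the scheduler's real collections; backend.killTask, tsm, logInfo,
  // removeAllRunningTasks, and taskSetFinished appear in the hunk above.
  def cancelTasks(stageId: Int): Unit = synchronized {
    activeTaskSets.find(_.stageId == stageId).foreach { tsm =>
      // Ask the executors to kill anything still running for this stage...
      taskIdsForTaskSet(tsm).foreach { tid =>
        backend.killTask(tid, taskIdToExecutorId(tid))
      }
      // ...then retire the task set through the normal completion path.
      logInfo("Stage %d was cancelled".format(stageId))
      tsm.removeAllRunningTasks()
      taskSetFinished(tsm)
    }
  }
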
