Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added stageId <--> jobId mapping in DAGScheduler #842

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
cleanup
markhamstra committed Sep 5, 2013
commit d202a5227d9546ed953d48dc421800e260f3d2de
112 changes: 56 additions & 56 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
@@ -289,46 +289,58 @@ class DAGScheduler(
}

// Unregisters `jobId` from the job <-> stage bookkeeping: the job is removed from the job set
// of every stage registered for it, a stage whose job set becomes empty is removed, and the
// job's own entry in jobIdToStageIds is dropped. Inconsistencies (unregistered job, job with no
// stages, job missing from a stage's job set) are reported with logError.
// NOTE(review): this listing appears to come from a rendered diff with the +/- markers lost, so
// the pre-change and post-change versions of the loop below seem to be shown together — confirm
// against the actual file before relying on the exact statement order.
private def jobIdToStageIdsRemove(jobId: Int) {
// Helper: removes every record of one stage. If the stage is still present in pendingTasks,
// waiting, running or failed, that is logged as an error before the entry is dropped.
def removeStage(stageId: Int) {
// data structures based on Stage
stageIdToStage.get(stageId).foreach { s => {
stageToInfos -= s
if (pendingTasks.contains(s)) {
logError("Tasks still pending for stage %d even though there are no more jobs registered for that stage."
.format(stageId))
pendingTasks -= s
}
if (waiting.contains(s)) {
logError("Still waiting on stage %d even though there are no more jobs registered for that stage."
.format(stageId))
waiting -= s
}
if (running.contains(s)) {
logError("Stage %d still running even though there are no more jobs registered for that stage."
.format(stageId))
running -= s
}
if (failed.contains(s)) {
logError("Stage %d still registered as failed even though there are no more jobs registered for that stage."
.format(stageId))
failed -= s
}
}}
// data structures based on StageId
stageIdToStage -= stageId
stageIdToJobIds -= stageId

logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size))
}

// A job id that was never registered is only reported; nothing is modified for it.
if (!jobIdToStageIds.contains(jobId)) {
logError("Trying to remove unregistered job " + jobId)
} else {
val registeredStages = jobIdToStageIds(jobId)
if (registeredStages.isEmpty) {
logError("No stages registered for job " + jobId)
// NOTE(review): the loop starting on the next line, with its inline per-stage cleanup, looks
// like the pre-change form of the loop that later calls removeStage(stageId) — confirm.
} else { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
case (stageId, jobSet) => {
if (!jobSet.contains(jobId)) {
logError("Job %d not registered for stage %d even though that stage was registered for the job"
.format(jobId, stageId))
} else {
jobSet -= jobId
}
if (jobSet.isEmpty) {
stageIdToStage.get(stageId).foreach { s =>
stageToInfos -= s
if (pendingTasks.contains(s)) {
logError("Tasks still pending for stage " + stageId + " even though there are no more jobs registered for that stage.")
pendingTasks -= s
}
if (waiting.contains(s)) {
logError("Still waiting on stage " + stageId + " even though there are no more jobs registered for that stage.")
waiting -= s
}
if (running.contains(s)) {
logError("Stage " + stageId + " still running even though there are no more jobs registered for that stage.")
running -= s
}
if (failed.contains(s)) {
logError("Stage " + stageId + " still registered as failed even though there are no more jobs registered for that stage.")
failed -= s
}
// NOTE(review): from here the same loop is repeated, this time delegating the per-stage
// cleanup to removeStage; presumably this is the post-change form — confirm.
} else {
stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
case (stageId, jobSet) => {
if (!jobSet.contains(jobId)) {
logError("Job %d not registered for stage %d even though that stage was registered for the job"
.format(jobId, stageId))
} else {
jobSet -= jobId
}
if (jobSet.isEmpty) { // nobody needs this stage any more
removeStage(stageId)
}
stageIdToStage -= stageId
stageIdToJobIds -= stageId
logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size))
}
}
}}
}}
// Drop the job's own entry from the job -> stages map.
jobIdToStageIds -= jobId
}
}
@@ -901,30 +913,18 @@ class DAGScheduler(
}

/**
 * Calls `clearOldValues(cleanupTime)` on each of the scheduler's bookkeeping maps and logs,
 * per map, its size before and after the call as "<name> <before> --> <after>".
 *
 * Each map is visited exactly once. The maps are listed in a single name -> map table so that
 * adding another map to the cleanup is a one-line change and no mutable `sizeBefore` has to be
 * shared between stanzas.
 *
 * @param cleanupTime threshold handed unchanged to every map's `clearOldValues`
 */
private def cleanup(cleanupTime: Long) {
  Map(
    "stageIdToStage" -> stageIdToStage,
    "shuffleToMapStage" -> shuffleToMapStage,
    "pendingTasks" -> pendingTasks,
    "stageToInfos" -> stageToInfos,
    "jobIdToStageIds" -> jobIdToStageIds,
    "stageIdToJobIds" -> stageIdToJobIds).
    foreach { case (name, table) =>
      // Capture the size first so the log line shows how much this one call removed.
      val sizeBefore = table.size
      table.clearOldValues(cleanupTime)
      logInfo("%s %d --> %d".format(name, sizeBefore, table.size))
    }
}

def stop() {