Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidations of shuffle files from different map tasks #635

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))

val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
val splitsByAddress = new HashMap[BlockManagerId, HashMap[Int, Long]]
for ((address, groupId, size) <- statuses) {
val groupedSplits = splitsByAddress.getOrElseUpdate(address, new HashMap[Int, Long])
val currSize = groupedSplits.getOrElse(groupId, 0L)
if (size > currSize) groupedSplits.put(groupId, size)
}

val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
(address, splits.toSeq.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
}

logDebug("Fetched grouped splits: " + blocksByAddress)
def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
val blockId = blockPair._1
val blockOption = blockPair._2
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ private[spark] class MapOutputTracker extends Logging {
private val fetching = new HashSet[Int]

// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
// Return an array of map output locations of the specific reduceId, one for each ShuffleMapTask, in the form of
// (BlockManagerId, groupId of the shuffle file, size of the shuffle file when the task writes its output)
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Int, Long)] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment on what the Int and Long represent.

val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
Expand Down Expand Up @@ -280,15 +282,15 @@ private[spark] object MapOutputTracker {
private def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
statuses: Array[MapStatus]): Array[(BlockManagerId, Int, Long)] = {
assert (statuses != null)
statuses.map {
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
(status.location, status.groupId, decompressSize(status.compressedSizes(reduceId)))
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/spark/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
logDebug("serializerClass=" + serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
logDebug("serializerClass=" + serializerClass)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
}
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable}
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
private[spark] class MapStatus(var location: BlockManagerId, var groupId: Int, var compressedSizes: Array[Byte])
extends Externalizable {

def this() = this(null, null) // For deserialization only
def this() = this(null, 0, null) // For deserialization only

def writeExternal(out: ObjectOutput) {
location.writeExternal(out)
out.writeInt(groupId)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

def readExternal(in: ObjectInput) {
location = BlockManagerId(in)
groupId = in.readInt()
compressedSizes = new Array[Byte](in.readInt())
in.readFully(compressedSizes)
}
Expand Down
25 changes: 11 additions & 14 deletions core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,48 +132,45 @@ private[spark] class ShuffleMapTask(

val blockManager = SparkEnv.get.blockManager
var shuffle: ShuffleBlocks = null
var buckets: ShuffleWriterGroup = null
var group: ShuffleWriterGroup = null

try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partition)
group = shuffle.acquireWriters(partition)

// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
group.writers(bucketId).write(pair)
}

// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.size()
totalBytes += size
MapOutputTracker.compressSize(size)
val compressedSizes: Array[Byte] = group.writers.map { writer: BlockObjectWriter =>
totalBytes += writer.commit()
MapOutputTracker.compressSize(writer.size())
}

// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

return new MapStatus(blockManager.blockManagerId, compressedSizes)
return new MapStatus(blockManager.blockManagerId, group.id, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (buckets != null) {
buckets.writers.foreach(_.revertPartialWrites())
if (group != null) {
group.writers.foreach(_.revertPartialWrites())
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && buckets != null) {
shuffle.releaseWriters(buckets)
if (shuffle != null && group != null) {
shuffle.releaseWriters(group)
}
// Execute the callbacks on task completion.
taskContext.executeOnCompleteCallbacks()
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ private[spark] class BlockManager(
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: String, serializer: Serializer): Option[Iterator[Any]] = {
shuffleBlockManager.closeBlock(blockId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment to the ShuffleBlockManager explaining the blocks are closed here?

diskStore.getValues(blockId, serializer).orElse(
sys.error("Block " + blockId + " not found on disk, though it should be"))
}
Expand Down Expand Up @@ -382,6 +383,8 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (ShuffleBlockManager.isShuffle(blockId)) {
//close the shuffle Writers for blockId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do shuflfe blocks still walk through this code path? If no, I would just throw an exception if it is a shuffle block here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The remote shuffle blocks still go here

shuffleBlockManager.closeBlock(blockId)
return diskStore.getBytes(blockId) match {
case Some(bytes) =>
Some(bytes)
Expand Down
28 changes: 25 additions & 3 deletions core/src/main/scala/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,27 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) with Logging {

class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {
extends BlockObjectWriter(blockId) with Logging {

private val f: File = createFile(blockId /*, allowAppendExisting */)
private var f: File = createFile(blockId)

// The file channel, used for repositioning / truncating the file.
private var channel: FileChannel = null
private var bs: OutputStream = null
private var objOut: SerializationStream = null
private var lastValidPosition = 0L
private var initialPosition = 0L

override def open(): DiskBlockObjectWriter = {
val fos = new FileOutputStream(f, true)
channel = fos.getChannel()
bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos))
objOut = serializer.newInstance().serializeStream(bs)

//commit possible file header
commit()
initialPosition = lastValidPosition

this
}

Expand All @@ -59,7 +65,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
// Flush the partial writes, and set valid length to be the length of the entire file.
// Return the number of bytes written for this commit.
override def commit(): Long = {
// NOTE: Flush the serializer first and then the compressed/buffered output stream
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
Expand All @@ -68,11 +73,28 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}

override def revertPartialWrites() {
// Revert by discarding current writes, except that if no values have been committed,
// we revert by recreate the file (otherwise there are errors when reading objects from the file later on
if (lastValidPosition == initialPosition)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialPosition is set to the size of the file when the file is opened. Isn't it problematic if we open an existing file, did some writes and then want to revert, we could delete the old file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A shuffle file is closed when we get a shuffle request, and it should never be re-opened (and if you re-open the file and append it, you may corrupt the file as it may have a tailer). Maybe we should first delete the shuffle file if it exits when we first open it (e.g., in case we need to re-run the map tasks).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we re-run the map tasks, wouldn't that wipe out the shuffle outputs for all other previous map tasks?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shuffle file is closed when all the map tasks are done (after the shuffle request is received). If we need to re-run a map task afterwards,
(1) There is no easy way to remove the results of the previous run of this task from the shuffle file
(2) You cannot re-open a closed shuffle file and append to it (e.g., kyro will append some tailer when closing the file)
(3) If a fetch fails, Spark will re-run all the map tasks (for the same shuffle) whose output locations are at the failing node; so it's OK to delete their outputs.

I actually also tried throwing random exceptions during shuffle and it works fine. Again, is there a good way to include such tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the response. It would be useful to actually put whatever you just wrote into the code comment block.

For failure testing, can we do something similar to what FailureSutie does?

recreateFile()
else
discardWrites()
}

private def recreateFile () {
close ()
f.delete()
f = createFile(blockId)
open()
}

private def discardWrites () {
// Discard current writes. We do this by flushing the outstanding writes and
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
channel.position(lastValidPosition)
}

override def write(value: Any) {
Expand Down
Loading