Skip to content

Commit

Permalink
Merge pull request apache#409 from tdas/unpersist
Browse files Browse the repository at this point in the history
Automatically unpersisting RDDs that have been cleaned up from DStreams

Earlier RDDs generated by DStreams were forgotten but not unpersisted. The system relied on the natural BlockManager LRU to drop the data. The cleaner.ttl was a hammer to clean up RDDs but it is something that needs to be set separately and need to be set very conservatively (at best, few minutes). This automatic unpersisting allows the system to handle this automatically, which reduces memory usage. As a side effect it will also improve GC performance as there are less number of objects stored in memory. In fact, for some workloads, it may allow RDDs to be cached as deserialized, which speeds up processing without too much GC overheads.

This is disabled by default. To enable it set configuration spark.streaming.unpersist to true. In future release, this will be set to true by default.

Also, reduced sleep time in TaskSchedulerImpl.stop() from 5 second to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep for letting messages be sent out be so long.
  • Loading branch information
pwendell committed Jan 14, 2014
2 parents b07bc02 + 27311b1 commit 08b9fec
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,8 @@ private[spark] class TaskSchedulerImpl(
taskResultGetter.stop()
}

// sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
// TODO: Do something better !
Thread.sleep(5000L)
// sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
Thread.sleep(1000L)
}

override def defaultParallelism() = backend.defaultParallelism()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}


/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally
Expand Down Expand Up @@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
}

/**
* Enable periodic checkpointing of RDDs of this DStream
* Enable periodic checkpointing of RDDs of this DStream.
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ abstract class DStream[T: ClassTag] (
private[streaming] def clearMetadata(time: Time) {
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
generatedRDDs --= oldRDDs.keys
if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
oldRDDs.values.foreach(_.unpersist(false))
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
Expand Down Expand Up @@ -760,7 +764,12 @@ abstract class DStream[T: ClassTag] (
this.foreachRDD(saveFunc)
}

def register() {
/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
def register(): DStream[T] = {
ssc.registerOutputStream(this)
this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
}
case None =>
logInfo("Nothing to delete")
logDebug("Nothing to delete")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._

import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag

class BasicOperationsSuite extends TestSuiteBase {
test("map") {
Expand Down Expand Up @@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
Thread.sleep(1000)
}

test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq

val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
test("rdd cleanup - map and window") {
val rememberDuration = Seconds(3)

assert(input.size === 10, "Number of inputs have changed")

def operation(s: DStream[Int]): DStream[(Int, Int)] = {
s.map(x => (x % 10, 1))
.window(Seconds(2), Seconds(1))
.window(Seconds(4), Seconds(2))
}

val ssc = setupStreams(input, operation _)
ssc.remember(rememberDuration)
runStreams[(Int, Int)](ssc, input.size, input.size / 2)

val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
val windowedStream1 = windowedStream2.dependencies.head
val operatedStream = runCleanupTest(conf, operation _,
numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
assert(clock.time === Seconds(10).milliseconds)

// IDEALLY
// WindowedStream2 should remember till 7 seconds: 10, 8,
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
// MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3,
// Checkpoint remember durations
assert(windowedStream2.rememberDuration === rememberDuration)
assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
assert(mappedStream.rememberDuration ===
rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)

// IN THIS TEST
// WindowedStream2 should remember till 7 seconds: 10, 8,
// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
// MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2

// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
Expand All @@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}

test("rdd cleanup - updateStateByKey") {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
}
val stateStream = runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))

assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
assert(stateStream.generatedRDDs.contains(Time(10000)))
assert(!stateStream.generatedRDDs.contains(Time(4000)))
}

/** Test cleanup of RDDs in DStream metadata */
def runCleanupTest[T: ClassTag](
conf2: SparkConf,
operation: DStream[Int] => DStream[T],
numExpectedOutput: Int = cleanupTestInput.size,
rememberDuration: Duration = null
): DStream[T] = {

// Setup the stream computation
assert(batchDuration === Seconds(1),
"Batch duration has changed from 1 second, check cleanup tests")
val ssc = setupStreams(cleanupTestInput, operation)
val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
assert(clock.time === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
}

0 comments on commit 08b9fec

Please sign in to comment.