SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code.

I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these changes for the alpha code as well.

Author: Sandy Ryza <[email protected]>

Closes apache#561 from sryza/sandy-spark-1639 and squashes the following commits:

72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
sryza authored and tgravescs committed Jun 11, 2014
1 parent 6e11930 commit 2a4225d
Showing 7 changed files with 161 additions and 178 deletions.
@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100

def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
})
}

// Wait for initialization to complete and atleast 'some' nodes can get allocated.
modified
}


/**
* Returns when we've either
* 1) received all the requested executors,
* 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
* 3) hit an error that causes us to terminate trying to get containers.
*/
def waitForInitialAllocations() {
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
modified
}

def main(argStrings: Array[String]) {
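For readers following the allocator changes in this file: the new waitForInitialAllocations() blocks the driver until yarnAllocatorLoop passes ALLOCATOR_LOOP_WAIT_COUNT, while the allocation loop bumps the counter once per heartbeat. Below is a minimal, self-contained sketch of that handshake. The field and constant names mirror the hunks above; the notifyAll() step and the simulated allocator thread are assumptions for illustration, not verbatim commit code.

```scala
import java.util.concurrent.atomic.AtomicInteger

object AllocatorLoopSketch {
  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
  private val yarnAllocatorLoop = new AtomicInteger(0)

  def incrementAllocatorLoop(by: Int): Unit = {
    val count = yarnAllocatorLoop.getAndAdd(by)
    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
      yarnAllocatorLoop.synchronized {
        yarnAllocatorLoop.notifyAll() // assumed: wake the thread blocked in waitForInitialAllocations()
      }
    }
  }

  def waitForInitialAllocations(): Unit = {
    yarnAllocatorLoop.synchronized {
      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
        yarnAllocatorLoop.wait(1000L)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the AM's allocate loop: one "heartbeat" per iteration.
    val allocator = new Thread(new Runnable {
      def run(): Unit = {
        while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
          incrementAllocatorLoop(1)
          Thread.sleep(ALLOCATE_HEARTBEAT_INTERVAL)
        }
      }
    })
    allocator.setDaemon(true)
    allocator.start()

    waitForInitialAllocations()
    println(s"waitForInitialAllocations returned after ${yarnAllocatorLoop.get()} heartbeats")
  }
}
```

With a 100 ms heartbeat and a wait count of 30, the initial wait is bounded at roughly ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL = 3000 ms, plus the 1-second re-check timeout, which matches the docstring added above.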
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{Apps, Records}
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf, SparkContext}

/**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
destPath
}

def qualifyForLocal(localURI: URI): Path = {
private def qualifyForLocal(localURI: URI): Path = {
var qualifiedURI = localURI
// If not specified assume these are in the local filesystem to keep behavior like Hadoop
// If not specified, assume these are in the local filesystem to keep behavior like Hadoop
if (qualifiedURI.getScheme() == null) {
qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
}
val qualPath = new Path(qualifiedURI)
qualPath
new Path(qualifiedURI)
}

def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {

val amMemory = calculateAMMemory(newApp)

val JAVA_OPTS = ListBuffer[String]()
val javaOpts = ListBuffer[String]()

// Add Xmx for AM memory
JAVA_OPTS += "-Xmx" + amMemory + "m"
javaOpts += "-Xmx" + amMemory + "m"

val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
javaOpts += "-Djava.io.tmpdir=" + tmpDir

// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
JAVA_OPTS += "-XX:+CMSIncrementalMode"
JAVA_OPTS += "-XX:+CMSIncrementalPacing"
JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
javaOpts += "-XX:+UseConcMarkSweepGC"
javaOpts += "-XX:+CMSIncrementalMode"
javaOpts += "-XX:+CMSIncrementalPacing"
javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}

// SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
// If we are being launched in client mode, forward the spark-conf options
// onto the executor launcher
for ((k, v) <- sparkConf.getAll) {
JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
} else {
// If we are being launched in standalone mode, capture and forward any spark
// system properties (e.g. set by spark-class).
for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
}
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
javaOpts += ClientBase.getLog4jConfiguration(localResources)

// Command for the ApplicationMaster
val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
JAVA_OPTS ++
javaOpts ++
Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
userArgsToString(args),
"--executor-memory", args.executorMemory.toString,
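To see where the renamed javaOpts buffer ends up, here is a condensed, runnable sketch of how ClientBase assembles the ApplicationMaster launch command. The memory value, temp dir, class names, and arguments are placeholders rather than values from a real application, and the {{JAVA_HOME}} literal stands in for Environment.JAVA_HOME.$().

```scala
import scala.collection.mutable.ListBuffer

object AmCommandSketch {
  def main(args: Array[String]): Unit = {
    val amMemory = 512 // MB; placeholder, normally computed by calculateAMMemory()

    val javaOpts = ListBuffer[String]()
    javaOpts += "-Xmx" + amMemory + "m"
    javaOpts += "-Djava.io.tmpdir=./tmp" // stands in for the YARN container temp dir

    // Client-mode style forwarding of Spark properties as system properties.
    for ((k, v) <- Seq("spark.app.name" -> "My App")) {
      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
    }

    val commands = Seq("{{JAVA_HOME}}/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.deploy.yarn.ApplicationMaster",
        "--class", "com.example.MyApp",   // placeholder user class
        "--jar", "my-app.jar",            // placeholder user jar
        "--executor-memory", "1024")

    println(commands.mkString(" "))
    // {{JAVA_HOME}}/bin/java -server -Xmx512m -Djava.io.tmpdir=./tmp
    //   -Dspark.app.name=\"My App\" org.apache.spark.deploy.yarn.ApplicationMaster ...
  }
}
```

Note that the `"\\\""` in the Scala source produces a literal \" around each value, which keeps properties containing spaces intact when YARN later expands the command in the container launch script.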
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{Logging, SparkConf}

@@ -46,30 +46,30 @@ trait ExecutorRunnableUtil extends Logging {
executorCores: Int,
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
val JAVA_OPTS = ListBuffer[String]()
val javaOpts = ListBuffer[String]()
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "

// Set extra Java options for the executor, if defined
sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
JAVA_OPTS += opts
javaOpts += opts
}

JAVA_OPTS += "-Djava.io.tmpdir=" +
javaOpts += "-Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
javaOpts += ClientBase.getLog4jConfiguration(localResources)

// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
// uses Akka to connect to the scheduler, the akka settings are needed as well as the
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }

sparkConf.getAkkaConf.
foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }

// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
// multi-tennent machines
// The options are based on
// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
JAVA_OPTS += " -XX:+CMSIncrementalMode "
JAVA_OPTS += " -XX:+CMSIncrementalPacing "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
javaOpts += " -XX:+UseConcMarkSweepGC "
javaOpts += " -XX:+CMSIncrementalMode "
javaOpts += " -XX:+CMSIncrementalPacing "
javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
}
*/

@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
"-XX:OnOutOfMemoryError='kill %p'") ++
JAVA_OPTS ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
masterAddress.toString,
slaveId.toString,
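The comment above explains why only the authentication and Akka settings are forwarded to the executor JVM as -D flags: the executor backend needs them before it can register with the scheduler and receive the rest of the Spark configuration. A small standalone sketch of that filtering follows; the property values are made up, and a plain Map stands in for sparkConf.getAll.

```scala
object ExecutorOptsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative config; only the keys starting with spark.auth / spark.akka
    // are forwarded to the executor JVM at launch time.
    val conf = Map(
      "spark.authenticate"    -> "true",
      "spark.akka.frameSize"  -> "10",
      "spark.executor.memory" -> "2g",    // delivered later, once the executor registers
      "spark.app.name"        -> "My App" // delivered later, once the executor registers
    )

    val javaOpts = conf
      .filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }
      .map { case (k, v) => "-D" + k + "=" + "\\\"" + v + "\\\"" }

    javaOpts.foreach(println)
    // -Dspark.authenticate=\"true\"
    // -Dspark.akka.frameSize=\"10\"
  }
}
```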
@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)

def this(sc: SparkContext) = this(sc, new Configuration())

// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
// Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
// Subsequent creations are ignored - since nodes are already allocated by then.

// Nothing else for now ... initialize application master : which needs a SparkContext to
// determine how to allocate.
// Note that only the first creation of a SparkContext influences (and ideally, there must be
// only one SparkContext, right ?). Subsequent creations are ignored since executors are already
// allocated by then.

// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
if (sparkContextInitialized){
ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(3000L)
}
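Putting the scheduler-side change in context: postStartHook() now blocks on ApplicationMaster.waitForInitialAllocations() before its best-effort three-second pause, so the driver does not start scheduling tasks until the initial container requests have gone out. The toy sketch below shows only that ordering; the stub object, fake allocator thread, and timings are illustrative, not the real scheduler code.

```scala
object PostStartHookSketch {
  // Stub standing in for ApplicationMaster; the flag is flipped by a fake
  // allocator thread below instead of real YARN container allocations.
  object AmStub {
    @volatile var initialAllocationsDone = false
    def waitForInitialAllocations(): Unit = {
      while (!initialAllocationsDone) Thread.sleep(100L)
    }
  }

  def postStartHook(): Unit = {
    AmStub.waitForInitialAllocations()
    // Best-effort pause so executors can bootstrap and register with the driver.
    Thread.sleep(3000L)
    println("postStartHook finished; tasks can now be scheduled")
  }

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(500L); AmStub.initialAllocationsDone = true }
    }).start()
    postStartHook()
  }
}
```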