From 2a4225dd944441d3f735625bb6bae6fca8fd06ca Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 11 Jun 2014 07:57:28 -0500 Subject: [PATCH] SPARK-1639. Tidy up some Spark on YARN code This contains a bunch of small tidyings of the Spark on YARN code. I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these for the alpha code as well. Author: Sandy Ryza Closes #561 from sryza/sandy-spark-1639 and squashes the following commits: 72b6a02 [Sandy Ryza] Fix comment and set name on driver thread c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code --- .../spark/deploy/yarn/ApplicationMaster.scala | 16 +- .../apache/spark/deploy/yarn/ClientBase.scala | 38 ++-- .../deploy/yarn/ExecutorRunnableUtil.scala | 28 +-- .../cluster/YarnClusterScheduler.scala | 10 +- .../spark/deploy/yarn/ApplicationMaster.scala | 197 +++++++++--------- .../org/apache/spark/deploy/yarn/Client.scala | 10 +- .../spark/deploy/yarn/ExecutorLauncher.scala | 40 ++-- 7 files changed, 161 insertions(+), 178 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8f0ecb855718e..1cc9c33cd2d02 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, yarnAllocator.allocateContainers( math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) - Thread.sleep(100) + Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, @@ -416,6 +416,7 @@ object ApplicationMaster { // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. private val ALLOCATOR_LOOP_WAIT_COUNT = 30 + private val ALLOCATE_HEARTBEAT_INTERVAL = 100 def incrementAllocatorLoop(by: Int) { val count = yarnAllocatorLoop.getAndAdd(by) @@ -467,13 +468,22 @@ object ApplicationMaster { }) } - // Wait for initialization to complete and atleast 'some' nodes can get allocated. + modified + } + + + /** + * Returns when we've either + * 1) received all the requested executors, + * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms, + * 3) hit an error that causes us to terminate trying to get containers. 
+ */ + def waitForInitialAllocations() { yarnAllocatorLoop.synchronized { while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { yarnAllocatorLoop.wait(1000L) } } - modified } def main(argStrings: Array[String]) { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 801e8b381588f..29a35680c0e72 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} -import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, ListBuffer, Map} @@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{Apps, Records} +import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SparkConf, SparkContext} /** @@ -169,14 +168,13 @@ trait ClientBase extends Logging { destPath } - def qualifyForLocal(localURI: URI): Path = { + private def qualifyForLocal(localURI: URI): Path = { var qualifiedURI = localURI - // If not specified assume these are in the local filesystem to keep behavior like Hadoop + // If not specified, assume these are in the local filesystem to keep behavior like Hadoop if (qualifiedURI.getScheme() == null) { qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString) } - val qualPath = new Path(qualifiedURI) - qualPath + new Path(qualifiedURI) } def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { @@ -305,13 +303,13 @@ trait ClientBase extends Logging { val amMemory = calculateAMMemory(newApp) - val JAVA_OPTS = ListBuffer[String]() + val javaOpts = ListBuffer[String]() // Add Xmx for AM memory - JAVA_OPTS += "-Xmx" + amMemory + "m" + javaOpts += "-Xmx" + amMemory + "m" val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir + javaOpts += "-Djava.io.tmpdir=" + tmpDir // TODO: Remove once cpuset version is pushed out. 
// The context is, default gc for server class machines ends up using all cores to do gc - @@ -325,11 +323,11 @@ trait ClientBase extends Logging { if (useConcurrentAndIncrementalGC) { // In our expts, using (default) throughput collector has severe perf ramifications in // multi-tenant machines - JAVA_OPTS += "-XX:+UseConcMarkSweepGC" - JAVA_OPTS += "-XX:+CMSIncrementalMode" - JAVA_OPTS += "-XX:+CMSIncrementalPacing" - JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0" - JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10" + javaOpts += "-XX:+UseConcMarkSweepGC" + javaOpts += "-XX:+CMSIncrementalMode" + javaOpts += "-XX:+CMSIncrementalPacing" + javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" + javaOpts += "-XX:CMSIncrementalDutyCycle=10" } // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility: @@ -344,22 +342,22 @@ trait ClientBase extends Logging { // If we are being launched in client mode, forward the spark-conf options // onto the executor launcher for ((k, v) <- sparkConf.getAll) { - JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" + javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } } else { // If we are being launched in standalone mode, capture and forward any spark // system properties (e.g. set by spark-class). for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) { - JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" + javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } - sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts) - sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p") + sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) + sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p") } - JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources) + javaOpts += ClientBase.getLog4jConfiguration(localResources) // Command for the ApplicationMaster val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ - JAVA_OPTS ++ + javaOpts ++ Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar, userArgsToString(args), "--executor-memory", args.executorMemory.toString, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index 32f8861dc9503..43dbb2464f929 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{Logging, SparkConf} @@ -46,19 +46,19 @@ trait ExecutorRunnableUtil extends Logging { executorCores: Int, localResources: HashMap[String, LocalResource]): List[String] = { // Extra options for the JVM - val JAVA_OPTS = ListBuffer[String]() + val javaOpts = ListBuffer[String]() // Set the JVM memory val executorMemoryString = executorMemory + "m" - JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " + javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " // Set extra Java options for the executor, if defined 
sys.props.get("spark.executor.extraJavaOptions").foreach { opts => - JAVA_OPTS += opts + javaOpts += opts } - JAVA_OPTS += "-Djava.io.tmpdir=" + + javaOpts += "-Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources) + javaOpts += ClientBase.getLog4jConfiguration(localResources) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. Since the Executor backend @@ -66,10 +66,10 @@ trait ExecutorRunnableUtil extends Logging { // authentication settings. sparkConf.getAll. filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" } + foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } sparkConf.getAkkaConf. - foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" } + foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. @@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging { // multi-tennent machines // The options are based on // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline - JAVA_OPTS += " -XX:+UseConcMarkSweepGC " - JAVA_OPTS += " -XX:+CMSIncrementalMode " - JAVA_OPTS += " -XX:+CMSIncrementalPacing " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " + javaOpts += " -XX:+UseConcMarkSweepGC " + javaOpts += " -XX:+CMSIncrementalMode " + javaOpts += " -XX:+CMSIncrementalPacing " + javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 " + javaOpts += " -XX:CMSIncrementalDutyCycle=10 " } */ @@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging { // TODO: If the OOM is not recoverable by rescheduling it on different node, then do // 'something' to fail job ... akin to blacklisting trackers in mapred ? "-XX:OnOutOfMemoryError='kill %p'") ++ - JAVA_OPTS ++ + javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", masterAddress.toString, slaveId.toString, diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index a4638cc863611..39cdd2e8a522b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) def this(sc: SparkContext) = this(sc, new Configuration()) - // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate - // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?) - // Subsequent creations are ignored - since nodes are already allocated by then. - + // Nothing else for now ... initialize application master : which needs a SparkContext to + // determine how to allocate. + // Note that only the first creation of a SparkContext influences (and ideally, there must be + // only one SparkContext, right ?). 
Subsequent creations are ignored since executors are already + // allocated by then. // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { @@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) override def postStartHook() { val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc) if (sparkContextInitialized){ + ApplicationMaster.waitForInitialAllocations() // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt Thread.sleep(3000L) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 33a60d978c586..6244332f23737 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -19,13 +19,12 @@ package org.apache.spark.deploy.yarn import java.io.IOException import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.net.NetUtils import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -33,8 +32,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} @@ -77,17 +75,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // than user specified and /tmp. System.setProperty("spark.local.dir", getLocalDirs()) - // set the web ui port to be ephemeral for yarn so we don't conflict with + // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty("spark.ui.port", "0") - // when running the AM, the Spark master is always "yarn-cluster" + // When running the AM, the Spark master is always "yarn-cluster" System.setProperty("spark.master", "yarn-cluster") - // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using. + // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) - appAttemptId = getApplicationAttemptId() + appAttemptId = ApplicationMaster.getApplicationAttemptId() + logInfo("ApplicationAttemptId: " + appAttemptId) isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) @@ -99,7 +98,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ApplicationMaster.register(this) // Call this to force generation of secret so it gets populated into the - // hadoop UGI. This has to happen before the startUserClass which does a + // Hadoop UGI. 
This has to happen before the startUserClass which does a // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) @@ -121,7 +120,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Allocate all containers allocateExecutors() - // Wait for the user class to Finish + // Launch thread that will heartbeat to the RM so it won't think the app has died. + launchReporterThread() + + // Wait for the user class to finish userThread.join() System.exit(0) @@ -141,7 +143,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) } - /** Get the Yarn approved local directories. */ + // Get the Yarn approved local directories. private def getLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the // local dirs, so lets check both. We assume one of the 2 is set. @@ -150,18 +152,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, .orElse(Option(System.getenv("LOCAL_DIRS"))) localDirs match { - case None => throw new Exception("Yarn Local dirs can't be empty") + case None => throw new Exception("Yarn local dirs can't be empty") case Some(l) => l } - } - - private def getApplicationAttemptId(): ApplicationAttemptId = { - val envs = System.getenv() - val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - logInfo("ApplicationAttemptId: " + appAttemptId) - appAttemptId } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { @@ -173,25 +166,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( args.userClass, - false /* initialize */ , + false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { - - var successed = false + var succeeded = false try { // Copy - var mainArgs: Array[String] = new Array[String](args.userArgs.size) + val mainArgs = new Array[String](args.userArgs.size) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) mainMethod.invoke(null, mainArgs) - // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR - // userThread will stop here unless it has uncaught exception thrown out - // It need shutdown hook to set SUCCEEDED - successed = true + // Some apps have "System.exit(0)" at the end. The user thread will stop here unless + // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. 
+ succeeded = true } finally { - logDebug("finishing main") + logDebug("Finishing main") isLastAMRetry = true - if (successed) { + if (succeeded) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED) @@ -199,11 +190,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } } + t.setName("Driver") t.start() t } - // This need to happen before allocateExecutors() + // This needs to happen before allocateExecutors() private def waitForSparkContextInitialized() { logInfo("Waiting for Spark context initialization") try { @@ -231,7 +223,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkContext.preferredNodeLocationData, sparkContext.getConf) } else { - logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d". + logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d". format(numTries * waitTime, maxNumTries)) this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, @@ -242,48 +234,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } } finally { - // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT : - // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks. - ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) + // In case of exceptions, etc - ensure that the loop in + // ApplicationMaster#sparkContextInitialized() breaks. + ApplicationMaster.doneWithInitialAllocations() } } private def allocateExecutors() { try { - logInfo("Allocating " + args.numExecutors + " executors.") - // Wait until all containers have finished + logInfo("Requesting " + args.numExecutors + " executors.") + // Wait until all containers have launched yarnAllocator.addResourceRequests(args.numExecutors) yarnAllocator.allocateResources() // Exits the loop if the user thread exits. + + var iters = 0 while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { checkNumExecutorsFailed() allocateMissingExecutor() yarnAllocator.allocateResources() - ApplicationMaster.incrementAllocatorLoop(1) - Thread.sleep(100) + if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) { + ApplicationMaster.doneWithInitialAllocations() + } + Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) + iters += 1 } } finally { - // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, - // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. - ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) + // In case of exceptions, etc - ensure that the loop in + // ApplicationMaster#sparkContextInitialized() breaks. + ApplicationMaster.doneWithInitialAllocations() } logInfo("All executors have launched.") - - // Launch a progress reporter thread, else the app will get killed after expiration - // (def: 10mins) timeout. - if (userThread.isAlive) { - // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. - val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - - // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) - - // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval) - - launchReporterThread(interval) - } } private def allocateMissingExecutor() { @@ -303,47 +284,35 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def launchReporterThread(_sleepTime: Long): Thread = { - val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime + private def launchReporterThread(): Thread = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + + // must be <= expiryInterval / 2. + val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) val t = new Thread { override def run() { while (userThread.isAlive) { checkNumExecutorsFailed() allocateMissingExecutor() - sendProgress() - Thread.sleep(sleepTime) + logDebug("Sending progress") + yarnAllocator.allocateResources() + Thread.sleep(interval) } } } // Setting to daemon status, though this is usually not a good idea. t.setDaemon(true) t.start() - logInfo("Started progress reporter thread - sleep time : " + sleepTime) + logInfo("Started progress reporter thread - heartbeat interval : " + interval) t } - private def sendProgress() { - logDebug("Sending progress") - // Simulated with an allocate request with no nodes requested. - yarnAllocator.allocateResources() - } - - /* - def printContainers(containers: List[Container]) = { - for (container <- containers) { - logInfo("Launching shell command on a new container." - + ", containerId=" + container.getId() - + ", containerNode=" + container.getNodeId().getHost() - + ":" + container.getNodeId().getPort() - + ", containerNodeURI=" + container.getNodeHttpAddress() - + ", containerState" + container.getState() - + ", containerResourceMemory" - + container.getResource().getMemory()) - } - } - */ - def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { synchronized { if (isFinished) { @@ -351,7 +320,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } isFinished = true - logInfo("finishApplicationMaster with " + status) + logInfo("Unregistering ApplicationMaster with " + status) if (registered) { val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl) @@ -386,7 +355,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, def run() { logInfo("AppMaster received a signal.") - // we need to clean up staging dir before HDFS is shut down + // We need to clean up staging dir before HDFS is shut down // make sure we don't delete it until this is the last AM if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() } @@ -401,21 +370,24 @@ object ApplicationMaster { // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30 + private val ALLOCATE_HEARTBEAT_INTERVAL = 100 private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]() val sparkContextRef: AtomicReference[SparkContext] = - new AtomicReference[SparkContext](null /* initialValue */) + new AtomicReference[SparkContext](null) - val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0) + // Variable used to notify the YarnClusterScheduler that it should stop waiting + // for the initial set of executors to be started and get on with its business. + val doneWithInitialAllocationsMonitor = new Object() - def incrementAllocatorLoop(by: Int) { - val count = yarnAllocatorLoop.getAndAdd(by) - if (count >= ALLOCATOR_LOOP_WAIT_COUNT) { - yarnAllocatorLoop.synchronized { - // to wake threads off wait ... - yarnAllocatorLoop.notifyAll() - } + @volatile var isDoneWithInitialAllocations = false + + def doneWithInitialAllocations() { + isDoneWithInitialAllocations = true + doneWithInitialAllocationsMonitor.synchronized { + // to wake threads off wait ... + doneWithInitialAllocationsMonitor.notifyAll() } } @@ -423,7 +395,10 @@ object ApplicationMaster { applicationMasters.add(master) } - // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm... + /** + * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been + * initialized in the user code. + */ def sparkContextInitialized(sc: SparkContext): Boolean = { var modified = false sparkContextRef.synchronized { @@ -431,7 +406,7 @@ object ApplicationMaster { sparkContextRef.notifyAll() } - // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do + // Add a shutdown hook - as a best effort in case users do not call sc.stop or do // System.exit. // Should not really have to do this, but it helps YARN to evict resources earlier. // Not to mention, prevent the Client from declaring failure even though we exited properly. @@ -454,13 +429,29 @@ object ApplicationMaster { }) } - // Wait for initialization to complete and atleast 'some' nodes can get allocated. - yarnAllocatorLoop.synchronized { - while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { - yarnAllocatorLoop.wait(1000L) + // Wait for initialization to complete and at least 'some' nodes to get allocated. + modified + } + + /** + * Returns when we've either + * 1) received all the requested executors, + * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms, + * 3) hit an error that causes us to terminate trying to get containers. 
+ */ + def waitForInitialAllocations() { + doneWithInitialAllocationsMonitor.synchronized { + while (!isDoneWithInitialAllocations) { + doneWithInitialAllocationsMonitor.wait(1000L) } } - modified + } + + def getApplicationAttemptId(): ApplicationAttemptId = { + val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + val containerId = ConverterUtils.toContainerId(containerIdString) + val appAttemptId = containerId.getApplicationAttemptId() + appAttemptId } def main(argStrings: Array[String]) { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 393edd1f2d670..24027618c1f35 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,14 +21,12 @@ import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, Records} +import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SparkConf} @@ -102,7 +100,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def logClusterResourceDetails() { val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics - logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " + + logInfo("Got Cluster metric info from ResourceManager, number of NodeManagers: " + clusterMetrics.getNumNodeManagers) val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue) @@ -133,7 +131,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def submitApp(appContext: ApplicationSubmissionContext) = { // Submit the application to the applications manager. - logInfo("Submitting application to ASM") + logInfo("Submitting application to ResourceManager") yarnClient.submitApplication(appContext) } @@ -149,7 +147,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa Thread.sleep(interval) val report = yarnClient.getApplicationReport(appId) - logInfo("Application report from ASM: \n" + + logInfo("Application report from ResourceManager: \n" + "\t application identifier: " + appId.toString() + "\n" + "\t appId: " + appId.getId() + "\n" + "\t clientToAMToken: " + report.getClientToAMToken() + "\n" + diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index d93e5bb0225d5..4f8854a09a1e5 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -72,8 +72,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp override def preStart() { logInfo("Listen to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - // Send a hello message thus the connection is actually established, - // thus we can monitor Lifecycle Events. 
+ // Send a hello message to establish the connection, after which + // we can monitor Lifecycle Events. driver ! "Hello" context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } @@ -95,7 +95,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp amClient.init(yarnConf) amClient.start() - appAttemptId = getApplicationAttemptId() + appAttemptId = ApplicationMaster.getApplicationAttemptId() registerApplicationMaster() waitForSparkMaster() @@ -141,18 +141,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp } } - private def getApplicationAttemptId(): ApplicationAttemptId = { - val envs = System.getenv() - val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - logInfo("ApplicationAttemptId: " + appAttemptId) - appAttemptId - } - private def registerApplicationMaster(): RegisterApplicationMasterResponse = { logInfo("Registering the ApplicationMaster") - // TODO:(Raymond) Find out Spark UI address and fill in here? + // TODO: Find out client's Spark UI address and fill in here? amClient.registerApplicationMaster(Utils.localHostName(), 0, "") } @@ -185,8 +176,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp private def allocateExecutors() { - - // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. + // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map() @@ -198,8 +188,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp preferredNodeLocationData, sparkConf) - logInfo("Allocating " + args.numExecutors + " executors.") - // Wait until all containers have finished + logInfo("Requesting " + args.numExecutors + " executors.") + // Wait until all containers have launched yarnAllocator.addResourceRequests(args.numExecutors) yarnAllocator.allocateResources() while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { @@ -221,7 +211,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp } } - // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime @@ -229,7 +218,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp override def run() { while (!driverClosed) { allocateMissingExecutor() - sendProgress() + logDebug("Sending progress") + yarnAllocator.allocateResources() Thread.sleep(sleepTime) } } @@ -241,20 +231,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp t } - private def sendProgress() { - logDebug("Sending progress") - // simulated with an allocate request with no nodes requested ... 
- yarnAllocator.allocateResources() - } - def finishApplicationMaster(status: FinalApplicationStatus) { - logInfo("finish ApplicationMaster with " + status) - amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) + logInfo("Unregistering ApplicationMaster with " + status) + val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") + amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl) } } - object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings)
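
Note (illustration only, not part of the patch): the core synchronization change above replaces the AtomicInteger-based yarnAllocatorLoop counter with a plain monitor plus a volatile flag. The following minimal, standalone Scala sketch shows the same handshake in isolation. The monitor, flag, and method names mirror the patch; the demo object name, its main(), and the sleep timing are invented here purely for illustration.

object InitialAllocationSignalDemo {
  // Monitor and flag, mirroring doneWithInitialAllocationsMonitor and
  // isDoneWithInitialAllocations in the patched ApplicationMaster object.
  private val doneWithInitialAllocationsMonitor = new Object()
  @volatile private var isDoneWithInitialAllocations = false

  // Called by the allocation loop once all requested executors are running,
  // once ALLOCATOR_LOOP_WAIT_COUNT heartbeats have elapsed, or on error.
  def doneWithInitialAllocations(): Unit = {
    isDoneWithInitialAllocations = true
    doneWithInitialAllocationsMonitor.synchronized {
      doneWithInitialAllocationsMonitor.notifyAll()
    }
  }

  // Called from the scheduler's postStartHook: blocks until the flag is set,
  // re-checking once a second so a missed notify cannot hang the caller.
  def waitForInitialAllocations(): Unit = {
    doneWithInitialAllocationsMonitor.synchronized {
      while (!isDoneWithInitialAllocations) {
        doneWithInitialAllocationsMonitor.wait(1000L)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val allocator = new Thread {
      override def run(): Unit = {
        Thread.sleep(2000L)  // pretend the initial containers just launched
        doneWithInitialAllocations()
      }
    }
    allocator.setDaemon(true)
    allocator.start()
    waitForInitialAllocations()  // returns roughly two seconds later
    println("initial allocations done")
  }
}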