From 997fbe893135e5a082aaa6cc319827bd23ab008d Mon Sep 17 00:00:00 2001
From: Raymond Liu
Date: Tue, 27 Aug 2013 10:47:13 +0800
Subject: [PATCH] Add YarnClientClusterScheduler and Backend.

With this scheduler, the user application is launched locally,
while the executors are launched by YARN on remote nodes.

This enables spark-shell to run on YARN.
---
 .../scala/org/apache/spark/SparkContext.scala |  25 ++
 docs/running-on-yarn.md                       |  27 +-
 .../org/apache/spark/deploy/yarn/Client.scala |  14 +-
 .../spark/deploy/yarn/ClientArguments.scala   |  32 ++-
 .../spark/deploy/yarn/WorkerLauncher.scala    | 246 ++++++++++++++++++
 .../cluster/YarnClientClusterScheduler.scala  |  47 ++++
 .../cluster/YarnClientSchedulerBackend.scala  | 109 ++++++++
 7 files changed, 480 insertions(+), 20 deletions(-)
 create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
 create mode 100644 yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
 create mode 100644 yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c712a..b14d88b50f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,6 +209,31 @@ class SparkContext(
       scheduler.initialize(backend)
       scheduler

+    case "yarn-client" =>
+      val scheduler = try {
+        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+        val cons = clazz.getConstructor(classOf[SparkContext])
+        cons.newInstance(this).asInstanceOf[ClusterScheduler]
+
+      } catch {
+        case th: Throwable => {
+          throw new SparkException("YARN mode not available ?", th)
+        }
+      }
+
+      val backend = try {
+        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+        val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+        cons.newInstance(scheduler, this).asInstanceOf[StandaloneSchedulerBackend]
+      } catch {
+        case th: Throwable => {
+          throw new SparkException("YARN mode not available ?", th)
+        }
+      }
+
+      scheduler.initialize(backend)
+      scheduler
+
     case _ =>
       if (MESOS_REGEX.findFirstIn(master).isEmpty) {
         logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index c611db0af4..d6dd2fa755 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -40,6 +40,10 @@ System Properties:

 Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster. This would be used to connect to the cluster, write to the dfs and submit jobs to the resource manager.

+There are two scheduler modes that can be used to launch a Spark application on YARN.
+
+## Launching a Spark application with the YARN Client in yarn-standalone mode
+
 The command to launch the YARN Client is as follows:

     SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
@@ -47,6 +51,7 @@ The command to launch the YARN Client is as follows:
       --class <APP_MAIN_CLASS> \
       --args <APP_MAIN_ARGUMENTS> \
       --num-workers <NUMBER_OF_WORKER_MACHINES> \
+      --master-class <ApplicationMaster_CLASS> \
       --master-memory <MEMORY_FOR_MASTER> \
       --worker-memory <MEMORY_PER_WORKER> \
       --worker-cores <CORES_PER_WORKER> \
@@ -76,10 +81,28 @@ For example:
     $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
     Pi is roughly 3.13794

-The above starts a YARN Client programs which periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.
+The above starts a YARN client program which launches the default Application Master. SparkPi then runs as a child thread of the Application Master, while the YARN client periodically polls the Application Master for status updates and displays them in the console. The client exits once your application has finished running.
+
+In this mode, your application actually runs on the remote machine hosting the Application Master, so applications that involve local interaction, such as spark-shell, will not work well.
+
+## Launching a Spark application in yarn-client mode
+
+In yarn-client mode, the application is launched locally, just as when running an application or spark-shell in Local / Mesos / Standalone mode. The launch method is the same as for those modes; simply use "yarn-client" wherever a master URL is required. You also need to export the environment variables SPARK_JAR and SPARK_YARN_APP_JAR.
+
+To tune the number of workers and the cores and memory per worker, export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY and SPARK_WORKER_INSTANCES, e.g. in ./conf/spark-env.sh.
+
+For example:
+
+    SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
+    SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
+    ./run-example org.apache.spark.examples.SparkPi yarn-client
+
+
+    SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
+    SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
+    MASTER=yarn-client ./spark-shell

 # Important Notes
-- When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
 - We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
 - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
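As an illustration of the yarn-client mode documented above, a standalone application opts in simply by passing "yarn-client" as the master URL when constructing its SparkContext. The sketch below is illustrative only (the object name and the RDD operations are made up for the example); SPARK_JAR and SPARK_YARN_APP_JAR still need to be exported as described:

    import org.apache.spark.SparkContext

    object YarnClientExample {
      def main(args: Array[String]) {
        // "yarn-client" is resolved reflectively by the SparkContext change above to
        // YarnClientClusterScheduler + YarnClientSchedulerBackend; the driver stays local.
        val sc = new SparkContext("yarn-client", "Yarn Client Example")
        val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
        println("Counted " + evens + " even numbers")
        sc.stop()
      }
    }
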
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 844c707834..46405ae5b7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -46,8 +46,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) val credentials = UserGroupInformation.getCurrentUser().getCredentials(); - - def run() { + + // for client user who want to monitor app status by itself. + def runApp() = { init(yarnConf) start() logClusterResourceDetails() @@ -66,11 +67,14 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) submitApp(appContext) - + appId + } + + def run() { + val appId = runApp() monitorApplication(appId) System.exit(0) } - def logClusterResourceDetails() { val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics @@ -255,7 +259,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val commands = List[String](javaCommand + " -server " + JAVA_OPTS + - " org.apache.spark.deploy.yarn.ApplicationMaster" + + " " + args.amClass + " --class " + args.userClass + " --jar " + args.userJar + userArgsToString(args) + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index cd651904d2..dc32b559ea 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) { var numWorkers = 2 var amQueue = System.getProperty("QUEUE", "default") var amMemory: Int = 512 + var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" // TODO var inputFormatInfo: List[InputFormatInfo] = null @@ -58,18 +59,22 @@ class ClientArguments(val args: Array[String]) { userArgsBuffer += value args = tail - case ("--master-memory") :: MemoryParam(value) :: tail => - amMemory = value + case ("--master-class") :: value :: tail => + amClass = value args = tail - case ("--num-workers") :: IntParam(value) :: tail => - numWorkers = value + case ("--master-memory") :: MemoryParam(value) :: tail => + amMemory = value args = tail case ("--worker-memory") :: MemoryParam(value) :: tail => workerMemory = value args = tail + case ("--num-workers") :: IntParam(value) :: tail => + numWorkers = value + args = tail + case ("--worker-cores") :: IntParam(value) :: tail => workerCores = value args = tail @@ -100,15 +105,16 @@ class ClientArguments(val args: Array[String]) { System.err.println( "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + "Options:\n" + - " --jar JAR_PATH Path to your application's JAR file (required)\n" + - " --class CLASS_NAME Name of your application's main class (required)\n" + - " --args ARGS Arguments to be passed to your application's main class.\n" + - " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + - " --master-memory MEM Memory for Master (e.g. 
1000M, 2G) (Default: 512 Mb)\n" + - " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" + " --jar JAR_PATH Path to your application's JAR file (required)\n" + + " --class CLASS_NAME Name of your application's main class (required)\n" + + " --args ARGS Arguments to be passed to your application's main class.\n" + + " Mutliple invocations are possible, each will be passed in order.\n" + + " --num-workers NUM Number of workers to start (Default: 2)\n" + + " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + + " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" ) System.exit(exitCode) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala new file mode 100644 index 0000000000..c1295fbe98 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import java.net.Socket +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.ipc.YarnRPC +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import akka.actor._ +import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import akka.remote.RemoteClientShutdown +import akka.actor.Terminated +import akka.remote.RemoteClientDisconnected +import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.scheduler.SplitInfo + +class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging { + + def this(args: ApplicationMasterArguments) = this(args, new Configuration()) + + private val rpc: YarnRPC = YarnRPC.create(conf) + private var resourceManager: AMRMProtocol = null + private var appAttemptId: ApplicationAttemptId = null + private var reporterThread: Thread = null + private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + + private var yarnAllocator: YarnAllocationHandler = null + private var driverClosed:Boolean = false + + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + var actor: ActorRef = null + + // This actor just working as a monitor to watch on Driver Actor. + class MonitorActor(driverUrl: String) extends Actor { + + var driver: ActorRef = null + + override def preStart() { + logInfo("Listen to driver: " + driverUrl) + driver = context.actorFor(driverUrl) + driver ! "hello" + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(driver) // Doesn't work with remote actors, but useful for testing + } + + override def receive = { + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + logInfo("Driver terminated or disconnected! Shutting down.") + driverClosed = true + } + } + + def run() { + + appAttemptId = getApplicationAttemptId() + resourceManager = registerWithResourceManager() + val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() + + // Compute number of threads for akka + val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() + + if (minimumMemory > 0) { + val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) + + if (numCore > 0) { + // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 + // TODO: Uncomment when hadoop is on a version which has this fixed. + // args.workerCores = numCore + } + } + + waitForSparkMaster() + + // Allocate all containers + allocateWorkers() + + // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout + // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. + + val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + // must be <= timeoutInterval/ 2. + // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. 
+ // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. + val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + reporterThread = launchReporterThread(interval) + + // Wait for the reporter thread to Finish. + reporterThread.join() + + finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + actorSystem.shutdown() + + logInfo("Exited") + System.exit(0) + } + + private def getApplicationAttemptId(): ApplicationAttemptId = { + val envs = System.getenv() + val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV) + val containerId = ConverterUtils.toContainerId(containerIdString) + val appAttemptId = containerId.getApplicationAttemptId() + logInfo("ApplicationAttemptId: " + appAttemptId) + return appAttemptId + } + + private def registerWithResourceManager(): AMRMProtocol = { + val rmAddress = NetUtils.createSocketAddr(yarnConf.get( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)) + logInfo("Connecting to ResourceManager at " + rmAddress) + return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] + } + + private def registerApplicationMaster(): RegisterApplicationMasterResponse = { + logInfo("Registering the ApplicationMaster") + val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest]) + .asInstanceOf[RegisterApplicationMasterRequest] + appMasterRequest.setApplicationAttemptId(appAttemptId) + // Setting this to master host,port - so that the ApplicationReport at client has some sensible info. + // Users can then monitor stderr/stdout on that node if required. + appMasterRequest.setHost(Utils.localHostName()) + appMasterRequest.setRpcPort(0) + // What do we provide here ? Might make sense to expose something sensible later ? + appMasterRequest.setTrackingUrl("") + return resourceManager.registerApplicationMaster(appMasterRequest) + } + + private def waitForSparkMaster() { + logInfo("Waiting for spark driver to be reachable.") + var driverUp = false + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + while(!driverUp) { + try { + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Master now available: " + driverHost + ":" + driverPort) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at " + driverHost + ":" + driverPort) + Thread.sleep(100) + } + } + System.setProperty("spark.driver.host", driverHost) + System.setProperty("spark.driver.port", driverPort.toString) + + val driverUrl = "akka://spark@%s:%s/user/%s".format( + driverHost, driverPort.toString, StandaloneSchedulerBackend.ACTOR_NAME) + + actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") + } + + + private def allocateWorkers() { + + // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map() + + yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData) + + logInfo("Allocating " + args.numWorkers + " workers.") + // Wait until all containers have finished + // TODO: This is a bit ugly. Can we make it nicer? 
+ // TODO: Handle container failure + while(yarnAllocator.getNumWorkersRunning < args.numWorkers) { + yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + Thread.sleep(100) + } + + logInfo("All workers have launched.") + + } + + // TODO: We might want to extend this to allocate more containers in case they die ! + private def launchReporterThread(_sleepTime: Long): Thread = { + val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime + + val t = new Thread { + override def run() { + while (!driverClosed) { + val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning + if (missingWorkerCount > 0) { + logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") + yarnAllocator.allocateContainers(missingWorkerCount) + } + else sendProgress() + Thread.sleep(sleepTime) + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.start() + logInfo("Started progress reporter thread - sleep time : " + sleepTime) + return t + } + + private def sendProgress() { + logDebug("Sending progress") + // simulated with an allocate request with no nodes requested ... + yarnAllocator.allocateContainers(0) + } + + def finishApplicationMaster(status: FinalApplicationStatus) { + + logInfo("finish ApplicationMaster with " + status) + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(appAttemptId) + finishReq.setFinishApplicationStatus(status) + resourceManager.finishApplicationMaster(finishReq) + } + +} + + +object WorkerLauncher { + def main(argStrings: Array[String]) { + val args = new ApplicationMasterArguments(argStrings) + new WorkerLauncher(args).run() + } +} diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala new file mode 100644 index 0000000000..63a0449e5a --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark._ +import org.apache.hadoop.conf.Configuration +import org.apache.spark.deploy.yarn.YarnAllocationHandler +import org.apache.spark.util.Utils + +/** + * + * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. 
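+ * (In yarn-client mode the driver and its SparkContext stay on the local machine, so this
+ * scheduler only relies on YARN for executor allocation: it customizes rack lookup and waits
+ * briefly in postStartHook for workers to register.)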
+ */ +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { + + def this(sc: SparkContext) = this(sc, new Configuration()) + + // By default, rack is unknown + override def getRackForHost(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + val retval = YarnAllocationHandler.lookupRack(conf, host) + if (retval != null) Some(retval) else None + } + + override def postStartHook() { + + // The yarn application is running, but the worker might not yet ready + // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt + Thread.sleep(2000L) + logInfo("YarnClientClusterScheduler.postStartHook done") + } +} diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala new file mode 100644 index 0000000000..98badc2dc4 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} +import org.apache.spark.{SparkException, Logging, SparkContext} +import org.apache.spark.deploy.yarn.{Client, ClientArguments} + +private[spark] class YarnClientSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext) + extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + var client: Client = null + var appId: ApplicationId = null + + override def start() { + super.start() + + val defalutWorkerCores = "2" + val defalutWorkerMemory = "512m" + val defaultWorkerNumber = "1" + + val userJar = System.getenv("SPARK_YARN_APP_JAR") + var workerCores = System.getenv("SPARK_WORKER_CORES") + var workerMemory = System.getenv("SPARK_WORKER_MEMORY") + var workerNumber = System.getenv("SPARK_WORKER_INSTANCES") + + if (userJar == null) + throw new SparkException("env SPARK_YARN_APP_JAR is not set") + + if (workerCores == null) + workerCores = defalutWorkerCores + if (workerMemory == null) + workerMemory = defalutWorkerMemory + if (workerNumber == null) + workerNumber = defaultWorkerNumber + + val driverHost = System.getProperty("spark.driver.host") + val driverPort = System.getProperty("spark.driver.port") + val hostport = driverHost + ":" + driverPort + + val argsArray = Array[String]( + "--class", "notused", + "--jar", userJar, + "--args", hostport, + "--worker-memory", workerMemory, + "--worker-cores", workerCores, + "--num-workers", workerNumber, + "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" + ) + + val args = new ClientArguments(argsArray) + client = new Client(args) + appId = client.runApp() + waitForApp() + } + + def waitForApp() { + + // TODO : need a better way to find out whether the workers are ready or not + // maybe by resource usage report? + while(true) { + val report = client.getApplicationReport(appId) + + logInfo("Application report from ASM: \n" + + "\t appMasterRpcPort: " + report.getRpcPort() + "\n" + + "\t appStartTime: " + report.getStartTime() + "\n" + + "\t yarnAppState: " + report.getYarnApplicationState() + "\n" + ) + + // Ready to go, or already gone. + val state = report.getYarnApplicationState() + if (state == YarnApplicationState.RUNNING) { + return + } else if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + throw new SparkException("Yarn application already ended," + + "might be killed or not able to launch application master.") + } + + Thread.sleep(1000) + } + } + + override def stop() { + super.stop() + client.stop() + logInfo("Stoped") + } + +}
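
For reference, the backend above drives the existing YARN Client through the new --master-class option rather than talking to the ResourceManager directly. Doing the equivalent by hand would look roughly like the sketch below; the jar path, memory, cores and worker count are illustrative assumptions rather than values taken from this patch:

    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    object ManualYarnClientLaunch {
      def main(args: Array[String]) {
        // The driver is assumed to be up already; WorkerLauncher connects back to it.
        val driverHostPort = "driver-host:12345"   // normally spark.driver.host + ":" + spark.driver.port
        val clientArgs = new ClientArguments(Array(
          "--class", "notused",                    // the user class runs locally, not inside the AM
          "--jar", "/path/to/your-app.jar",        // what SPARK_YARN_APP_JAR would point to
          "--args", driverHostPort,
          "--worker-memory", "512m",
          "--worker-cores", "2",
          "--num-workers", "1",
          "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"))
        val client = new Client(clientArgs)
        // runApp() submits the application and returns its ApplicationId without blocking,
        // so the caller can poll getApplicationReport(appId) just as YarnClientSchedulerBackend does.
        val appId = client.runApp()
        println("Submitted YARN application " + appId)
      }
    }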