For SPARK-527, Support spark-shell when running on YARN #868

Open · wants to merge 1 commit into master
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,6 +209,31 @@ class SparkContext(
      scheduler.initialize(backend)
      scheduler

    case "yarn-client" =>
      val scheduler = try {
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(this).asInstanceOf[ClusterScheduler]
      } catch {
        case th: Throwable => {
          throw new SparkException("YARN mode not available ?", th)
        }
      }

      val backend = try {
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
        val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
        cons.newInstance(scheduler, this).asInstanceOf[StandaloneSchedulerBackend]
      } catch {
        case th: Throwable => {
          throw new SparkException("YARN mode not available ?", th)
        }
      }

      scheduler.initialize(backend)
      scheduler

    case _ =>
      if (MESOS_REGEX.findFirstIn(master).isEmpty) {
        logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
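As a rough sketch of how a driver would exercise this new code path (the object and application names below are illustrative, not part of this patch): constructing a SparkContext with the "yarn-client" master URL takes the reflective branch above, so the YARN scheduler classes only need to be present on the classpath of a YARN-enabled build.

    import org.apache.spark.SparkContext

    // Hypothetical driver program. Passing "yarn-client" as the master URL makes
    // SparkContext load YarnClientClusterScheduler / YarnClientSchedulerBackend
    // via reflection, as in the hunk above. SPARK_JAR and SPARK_YARN_APP_JAR are
    // expected to be exported in the environment before launching.
    object YarnClientPi {
      def main(args: Array[String]) {
        val sc = new SparkContext("yarn-client", "YarnClientPi")
        val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
        println("Even numbers counted: " + evens)
        sc.stop()
      }
    }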
27 changes: 25 additions & 2 deletions docs/running-on-yarn.md
@@ -40,13 +40,18 @@ System Properties:
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.
These configuration files are used to connect to the cluster, write to the DFS, and submit jobs to the ResourceManager.

There are two scheduler modes that can be used to launch a Spark application on YARN.

## Launching a Spark application with the YARN Client in yarn-standalone mode

The command to launch the YARN Client is as follows:

    SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
      --jar <YOUR_APP_JAR_FILE> \
      --class <APP_MAIN_CLASS> \
      --args <APP_MAIN_ARGUMENTS> \
      --num-workers <NUMBER_OF_WORKER_MACHINES> \
      --master-class <ApplicationMaster_CLASS> \
      --master-memory <MEMORY_FOR_MASTER> \
      --worker-memory <MEMORY_PER_WORKER> \
      --worker-cores <CORES_PER_WORKER> \
@@ -76,10 +81,28 @@ For example:
    $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
    Pi is roughly 3.13794

The above starts a YARN Client program which periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.
The above starts a YARN Client program which launches the default Application Master. SparkPi then runs as a child thread of the Application Master, while the YARN Client periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.

With this mode, your application actually runs on the remote machine where the Application Master runs. Applications that involve local interaction, such as spark-shell, will therefore not work well.

## Launching a Spark application in yarn-client mode

With yarn-client mode, the application is launched locally, just like running an application or spark-shell in Local / Mesos / Standalone mode. The launch method is also the same; just make sure that when you need to specify a master URL, you use "yarn-client" instead. You also need to export the environment variables SPARK_JAR and SPARK_YARN_APP_JAR.

In order to tune worker cores/number/memory etc., export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY and SPARK_WORKER_INSTANCES, e.g. in ./conf/spark-env.sh, as in the sketch below.
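A minimal, illustrative conf/spark-env.sh snippet (the values are placeholders, not recommendations for any particular cluster):

    # Illustrative worker settings for yarn-client mode; adjust to your cluster.
    export SPARK_WORKER_INSTANCES=4
    export SPARK_WORKER_CORES=2
    export SPARK_WORKER_MEMORY=2g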

For example:

    SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
    SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
    ./run-example org.apache.spark.examples.SparkPi yarn-client

    SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
    SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
    MASTER=yarn-client ./spark-shell

# Important Notes

- When your application instantiates a Spark context it must use a special "yarn-standalone" master URL. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above and in the sketch below.
- We do not request container resources based on the number of cores. Thus the number of cores given via command line arguments cannot be guaranteed.
- The local directories used by Spark will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
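A minimal sketch of the pattern suggested in the first note, with a hypothetical application class (not part of this patch): the master URL is taken from the program arguments so the same jar can be submitted with "yarn-standalone" or launched locally with "yarn-client".

    import org.apache.spark.SparkContext

    // Hypothetical application: the first argument selects the master URL,
    // e.g. "yarn-standalone" when submitted via the YARN Client, or
    // "yarn-client" when launched locally.
    object MyYarnApp {
      def main(args: Array[String]) {
        val master = args(0)
        val sc = new SparkContext(master, "MyYarnApp")
        // ... application body ...
        sc.stop()
      }
    }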
14 changes: 9 additions & 5 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -46,8 +46,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials();

def run() {

// For client users who want to monitor the app status by themselves.
def runApp() = {
init(yarnConf)
start()
logClusterResourceDetails()
@@ -66,11 +67,14 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())

submitApp(appContext)

appId
}

def run() {
val appId = runApp()
monitorApplication(appId)
System.exit(0)
}


def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
@@ -255,7 +259,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" org.apache.spark.deploy.yarn.ApplicationMaster" +
" " + args.amClass +
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
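The split into runApp() and run() lets a caller that wants to manage the application itself (for example, a scheduler backend) submit the app and keep the ApplicationId instead of blocking in monitorApplication. A minimal sketch of such a caller, assuming a valid YARN configuration and client arguments (this caller is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    // Hypothetical caller: submit the application via runApp() and keep the
    // ApplicationId so status can be checked on the caller's own schedule
    // rather than blocking inside run().
    object SubmitAndWatch {
      def main(cmdLineArgs: Array[String]) {
        val client = new Client(new Configuration(), new ClientArguments(cmdLineArgs))
        val appId = client.runApp()
        // Client extends YarnClientImpl, so the standard YARN client API can be
        // used to fetch a report for the submitted application.
        val report = client.getApplicationReport(appId)
        println("Application " + appId + " is in state " + report.getYarnApplicationState())
      }
    }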
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) {
var numWorkers = 2
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null

@@ -58,18 +59,22 @@ class ClientArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail

case ("--master-memory") :: MemoryParam(value) :: tail =>
amMemory = value
case ("--master-class") :: value :: tail =>
amClass = value
args = tail

case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
case ("--master-memory") :: MemoryParam(value) :: tail =>
amMemory = value
args = tail

case ("--worker-memory") :: MemoryParam(value) :: tail =>
workerMemory = value
args = tail

case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
args = tail

case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
args = tail
@@ -100,15 +105,16 @@ class ClientArguments(val args: Array[String]) {
System.err.println(
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unused right now.\n" +
" --master-class CLASS_NAME Class Name for Master (Default: org.apache.spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)
}