From 83096610ca370644f9466d25c5c3df1137e6f8b3 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Mon, 15 Jul 2019 13:10:50 +0000 Subject: [PATCH] parse and pass spark.executor.extraJavaOptions to executors --- .../cluster/nomad/ExecutorTask.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/ExecutorTask.scala b/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/ExecutorTask.scala index 05f5188c68a77..28069b2923bde 100644 --- a/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/ExecutorTask.scala +++ b/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/ExecutorTask.scala @@ -21,6 +21,7 @@ import com.hashicorp.nomad.apimodel.Task import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.config.EXECUTOR_MEMORY +import org.apache.spark.util.Utils private[spark] object ExecutorTask extends SparkNomadTaskType("executor", "executor", EXECUTOR_MEMORY) { @@ -48,8 +49,7 @@ private[spark] object ExecutorTask conf.getExecutorEnv.foreach((task.addEnv _).tupled) - val executorConf: Seq[(String, String)] = { - + val executorConf = { val explicitConf = Seq( "spark.executor.port" -> executorPort.placeholder, "spark.blockManager.port" -> blockManagerPort.placeholder @@ -61,14 +61,17 @@ private[spark] object ExecutorTask ) && !PROPERTIES_NOT_TO_FORWARD_TO_EXECUTOR.contains(name) } - explicitConf ++ forwardedConf + (explicitConf ++ forwardedConf.toSeq).map { + case (k, v) => s"-D$k=$v" + .replaceAllLiterally("\\", "\\\\") + .replaceAllLiterally("\"", "\\\"") + }.map('"' + _ + '"') } - task.addEnv("SPARK_EXECUTOR_OPTS", - executorConf.map { case (k, v) => s"-D$k=$v" - .replaceAllLiterally("\\", "\\\\") - .replaceAllLiterally("\"", "\\\"") - }.map('"' + _ + '"').mkString(" ")) + val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions") + .map(Utils.splitCommandString).getOrElse(Seq.empty) + + task.addEnv("SPARK_EXECUTOR_OPTS", (extraJavaOpts ++ executorConf).mkString(" ")) task.addEnv("SPARK_EXECUTOR_MEMORY", jvmMemory(conf, task))