diff --git a/jubatusonyarn/build.sbt b/jubatusonyarn/build.sbt index 2fc734c..256d5d5 100644 --- a/jubatusonyarn/build.sbt +++ b/jubatusonyarn/build.sbt @@ -10,7 +10,7 @@ val SCALA_VERSION = "2.10.4" val VERSION = "1.1" val JUBATUS_DEPENDENCIES = Seq( - ("us.jubat" % "jubatus" % "0.7.1").exclude("org.jboss.netty", "netty"), + ("us.jubat" % "jubatus" % "0.8.0").exclude("org.jboss.netty", "netty"), "org.apache.hadoop" % "hadoop-common" % "2.3.0-cdh5.1.3" % "provided", "org.apache.hadoop" % "hadoop-hdfs" % "2.3.0-cdh5.1.3" % "provided", "org.apache.hadoop" % "hadoop-yarn-client" % "2.3.0-cdh5.1.3" % "provided" diff --git a/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMaster.scala b/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMaster.scala index dfa966a..516d92b 100644 --- a/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMaster.scala +++ b/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMaster.scala @@ -38,12 +38,13 @@ class ApplicationMaster extends HasLogger { private def containerJarPath(aBasePath: Path): Path = new Path(aBasePath, "container/jubatus-on-yarn-container.jar") private val containerJarName: String = "jubatus-on-yarn-container.jar" private val containerMainClass: String = "us.jubat.yarn.container.ContainerApp" - private val containerMemory: Int = 128 private val mYarnConfig = new YarnConfiguration() // 終了までブロックする def run(aParams: ApplicationMasterParams, aApplicationMasterPort: Int): FinalApplicationStatus = { + logger.debug(s"ApplicationMasterParams (${aParams.toString()})") + val tHandler = new ApplicationMasterHandler(aParams, aApplicationMasterPort) val tResourceManager = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](500, tHandler) @@ -56,9 +57,18 @@ class ApplicationMaster extends HasLogger { 
tPriority.setPriority(aParams.priority) val tResource = Records.newRecord(classOf[Resource]) - tResource.setMemory(aParams.memory + containerMemory) + tResource.setMemory(aParams.memory + aParams.containerMemory) tResource.setVirtualCores(aParams.virtualCores) + var containerNodes: Array[String] = null + var containerRacks: Array[String] = null + if (!aParams.containerNodes.isEmpty()) { + containerNodes = aParams.containerNodes.split(",").toArray[String] + } + if (!aParams.containerRacks.isEmpty()) { + containerRacks = aParams.containerRacks.split(",").toArray[String] + } + // コンテナを起動 (1 to aParams.nodes).foreach { _ => logger.info( @@ -67,7 +77,9 @@ class ApplicationMaster extends HasLogger { + s"\tmemory: ${tResource.getMemory}\n" + s"\tvirtualCores: ${tResource.getVirtualCores}" ) - tResourceManager.addContainerRequest(new ContainerRequest(tResource, null, null, tPriority)) + val containerReq = new ContainerRequest(tResource, containerNodes, containerRacks, tPriority) + tResourceManager.addContainerRequest(containerReq) + logger.debug(s"ContainerRequest( Nodes:${containerReq.getNodes}, Racks:${containerReq.getRacks})") } // コンテナの終了を待機 @@ -140,7 +152,7 @@ class ApplicationMaster extends HasLogger { s"bash $entryScriptName" + s" $containerJarName" + s" $containerMainClass" - + s" $containerMemory" + + s" ${aParams.containerMemory}" // jar にわたす + s" ${aParams.applicationName}" // --application-name @@ -152,6 +164,13 @@ class ApplicationMaster extends HasLogger { + s" ${aParams.learningMachineName}" // --name + s" ${aParams.learningMachineType}" // juba{*} + s" ${aParams.zooKeepers}" // --zookeeper + + s" ${aParams.thread}" // --thread + + s" ${aParams.timeout}" // --timeout + + s" ${aParams.mixer}" // --mixer + + s" ${aParams.intervalSec}" // --interval_sec + + s" ${aParams.intervalCount}" // --interval_count + + s" ${aParams.zookeeperTimeout}" // --zookeeper_timeout + + s" ${aParams.interconnectTimeout}" // --interconnect_timeout + s" 
1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout" + s" 2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr" diff --git a/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMasterApp.scala b/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMasterApp.scala index 4ea20d8..d0adf75 100644 --- a/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMasterApp.scala +++ b/jubatusonyarn/jubatus-on-yarn-application-master/src/main/scala/us/jubat/yarn/applicationmaster/ApplicationMasterApp.scala @@ -118,6 +118,15 @@ class ApplicationMasterParams { @org.kohsuke.args4j.Option(name = "--virtual-cores") var virtualCores: Int = 1 + @org.kohsuke.args4j.Option(name = "--container-memory") + var containerMemory: Int = 128 + + @org.kohsuke.args4j.Option(name = "--container-nodes") + var containerNodes: String = "" + + @org.kohsuke.args4j.Option(name = "--container-racks") + var containerRacks: String = "" + @org.kohsuke.args4j.Option(name = "--learning-machine-name") var learningMachineName: String = "" @@ -148,4 +157,38 @@ class ApplicationMasterParams { @org.kohsuke.args4j.Option(name = "--base-path") var basePath: String = "" + + @org.kohsuke.args4j.Option(name = "--thread") + var thread: Int = 2 + + @org.kohsuke.args4j.Option(name = "--timeout") + var timeout: Int = 10 + + @org.kohsuke.args4j.Option(name = "--mixer") + var mixer: String = "linear_mixer" + + @org.kohsuke.args4j.Option(name = "--interval_sec") + var intervalSec: Int = 16 + + @org.kohsuke.args4j.Option(name = "--interval_count") + var intervalCount: Int = 512 + + @org.kohsuke.args4j.Option(name = "--zookeeper_timeout") + var zookeeperTimeout: Int = 10 + + @org.kohsuke.args4j.Option(name = "--interconnect_timeout") + var interconnectTimeout: Int = 10 + + override def toString(): String = { + val text = s"""applicationName: $applicationName, 
nodes: $nodes, priority: $priority, + memory: $memory, virtualCores: $virtualCores, containerMemory: $containerMemory, + containerNodes: $containerNodes, containerRacks: $containerRacks, learningMachineName: $learningMachineName, + learningMachineType: $learningMachineType, zooKeepers: $zooKeepers, managementAddress: $managementAddress, + managementPort: $managementPort, applicationMasterNodeAddress: $applicationMasterNodeAddress, + jubatusProxyPort: $jubatusProxyPort, jubatusProxyProcessId: $jubatusProxyProcessId, + thread: $thread, timeout: $timeout, mixer: $mixer, intervalSec: $intervalSec, intervalCount: $intervalCount, + zookeeperTimeout: $zookeeperTimeout, interconnectTimeout: $interconnectTimeout + """.stripMargin.trim + text + } } diff --git a/jubatusonyarn/jubatus-on-yarn-application-master/src/test/resources/entrypoint.sh b/jubatusonyarn/jubatus-on-yarn-application-master/src/test/resources/entrypoint.sh index 6152237..9bae1ea 100644 --- a/jubatusonyarn/jubatus-on-yarn-application-master/src/test/resources/entrypoint.sh +++ b/jubatusonyarn/jubatus-on-yarn-application-master/src/test/resources/entrypoint.sh @@ -14,6 +14,13 @@ APPLICATION_MASTER_PORT="${7}" LEARNING_MACHINE_NAME="${8}" LEARNING_MACHINE_TYPE="${9}" ZOOKEEPER="${10}" +THREAD="${11}" +TIMEOUT="${12}" +MIXER="${13}" +INTERVAL_SEC="${14}" +INTERVAL_COUNT="${15}" +ZOOKEEPER_TIMEOUT="${16}" +INTERCONNECT_TIMEOUT="${17}" IP_ADDRESS=`grep $(hostname) /etc/hosts | awk '{print $1}'` LISTEN_IF=`netstat -ie | grep -B1 ${IP_ADDRESS} | head -n1 | awk '{print $1}'` @@ -27,8 +34,10 @@ for i in `seq 10`; do fi done - echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} >> /tmp/Container 2>&1 - juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} & + 
echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \ + --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT} + juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \ + --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT} & JUBATUS_SERVER_PROCESS_ID=$! # jubatus server の起動待機 @@ -37,7 +46,7 @@ for i in `seq 10`; do continue 2 fi - echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status >> /tmp/Container 2>&1 + echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status if (jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status \ | awk '/active '${LEARNING_MACHINE_NAME}' members:/ {flag=1; next} /active/ {flag=0} flag==1 {print}' \ | grep "^${IP_ADDRESS}_${JUBATUS_SERVER_PORT}$"); then @@ -57,11 +66,11 @@ fi echo $JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \ --application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \ --container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \ - --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} 
>> /tmp/Container 2>&1 + --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} $JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \ --application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \ --container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \ - --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} >> /tmp/Container 2>&1 + --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} EXIT_CODE=$? ps ${JUBATUS_SERVER_PROCESS_ID} && kill ${JUBATUS_SERVER_PROCESS_ID} exit $EXIT_CODE diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/JubatusYarnApplication.scala b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/JubatusYarnApplication.scala index fc38759..05ac24c 100644 --- a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/JubatusYarnApplication.scala +++ b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/JubatusYarnApplication.scala @@ -15,7 +15,7 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA package us.jubat.yarn.client -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, FileSystem} import scala.concurrent.Future import scala.util.{Success, Failure, Try} import java.net.InetAddress @@ -23,15 +23,30 @@ import us.jubat.yarn.common._ import scala.Some import us.jubat.yarn.client.JubatusYarnApplication.ApplicationContext import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, ApplicationReport} +import org.apache.hadoop.yarn.conf.YarnConfiguration // TODO ExecutionContextをとりあえず追加。これで問題ないかあとで確認。 import scala.concurrent.ExecutionContext.Implicits.global +object 
Resource { + val defaultMasterMemory: Int = 128 + val defaultJubatusProxyMemory: Int = 32 + val defaultMasterCores: Int = 1 + val defaultPriority: Int = 0 + val defaultContainerMemory: Int = 128 + val defaultJubatusServerMemory: Int = 256 + val defaultContainerCores: Int = 1 +} +case class Resource(priority: Int = Resource.defaultPriority, memory: Int = Resource.defaultJubatusServerMemory, + virtualCores: Int = Resource.defaultContainerCores, masterMemory: Int = Resource.defaultMasterMemory, + proxyMemory: Int = Resource.defaultJubatusProxyMemory, masterCores: Int = Resource.defaultMasterCores, + containerMemory: Int = Resource.defaultContainerMemory, containerNodes: List[String] = null, containerRacks: List[String] = null) -case class Resource(priority: Int, memory: Int, virtualCores: Int) +case class JubatusYarnApplicationStatus(jubatusProxy: java.util.Map[String, java.util.Map[String, String]], jubatusServers: java.util.Map[String, java.util.Map[String, String]], yarnApplication: java.util.Map[String, Any]) -case class JubatusYarnApplicationStatus(jubatusProxy: java.util.Map[String, java.util.Map[String, String]], jubatusServers: java.util.Map[String, java.util.Map[String, String]], yarnApplication: ApplicationReport) +case class JubatusClusterConfiguration(learningMachineName: String, learningMachineType: LearningMachineType, zookeepers: List[Location], configString: String, + configFile: Path = null, resource: Resource, nodeCount: Int, applicationName: String, serverConfig: ServerConfig, proxyConfig: ProxyConfig, basePath: Path = new Path("hdfs:///jubatus-on-yarn")) object JubatusYarnApplication extends HasLogger { @@ -84,8 +99,49 @@ object JubatusYarnApplication extends HasLogger { * @param aNodeCount number of cluster * @return [[JubatusYarnApplication]] */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: 
String, aResource: Resource, aNodeCount: Int): Future[JubatusYarnApplication] = { - start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn")) + start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), null) + } + + /** + * JubatusYarnApplication を起動します。 + * + * juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。 + * 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。 + * + * @param aLearningMachineName learning machine name + * @param aLearningMachineType learning machine type + * @param aZookeepers ZooKeeper locations + * @param aConfigString config json string + * @param aResource computer resources in the cluster + * @param aNodeCount number of cluster + * @param aApplicationName yarn-application name + * @return [[JubatusYarnApplication]] + */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aApplicationName: String): Future[JubatusYarnApplication] = { + start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), aApplicationName) + } + + /** + * JubatusYarnApplication を起動します。 + * + * juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。 + * 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。 + * + * @param aLearningMachineName learning machine name + * @param aLearningMachineType learning machine type + * @param aZookeepers ZooKeeper locations + * @param aConfigString config json string + * @param aResource computer resources in the cluster + * @param aNodeCount number of cluster + * @param aBasePath base path of jar and sh 
files + * @return [[JubatusYarnApplication]] + */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = { + start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, null) } /** @@ -101,9 +157,11 @@ object JubatusYarnApplication extends HasLogger { * @param aResource computer resources in the cluster * @param aNodeCount number of cluster * @param aBasePath base path of jar and sh files + * @param aApplicationName yarn-application name * @return [[JubatusYarnApplication]] */ - def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = Future { + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): Future[JubatusYarnApplication] = Future { require(aResource.memory > 0, "specify memory than 1MB.") require(aNodeCount > 0, "specify node count than 1") @@ -114,8 +172,32 @@ object JubatusYarnApplication extends HasLogger { case None => throw new IllegalStateException("Service not running.") case Some(tYarnClientController) => // ApplicationMaster 起動 - logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigString, $aResource, $aNodeCount") - val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath) + 
logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigString, $aResource, $aNodeCount, $aApplicationName") + val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, aApplicationName) + waitForStarted(ApplicationContext(tYarnClientController, tApplicationMasterProxy, tService)) + } + } + + /** + * JubatusYarnApplication を起動します。 + * + * juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。 + * 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。 + * + * @param aJubatusClusterConfiguration argument of start method + * @return [[JubatusYarnApplication]] + */ + def start(aJubatusClusterConfiguration: JubatusClusterConfiguration): Future[JubatusYarnApplication] = Future { + + val tService = new JubatusYarnService() + tService.start() + + tService.yarnClientController match { + case None => throw new IllegalStateException("Service not running.") + case Some(tYarnClientController) => + // ApplicationMaster 起動 + logger.info(s"startJubatusApplication $aJubatusClusterConfiguration") + val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aJubatusClusterConfiguration) waitForStarted(ApplicationContext(tYarnClientController, tApplicationMasterProxy, tService)) } } @@ -134,8 +216,29 @@ object JubatusYarnApplication extends HasLogger { * @param aNodeCount number of cluster * @return [[JubatusYarnApplication]] */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int): Future[JubatusYarnApplication] = { - start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn")) + 
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), null) + } + + /** + * JubatusYarnApplication を起動します。 + * + * juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。 + * 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。 + * + * @param aLearningMachineName learning machine name + * @param aLearningMachineType learning machine type + * @param aZookeepers ZooKeeper locations + * @param aConfigFile config file + * @param aResource computer resources in the cluster + * @param aNodeCount number of cluster + * @param aApplicationName yarn-application name + * @return [[JubatusYarnApplication]] + */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aApplicationName: String): Future[JubatusYarnApplication] = { + start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), aApplicationName) } /** @@ -152,7 +255,28 @@ object JubatusYarnApplication extends HasLogger { * @param aNodeCount number of cluster * @return [[JubatusYarnApplication]] */ - def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = Future { + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = { + start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, 
aBasePath, null) + } + + /** + * JubatusYarnApplication を起動します。 + * + * juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。 + * 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。 + * + * @param aLearningMachineName learning machine name + * @param aLearningMachineType learning machine type + * @param aZookeepers ZooKeeper locations + * @param aConfigFile config file + * @param aResource computer resources in the cluster + * @param aNodeCount number of cluster + * @param aApplicationName yarn-application name + * @return [[JubatusYarnApplication]] + */ + @deprecated("not recommended use the start(JubatusClusterConfiguration)") + def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): Future[JubatusYarnApplication] = Future { require(aResource.memory > 0, "specify memory than 1MB.") require(aNodeCount > 0, "specify node count than 1") @@ -163,8 +287,8 @@ object JubatusYarnApplication extends HasLogger { case None => throw new IllegalStateException("Service not running.") case Some(tYarnClientController) => // ApplicationMaster 起動 - logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigFile, $aResource, $aNodeCount") - val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath) + logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigFile, $aResource, $aNodeCount, $aApplicationName") + val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath, aApplicationName) waitForStarted(ApplicationContext(tYarnClientController, 
tApplicationMasterProxy, tService)) } } @@ -222,6 +346,24 @@ class JubatusYarnApplication(val jubatusProxy: Location, val jubatusServers: Lis */ def loadModel(aModelPathPrefix: Path, aModelId: String): Try[JubatusYarnApplication] = Try { logger.info(s"loadModel $aModelPathPrefix, $aModelId") + + val tHdfs = FileSystem.get(new YarnConfiguration()) + val srcPath = new Path(aModelPathPrefix, aModelId) + if (!tHdfs.exists(srcPath)) { + val msg = s"model path does not exist ($srcPath)" + logger.error(msg) + throw new RuntimeException(msg) + } + + for (i <- 0 to jubatusServers.size - 1) { + val srcFile = new Path(srcPath, s"$i.jubatus") + if (!tHdfs.exists(srcFile)) { + val msg = s"model file does not exist ($srcFile)" + logger.error(msg) + throw new RuntimeException(msg) + } + } + aContext.controller.loadModel(aModelPathPrefix, aModelId) this } diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClient.scala b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClient.scala index f9d5898..45e43bc 100644 --- a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClient.scala +++ b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClient.scala @@ -25,14 +25,18 @@ import org.apache.hadoop.yarn.api.records.{Resource => YarnResource, _} import org.apache.hadoop.yarn.client.api.{YarnClient => HadoopYarnClient} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} -import us.jubat.yarn.common.{HasLogger, LearningMachineType, Location} +import us.jubat.yarn.common.{HasLogger, LearningMachineType, Location, ServerConfig, ProxyConfig} import scala.collection.JavaConverters._ import scala.io.Source trait YarnClient { + def submitApplicationMaster(aJubatusClusterConfiguration: JubatusClusterConfiguration, aManagementLocation: Location): ApplicationId + + @deprecated("not recommended use the 
submitApplicationMaster(JubatusClusterConfiguration, Location)") def submitApplicationMaster(aApplicationName: String, aLearningMachineInstanceName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodes: Int, aManagementLocation: Location, aBasePath: Path): ApplicationId + @deprecated("not recommended use the submitApplicationMaster(JubatusClusterConfiguration, Location)") def submitApplicationMaster(aApplicationName: String, aLearningMachineInstanceName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodes: Int, aManagementLocation: Location, aBasePath: Path): ApplicationId def getStatus(aApplicationId: ApplicationId): ApplicationReport @@ -51,14 +55,11 @@ class DefaultYarnClient extends YarnClient with HasLogger { private def jubaConfigBasePath(aBasePath: Path): Path = new Path(aBasePath, "application-master/jubaconfig") private val jubaConfigName: String = "jubaconfig.json" - private val jubaProxyMemory: Int = 32 private def applicationMasterJarPath(aBasePath: Path): Path = new Path(aBasePath, "application-master/jubatus-on-yarn-application-master.jar") private val applicationMasterJarName: String = "jubatus-on-yarn-application-master.jar" private val applicationMasterMainClass: String = "us.jubat.yarn.applicationmaster.ApplicationMasterApp" - private val applicationMasterMemory: Int = 128 - private val applicationMasterVirtualCores: Int = 1 private val mYarnConfig = new YarnConfiguration() private val mYarnClient = { @@ -79,6 +80,7 @@ class DefaultYarnClient extends YarnClient with HasLogger { tResource } + @deprecated("not recommended use the submitApplicationMaster(JubatusClusterConfiguration, Location)") override def submitApplicationMaster(aApplicationName: String, aLearningMachineInstanceName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: 
Resource, aNodes: Int, aManagementLocation: Location, aBasePath: Path): ApplicationId = { val tHdfsPath = new Path(jubaConfigBasePath(aBasePath), s"${aApplicationName.replaceAll(":", "_")}.json") @@ -92,16 +94,27 @@ class DefaultYarnClient extends YarnClient with HasLogger { submitApplicationMaster(aApplicationName, aLearningMachineInstanceName, aLearningMachineType, aZookeepers, tHdfsPath, aResource, aNodes, aManagementLocation, aBasePath) } - + @deprecated("not recommended use the submitApplicationMaster(JubatusClusterConfiguration, Location)") override def submitApplicationMaster(aApplicationName: String, aLearningMachineInstanceName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodes: Int, aManagementLocation: Location, aBasePath: Path): ApplicationId = { - logger.info(s"call submitApplicationMaster($aApplicationName, $aLearningMachineInstanceName, $aLearningMachineType, $aZookeepers, $aConfigFile, $aResource, $aNodes, $aManagementLocation)") + val jubaClusterConfig = JubatusClusterConfiguration(aLearningMachineInstanceName, aLearningMachineType, aZookeepers, "", + aConfigFile, aResource, aNodes, aApplicationName, ServerConfig(), ProxyConfig(), aBasePath) + submitApplicationMaster(jubaClusterConfig, aManagementLocation) + } + + override def submitApplicationMaster(aJubatusClusterConfiguration: JubatusClusterConfiguration, aManagementLocation: Location): ApplicationId = { + logger.info(s"call submitApplicationMaster($aJubatusClusterConfiguration, $aManagementLocation)") + + var configFile = aJubatusClusterConfiguration.configFile + if (aJubatusClusterConfiguration.configFile == null) { + configFile = createConfigFile(aJubatusClusterConfiguration.applicationName, aJubatusClusterConfiguration.configString, aJubatusClusterConfiguration.basePath) + } val tApplicationMasterContext = Records.newRecord(classOf[ContainerLaunchContext]) tApplicationMasterContext.setLocalResources( Map( - 
entryScriptName -> toLocalResource(entryScriptPath(aBasePath)), - jubaConfigName -> toLocalResource(aConfigFile), - applicationMasterJarName -> toLocalResource(applicationMasterJarPath(aBasePath)) + entryScriptName -> toLocalResource(entryScriptPath(aJubatusClusterConfiguration.basePath)), + jubaConfigName -> toLocalResource(configFile), + applicationMasterJarName -> toLocalResource(applicationMasterJarPath(aJubatusClusterConfiguration.basePath)) ).asJava ) @@ -122,31 +135,59 @@ class DefaultYarnClient extends YarnClient with HasLogger { case LearningMachineType.Anomaly => "anomaly" case LearningMachineType.Recommender => "recommender" } + + var containerNodes: String = """\"\"""" + if (aJubatusClusterConfiguration.resource.containerNodes != null) { + containerNodes = s"""\"${aJubatusClusterConfiguration.resource.containerNodes.mkString(",")}\"""" + } + var containerRacks: String = """\"\"""" + if (aJubatusClusterConfiguration.resource.containerRacks != null) { + containerRacks = s"""\"${aJubatusClusterConfiguration.resource.containerRacks.mkString(",")}\"""" + } + val tCommand = ( s"bash $entryScriptName" // ApplicationMaster の jar 起動用 java コマンド + s" $applicationMasterJarName" + s" $applicationMasterMainClass" - + s" $applicationMasterMemory" + + s" ${aJubatusClusterConfiguration.resource.masterMemory}" // ApplicationMaster - + s" $aApplicationName" // --application-name + + s" ${aJubatusClusterConfiguration.applicationName}" // --application-name + s" ${aManagementLocation.hostAddress}" // --management-address + s" ${aManagementLocation.port}" // --management-port - + s" $aNodes" // --nodes - + s" ${aResource.priority}" // --priority - + s" ${aResource.memory}" // --memory - + s" ${aResource.virtualCores}" // --virtual-cores + + s" ${aJubatusClusterConfiguration.nodeCount}" // --nodes + + s" ${aJubatusClusterConfiguration.resource.priority}" // --priority + + s" ${aJubatusClusterConfiguration.resource.memory}" // --memory + + s" 
${aJubatusClusterConfiguration.resource.virtualCores}" // --virtual-cores + + s" ${aJubatusClusterConfiguration.resource.containerMemory}" // --container-memory + + s" ${containerNodes}" // --container-nodes + + s" ${containerRacks}" // --container-racks // ApplicationMaster, juba*_proxy, jubaconfig - + s" $aLearningMachineInstanceName" // --learning-machine-name / --name - + s" ${typeToString(aLearningMachineType)}" // --learning-machine-type / juba{}_proxy - + s" ${aZookeepers.map { z => s"${z.hostAddress}:${z.port}"}.mkString(",")}" // --zookeeper / --zookeeper + + s" ${aJubatusClusterConfiguration.learningMachineName}" // --learning-machine-name / --name + + s" ${typeToString(aJubatusClusterConfiguration.learningMachineType)}" // --learning-machine-type / juba{}_proxy + + s" ${aJubatusClusterConfiguration.zookeepers.map { z => s"${z.hostAddress}:${z.port}"}.mkString(",")}" // --zookeeper / --zookeeper // jubaconfig + s" $jubaConfigName" // -file - + s" $aBasePath" // ApplicationMaster + + s" ${aJubatusClusterConfiguration.basePath}" // ApplicationMaster + + s" ${aJubatusClusterConfiguration.serverConfig.thread}" // --thread + + s" ${aJubatusClusterConfiguration.serverConfig.timeout}" // --timeout + + s" ${aJubatusClusterConfiguration.serverConfig.mixer.name}" // --mixer + + s" ${aJubatusClusterConfiguration.serverConfig.intervalSec}" // --interval_sec + + s" ${aJubatusClusterConfiguration.serverConfig.intervalCount}" // --interval_count + + s" ${aJubatusClusterConfiguration.serverConfig.zookeeperTimeout}" // --zookeeper_timeout + + s" ${aJubatusClusterConfiguration.serverConfig.interconnectTimeout}" // --interconnect_timeout + + // juba*_proxy + + s" ${aJubatusClusterConfiguration.proxyConfig.thread}" // --thread + + s" ${aJubatusClusterConfiguration.proxyConfig.timeout}" // --timeout + + s" ${aJubatusClusterConfiguration.proxyConfig.zookeeperTimeout}" // --zookeeper_timeout + + s" ${aJubatusClusterConfiguration.proxyConfig.interconnectTimeout}" // 
--interconnect_timeout + + s" ${aJubatusClusterConfiguration.proxyConfig.poolExpire}" // --pool_expire + + s" ${aJubatusClusterConfiguration.proxyConfig.poolSize}" // --pool_size + s" 1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout" + s" 2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr" @@ -155,20 +196,20 @@ class DefaultYarnClient extends YarnClient with HasLogger { // リソース val tResource = Records.newRecord(classOf[YarnResource]) - tResource.setMemory(jubaProxyMemory + applicationMasterMemory) - tResource.setVirtualCores(applicationMasterVirtualCores) + tResource.setMemory(aJubatusClusterConfiguration.resource.proxyMemory + aJubatusClusterConfiguration.resource.masterMemory) + tResource.setVirtualCores(aJubatusClusterConfiguration.resource.masterCores) // Application Master 登録 val tApplication = mYarnClient.createApplication() val tContext = tApplication.getApplicationSubmissionContext - tContext.setApplicationName(aApplicationName) + tContext.setApplicationName(aJubatusClusterConfiguration.applicationName) tContext.setAMContainerSpec(tApplicationMasterContext) tContext.setResource(tResource) tContext.setQueue("default") logger.info( s"submit ApplicationMaster\n" - + s"\tname: $aApplicationName\n" + + s"\tname: ${aJubatusClusterConfiguration.applicationName}\n" + s"\tcommand: $tCommand\n" + s"\tmemory: ${tResource.getMemory}\n" + s"\tvirtualCores: ${tResource.getVirtualCores}" @@ -184,4 +225,16 @@ class DefaultYarnClient extends YarnClient with HasLogger { logger.info(s"kill $aApplicationId") mYarnClient.killApplication(aApplicationId) } + + private def createConfigFile(aApplicationName: String, aConfigString: String, aBasePath: Path): Path = { + val tHdfsPath = new Path(jubaConfigBasePath(aBasePath), s"${aApplicationName.replaceAll(":", "_")}.json") + + val tTempFile = File.createTempFile(s"jubatus-on-yarn-server", ".json") + val tWriter = new PrintWriter(tTempFile) + tWriter.println(aConfigString) + tWriter.close() + + 
FileSystem.get(mYarnConfig).copyFromLocalFile(true, true, new Path(tTempFile.getPath), tHdfsPath) + tHdfsPath + } } diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClientController.scala b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClientController.scala index b55d614..4159bf5 100644 --- a/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClientController.scala +++ b/jubatusonyarn/jubatus-on-yarn-client/src/main/scala/us/jubat/yarn/client/YarnClientController.scala @@ -35,7 +35,7 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa def isFinished = mIsFinished private var mApplicationMaster: Option[ApplicationMasterProxy] = None - private var mClient: Option[ClientBase] = None + private[client] var mClient: Option[ClientBase] = None private def getFullName(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location]): String = { s"$aName:${aLearningMachineType.name}:${aZookeepers.map(z => z.hostAddress + ":" + z.port).mkString(",")}" @@ -46,8 +46,14 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa mApplicationMaster.get } + @deprecated("not recommended use the startJubatusApplication(JubatusClusterConfiguration)") def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): ApplicationMasterProxy = { - val tFullName = getFullName(aName, aLearningMachineType, aZookeepers) + startJubatusApplication(aName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, null) + } + + @deprecated("not recommended use the startJubatusApplication(JubatusClusterConfiguration)") + def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: 
Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): ApplicationMasterProxy = { + val tFullName = Option(aApplicationName).getOrElse(getFullName(aName, aLearningMachineType, aZookeepers)) logger.info(s"starting $tFullName") val tApplicationId = yarnClient.submitApplicationMaster(tFullName, aName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, location, aBasePath) @@ -55,8 +61,14 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa registerApplication(tFullName, tApplicationId, aNodeCount) } + @deprecated("not recommended use the startJubatusApplication(JubatusClusterConfiguration)") def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): ApplicationMasterProxy = { - val tFullName = getFullName(aName, aLearningMachineType, aZookeepers) + startJubatusApplication(aName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath, null) + } + + @deprecated("not recommended use the startJubatusApplication(JubatusClusterConfiguration)") + def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): ApplicationMasterProxy = { + val tFullName = Option(aApplicationName).getOrElse(getFullName(aName, aLearningMachineType, aZookeepers)) logger.info(s"starting $tFullName") val tApplicationId = yarnClient.submitApplicationMaster(tFullName, aName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, location, aBasePath) @@ -64,6 +76,12 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa registerApplication(tFullName, tApplicationId, aNodeCount) } + def startJubatusApplication(aJubatusClusterConfiguration: JubatusClusterConfiguration): 
ApplicationMasterProxy = { + val tApplicationId = yarnClient.submitApplicationMaster(aJubatusClusterConfiguration, location) + + registerApplication(aJubatusClusterConfiguration.applicationName, tApplicationId, aJubatusClusterConfiguration.nodeCount) + } + /** * 新しい[[ApplicationMasterProxy]]を登録します。 * @@ -89,10 +107,19 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa def status: JubatusYarnApplicationStatus = { requireState(mApplicationMaster.isDefined) requireState(mClient.isDefined) + + val appReport = yarnClient.getStatus(mApplicationMaster.get.applicationId) + val curTime = System.currentTimeMillis() + val opTime = curTime - appReport.getStartTime() + var appMap: java.util.Map[String, Any] = new java.util.LinkedHashMap() + appMap.put("applicationReport", appReport) + appMap.put("currentTime", curTime) + appMap.put("oparatingTime", opTime) + JubatusYarnApplicationStatus( - jubatusProxy = mClient.get.getProxyStatus, + jubatusProxy = mClient.get.getProxyStatus, jubatusServers = mClient.get.getStatus, - yarnApplication = yarnClient.getStatus(mApplicationMaster.get.applicationId) + yarnApplication = appMap ) } diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/.gitignore b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/.gitignore new file mode 100644 index 0000000..d4dc803 --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/.gitignore @@ -0,0 +1,2 @@ +core-site.xml +yarn-site.xml \ No newline at end of file diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/core-site.xml.dist b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/core-site.xml.dist new file mode 100644 index 0000000..68ab82a --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/core-site.xml.dist @@ -0,0 +1,22 @@ + + + + + + + + fs.defaultFS + hdfs://[host]:[port] + + + hadoop.proxyuser.mapred.groups + * + + + hadoop.proxyuser.mapred.hosts + * + + \ No newline at end of 
file diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/entrypoint.sh b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/entrypoint.sh index e474bed..03e5fc4 100644 --- a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/entrypoint.sh +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/entrypoint.sh @@ -13,20 +13,40 @@ NODE_COUNT="${7}" PRIORITY="${8}" MEMORY="${9}" VIRTUAL_CORES="${10}" +CONTAINER_MEMORY="${11}" +CONTAINER_NODES="${12}" +CONTAINER_RACKS="${13}" -LEARNING_MACHINE_NAME="${11}" -LEARNING_MACHINE_TYPE="${12}" -ZOOKEEPER="${13}" +LEARNING_MACHINE_NAME="${14}" +LEARNING_MACHINE_TYPE="${15}" +ZOOKEEPER="${16}" -CONFIG_FILE="${14}" +CONFIG_FILE="${17}" -BASE_PATH="${15}" +BASE_PATH="${18}" + +# Server Config +SERVER_THREAD="${19}" +SERVER_TIMEOUT="${20}" +SERVER_MIXER="${21}" +SERVER_INTERVAL_SEC="${22}" +SERVER_INTERVAL_COUNT="${23}" +SERVER_ZOOKEEPER_TIMEOUT="${24}" +SERVER_INTERCONNECT_TIMEOUT="${25}" + +# Proxy Config +PROXY_THREAD="${26}" +PROXY_TIMEOUT="${27}" +PROXY_ZOOKEEPER_TIMEOUT="${28}" +PROXY_INTERCONNECT_TIMEOUT="${29}" +PROXY_POOL_EXPIRE="${30}" +PROXY_POOL_SIZE="${31}" IP_ADDRESS=`grep $(hostname) /etc/hosts | awk '{print $1}'` LISTEN_IF=`netstat -ie | grep -B1 ${IP_ADDRESS} | head -n1 | awk '{print $1}'` # execute `jubaconfig` command -echo jubaconfig --cmd write --zookeeper=${ZOOKEEPER} --file ${CONFIG_FILE} --name ${LEARNING_MACHINE_NAME} --type ${LEARNING_MACHINE_TYPE} >> /tmp/ApplicationMaster 2>&1 +echo jubaconfig --cmd write --zookeeper=${ZOOKEEPER} --file ${CONFIG_FILE} --name ${LEARNING_MACHINE_NAME} --type ${LEARNING_MACHINE_TYPE} jubaconfig --cmd write --zookeeper=${ZOOKEEPER} --file ${CONFIG_FILE} --name ${LEARNING_MACHINE_NAME} --type ${LEARNING_MACHINE_TYPE} # fail if jubaconfig failed if [[ $? 
!= 0 ]] ; then @@ -42,8 +62,12 @@ for i in `seq 10`; do fi done - echo juba${LEARNING_MACHINE_TYPE}_proxy --zookeeper=${ZOOKEEPER} --rpc-port=${JUBATUS_PROXY_PORT} --listen_if ${LISTEN_IF} >> /tmp/ApplicationMaster 2>&1 - juba${LEARNING_MACHINE_TYPE}_proxy --zookeeper=${ZOOKEEPER} --rpc-port=${JUBATUS_PROXY_PORT} --listen_if ${LISTEN_IF} & + echo juba${LEARNING_MACHINE_TYPE}_proxy --zookeeper=${ZOOKEEPER} --rpc-port=${JUBATUS_PROXY_PORT} --listen_if ${LISTEN_IF} \ + --thread ${PROXY_THREAD} --timeout ${PROXY_TIMEOUT} --zookeeper_timeout ${PROXY_ZOOKEEPER_TIMEOUT} --interconnect_timeout ${PROXY_INTERCONNECT_TIMEOUT} \ + --pool_expire ${PROXY_POOL_EXPIRE} --pool_size ${PROXY_POOL_SIZE} + juba${LEARNING_MACHINE_TYPE}_proxy --zookeeper=${ZOOKEEPER} --rpc-port=${JUBATUS_PROXY_PORT} --listen_if ${LISTEN_IF} \ + --thread ${PROXY_THREAD} --timeout ${PROXY_TIMEOUT} --zookeeper_timeout ${PROXY_ZOOKEEPER_TIMEOUT} --interconnect_timeout ${PROXY_INTERCONNECT_TIMEOUT} \ + --pool_expire ${PROXY_POOL_EXPIRE} --pool_size ${PROXY_POOL_SIZE} & JUBATUS_PROXY_PROCESS_ID=$! 
# jubatus_proxy の起動待機 @@ -52,7 +76,7 @@ for i in `seq 10`; do continue 2 fi - echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status >> /tmp/ApplicationMaster 2>&1 + echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status if (jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status \ | awk '/active jubaproxy members:/ {flag=1; next} /active/ {flag=0} flag==1 {print}' \ | grep "^${IP_ADDRESS}_${JUBATUS_PROXY_PORT}$"); then @@ -70,15 +94,19 @@ fi # launch `ApplicationMaster` echo $JAVA_HOME/bin/java -Xmx${APPLICATION_MASTER_MEMORY}M ${APPLICATION_MASTER_MAIN_CLASS} --application-name ${APPLICATION_NAME} --nodes ${NODE_COUNT} --priority ${PRIORITY} --memory ${MEMORY} \ - --virtual-cores ${VIRTUAL_CORES} --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} \ + --virtual-cores ${VIRTUAL_CORES} --container-memory ${CONTAINER_MEMORY} --container-nodes "${CONTAINER_NODES}" --container-racks "${CONTAINER_RACKS}" \ + --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} \ --zookeeper ${ZOOKEEPER} --management-address ${MANAGEMENT_ADDRESS} --management-port ${MANAGEMENT_PORT} \ --application-master-node-address ${IP_ADDRESS} --jubatus-proxy-port ${JUBATUS_PROXY_PORT} --jubatus-proxy-process-id ${JUBATUS_PROXY_PROCESS_ID} \ - --base-path ${BASE_PATH} >> /tmp/ApplicationMaster 2>&1 + --base-path ${BASE_PATH} --thread ${SERVER_THREAD} --timeout ${SERVER_TIMEOUT} --mixer ${SERVER_MIXER} --interval_sec ${SERVER_INTERVAL_SEC} \ + --interval_count ${SERVER_INTERVAL_COUNT} --zookeeper_timeout ${SERVER_ZOOKEEPER_TIMEOUT} --interconnect_timeout ${SERVER_INTERCONNECT_TIMEOUT} $JAVA_HOME/bin/java 
-Xmx${APPLICATION_MASTER_MEMORY}M ${APPLICATION_MASTER_MAIN_CLASS} --application-name ${APPLICATION_NAME} --nodes ${NODE_COUNT} --priority ${PRIORITY} --memory ${MEMORY} \ - --virtual-cores ${VIRTUAL_CORES} --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} \ + --virtual-cores ${VIRTUAL_CORES} --container-memory ${CONTAINER_MEMORY} --container-nodes "${CONTAINER_NODES}" --container-racks "${CONTAINER_RACKS}" \ + --learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} \ --zookeeper ${ZOOKEEPER} --management-address ${MANAGEMENT_ADDRESS} --management-port ${MANAGEMENT_PORT} \ --application-master-node-address ${IP_ADDRESS} --jubatus-proxy-port ${JUBATUS_PROXY_PORT} --jubatus-proxy-process-id ${JUBATUS_PROXY_PROCESS_ID} \ - --base-path ${BASE_PATH} >> /tmp/ApplicationMaster 2>&1 + --base-path ${BASE_PATH} --thread ${SERVER_THREAD} --timeout ${SERVER_TIMEOUT} --mixer ${SERVER_MIXER} --interval_sec ${SERVER_INTERVAL_SEC} \ + --interval_count ${SERVER_INTERVAL_COUNT} --zookeeper_timeout ${SERVER_ZOOKEEPER_TIMEOUT} --interconnect_timeout ${SERVER_INTERCONNECT_TIMEOUT} EXIT_CODE=$? 
ps ${JUBATUS_PROXY_PROCESS_ID} && kill ${JUBATUS_PROXY_PROCESS_ID} exit $EXIT_CODE diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/jubatus_config.json b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/jubatus_config.json new file mode 100644 index 0000000..f360a96 --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/jubatus_config.json @@ -0,0 +1 @@ +{"method":"AROW","parameter":{"regularization_weight":1.0},"converter":{"num_filter_types":{},"num_filter_rules":[],"string_filter_types":{},"string_filter_rules":[],"num_types":{},"num_rules":[{"key":"*","type":"num"}],"string_types":{"unigram":{"method":"ngram","char_num":"1"}},"string_rules":[{"key":"*","type":"unigram","sample_weight":"bin","global_weight":"bin"}]}} \ No newline at end of file diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/yarn-site.xml.dist b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/yarn-site.xml.dist new file mode 100644 index 0000000..59e4df2 --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/resources/yarn-site.xml.dist @@ -0,0 +1,53 @@ + + + + + + + yarn.nodemanager.aux-services + mapreduce_shuffle + + + + yarn.nodemanager.aux-services.mapreduce_shuffle.class + org.apache.hadoop.mapred.ShuffleHandler + + + + yarn.log-aggregation-enable + true + + + + List of directories to store localized files in. + yarn.nodemanager.local-dirs + file:///var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir + + + + Where to store container logs. + yarn.nodemanager.log-dirs + file:///var/log/hadoop-yarn/containers + + + + Classpath for typical applications. 
+ yarn.application.classpath + + $HADOOP_CONF_DIR, + $HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*, + $HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*, + $HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*, + $HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/* + + + \ No newline at end of file diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/JubatusYarnApplicationSpec.scala b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/JubatusYarnApplicationSpec.scala new file mode 100644 index 0000000..01970ca --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/JubatusYarnApplicationSpec.scala @@ -0,0 +1,202 @@ +package us.jubat.yarn.client + +import org.scalatest._ +import us.jubat.yarn.common._ +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.hadoop.fs.permission.FsPermission +import scala.concurrent._ +import ExecutionContext.Implicits.global +import scala.util._ +import scala.concurrent.duration.Duration +import scala.sys.process.{Process, ProcessBuilder} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.api.records.ApplicationReport + +class JubatusYarnApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + + val machineType = LearningMachineType.Classifier + val zookeeper = new Location("localhost", 2181) + val configString = """{"method":"AROW","parameter":{"regularization_weight":1.0},"converter":{"num_filter_types":{},"num_filter_rules":[],"string_filter_types":{},"string_filter_rules":[],"num_types":{},"num_rules":[{"key":"*","type":"num"}],"string_types":{"unigram":{"method":"ngram","char_num":"1"}},"string_rules":[{"key":"*","type":"unigram","sample_weight":"bin","global_weight":"bin"}]}}""" + val basePath = new Path("hdfs:///jubatus-on-yarn/") + val configPath = new Path("hdfs:///jubatus-on-yarn/test/jubatus_config.json") + + override def beforeAll(): Unit = { + 
//テストデータの配置 + val conf = new Configuration() + val fs = configPath.getFileSystem(conf) + if (!fs.exists(configPath)) { + val localPath = new Path("jubatus-on-yarn-client/src/test/resources/jubatus_config.json") + fs.copyFromLocalFile(localPath, configPath) + } + } + + // start()結果からアプリケーション名を取得する + private def getAppicationName(future: Future[JubatusYarnApplication]): String = { + var applicationName = "" + val result = future.andThen { + case Success(j) => + val appReport: ApplicationReport = j.status.yarnApplication.get("applicationReport").asInstanceOf[ApplicationReport] + applicationName = appReport.getName + j.kill() + case Failure(t) => + print("CREATE MODEL failed: " + t.getMessage) + t.printStackTrace() + } + Await.result(result, Duration.Inf) + return applicationName + } + + "start ()" should "check ApplicationName" in { + //config parameter is String + //specific paramter: No Path and No ApplicationName + var future = JubatusYarnApplication.start("model1", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3) + var resultName = getAppicationName(future) + resultName shouldBe "model1:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port + + //specific paramter: Path and No ApplicationName + future = JubatusYarnApplication.start("model2", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, basePath) + resultName = getAppicationName(future) + resultName shouldBe "model2:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port + + //specific paramter: No Path and ApplicationName + future = JubatusYarnApplication.start("model3", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, "dummyApplicationName3") + resultName = getAppicationName(future) + resultName shouldBe "dummyApplicationName3" + + //specific paramter: Path and ApplicationName + future = JubatusYarnApplication.start("model4", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, basePath, 
"dummyApplicationName4") + resultName = getAppicationName(future) + resultName shouldBe "dummyApplicationName4" + + //config parameter is Path + //specific paramter: No Path and No ApplicationName + future = JubatusYarnApplication.start("model5", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3) + resultName = getAppicationName(future) + resultName shouldBe "model5:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port + + //specific paramter: Path and No ApplicationName + future = JubatusYarnApplication.start("model6", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, basePath) + resultName = getAppicationName(future) + resultName shouldBe "model6:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port + + //specific paramter: No Path and ApplicationName + future = JubatusYarnApplication.start("model7", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, "dummyApplicationName7") + resultName = getAppicationName(future) + resultName shouldBe "dummyApplicationName7" + + //specific paramter: Path and ApplicationName + future = JubatusYarnApplication.start("model8", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, basePath, "dummyApplicationName8") + resultName = getAppicationName(future) + resultName shouldBe "dummyApplicationName8" + } + + it should "success server config" in { + val serverConfig = ServerConfig(4, 30, Mixer.Random, 30, 1024, 15, 20) + val jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", serverConfig, ProxyConfig(), basePath) + val future = JubatusYarnApplication.start(jubaClusterConfig) + Await.ready(future, Duration.Inf) + future.value.get match { + case Success(juba) => + // パラメータをログで目視確認 + Await.ready(juba.stop(), Duration.Inf) + + case Failure(t) => + t.printStackTrace() + fail() + } + } + + it should "success proxy config" in { + val proxyConfig = 
ProxyConfig(2, 30, 15, 20, 0, 0) + val jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", ServerConfig(), proxyConfig, basePath) + val future = JubatusYarnApplication.start(jubaClusterConfig) + Await.ready(future, Duration.Inf) + future.value.get match { + case Success(juba) => + // パラメータをログで目視確認 + Await.ready(juba.stop(), Duration.Inf) + + case Failure(t) => + t.printStackTrace() + fail() + } + } + + "loadModel()" should "loadModel for classifier" in { + var future = JubatusYarnApplication.start("test", machineType, List(zookeeper), configString, Resource(1, 1, 1), 1) + Await.ready(future, Duration.Inf) + val result = future.value.get + result match { + case Success(juba) => + val tHdfs = FileSystem.get(new YarnConfiguration()) + val srcFile = new Path("hdfs:///data/models/t1/test001/0.jubatus") + if (!tHdfs.exists(srcFile)) { + juba.saveModel(new Path("hdfs:///data/models/t1"), "test001") + } + val result: Try[JubatusYarnApplication] = juba.loadModel(new Path("hdfs:///data/models/t1"), "test001") + + result shouldBe a[Success[_]] + + Await.ready(juba.stop(), Duration.Inf) + + case Failure(t) => + t.printStackTrace() + fail() + } + } + + it should "loadModel a directory doesn't exist for classifier" in { + var future = JubatusYarnApplication.start("test", machineType, List(zookeeper), configString, Resource(1, 1, 1), 1) + Await.ready(future, Duration.Inf) + val result = future.value.get + result match { + case Success(juba) => + val tHdfs = FileSystem.get(new YarnConfiguration()) + val srcPath = new Path("hdfs:///data/models/t1/test001") + if (tHdfs.exists(srcPath)) { + tHdfs.delete(srcPath, true) + } + val result: Try[JubatusYarnApplication] = juba.loadModel(new Path("hdfs:///data/models/t1"), "test001") + + result shouldBe a[Failure[_]] + + Await.ready(juba.stop(), Duration.Inf) + + case Failure(t) => + t.printStackTrace() + fail() + } + } + + it should 
"loadModel a file doesn't exist for classifier" in { + var future = JubatusYarnApplication.start("test", machineType, List(zookeeper), configString, Resource(1, 1, 1), 1) + Await.ready(future, Duration.Inf) + val result = future.value.get + result match { + case Success(juba) => + val tHdfs = FileSystem.get(new YarnConfiguration()) + val srcFile = new Path("hdfs:///data/models/t1/test001/0.jubatus") + if (tHdfs.exists(srcFile)) { + tHdfs.delete(srcFile, false) + } + val srcPath = new Path("hdfs:///data/models/t1/test001") + if (!tHdfs.exists(srcPath)) { + tHdfs.mkdirs(srcPath) + } + + val result: Try[JubatusYarnApplication] = juba.loadModel(new Path("hdfs:///data/models/t1"), "test001") + + result shouldBe a[Failure[_]] + + Await.ready(juba.stop(), Duration.Inf) + tHdfs.delete(srcPath, true) + + case Failure(t) => + t.printStackTrace() + fail() + } + } +} diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientControllerSpec.scala b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientControllerSpec.scala index e9b4b87..cd98157 100644 --- a/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientControllerSpec.scala +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientControllerSpec.scala @@ -16,15 +16,19 @@ package us.jubat.yarn.client import java.net.InetAddress - import org.apache.hadoop.fs.Path -import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, ApplicationReport, ApplicationId} +import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, ApplicationReport, ApplicationId, YarnApplicationState} import org.scalatest._ import us.jubat.yarn.common.{LearningMachineType, Location} +import us.jubat.common.ClientBase -class YarnClientControllerSpec extends FlatSpec with Matchers { +class YarnClientControllerSpec extends FlatSpec with Matchers with BeforeAndAfter { class DummyYarnClient extends YarnClient { + 
override def submitApplicationMaster(aJubatusClusterConfiguration: JubatusClusterConfiguration, aManagementLocation: Location): ApplicationId = { + ApplicationId.newInstance(0, 0) + } + override def submitApplicationMaster(aApplicationName: String, aLearningMachineInstanceName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodes: Int, aManagementLocation: Location, aBasePath: Path): ApplicationId = { ApplicationId.newInstance(0, 0) } @@ -33,14 +37,80 @@ class YarnClientControllerSpec extends FlatSpec with Matchers { ApplicationId.newInstance(0, 0) } - override def getStatus(aApplicationId: ApplicationId): ApplicationReport = ??? + override def getStatus(aApplicationId: ApplicationId): ApplicationReport = { + val appId: ApplicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1111) + ApplicationReport.newInstance(appId, null, "dummyUser", "dummyQueue", "dummyName", + "dummyHost", 0, null, YarnApplicationState.RUNNING, "", "url", System.currentTimeMillis(), 0, + FinalApplicationStatus.UNDEFINED, null, "origTrackingUrl", 0, "Classifier", null) + } override def kill(aApplicationId: ApplicationId): Unit = ??? override def getFinalStatus(aApplicationId: ApplicationId): FinalApplicationStatus = ??? 
} - def createController() = new YarnClientController(Location(InetAddress.getLocalHost, 0), new DummyYarnClient) + class DummyClient(host: String, port: Int, name: String) extends ClientBase(host, port, name, 10) { + override def getStatus(): java.util.Map[String, java.util.Map[String, String]] = { + val statusMap: java.util.Map[String, java.util.Map[String, String]] = new java.util.HashMap() + val subStatus: java.util.Map[String, String] = new java.util.HashMap() + subStatus.put("type", "classifier") + subStatus.put("VERSION", "0.8.1") + subStatus.put("PROGNAME", "jubaclassifier") + statusMap.put("server1", subStatus) + statusMap.put("server2", subStatus) + statusMap + } + + override def getProxyStatus(): java.util.Map[String, java.util.Map[String, String]] = { + val statusMap: java.util.Map[String, java.util.Map[String, String]] = new java.util.HashMap() + val subStatus: java.util.Map[String, String] = new java.util.HashMap() + subStatus.put("user", "user") + subStatus.put("VERSION", "0.8.1") + subStatus.put("PROGNAME", "jubaclassifier_proxy") + statusMap.put("proxy1", subStatus) + statusMap + } + } + + def createController() = new YarnClientController(Location(InetAddress.getLocalHost, 0), new DummyYarnClient()) + + "startJubatusApplication ()" should "check applicationName" in { + + val machineType = LearningMachineType.Classifier + val zookeeper1 = new Location("localhost",2188) + val zookeeper2 = new Location("127.0.0.2",2189) + + val tController = createController() + + var result = tController.startJubatusApplication("model1", machineType, List(zookeeper1,zookeeper2), "configString", Resource(0, 0, 0), 3, null) + result.name shouldBe "model1:" + machineType.name + ":" + zookeeper1.hostAddress + ":" + zookeeper1.port + "," + zookeeper2.hostAddress + ":" + zookeeper2.port + + result = tController.startJubatusApplication("model2", machineType, List(zookeeper1,zookeeper2), "configString", Resource(0, 0, 0), 3, null, "dummyApplicationName2") + result.name 
shouldBe "dummyApplicationName2" + + result = tController.startJubatusApplication("model3", machineType, List(zookeeper1,zookeeper2), new Path("/tmp/dummyFile"), Resource(0, 0, 0), 3, null) + result.name shouldBe "model3:" + machineType.name + ":" + zookeeper1.hostAddress + ":" + zookeeper1.port + "," + zookeeper2.hostAddress + ":" + zookeeper2.port + + result = tController.startJubatusApplication("model4", machineType, List(zookeeper1,zookeeper2), new Path("/tmp/dummyFile"), Resource(0, 0, 0), 3, null, "dummyApplicationName4") + result.name shouldBe "dummyApplicationName4" + } + + "status()" should "success getStatus" in { + + val machineType = LearningMachineType.Classifier + val zookeeper1 = new Location("localhost", 2188) + val tController = createController() + var result = tController.startJubatusApplication("model1", machineType, List(zookeeper1), "configString", Resource(0, 0, 0), 3, null) + tController.mClient = Some(new DummyClient("localhost", 9999, "name")) + + val yarnAppStatus = tController.status + yarnAppStatus.jubatusProxy.size() shouldBe 1 + yarnAppStatus.jubatusServers.size() shouldBe 2 + yarnAppStatus.yarnApplication.size() shouldBe 3 + yarnAppStatus.yarnApplication.get("applicationReport") should not be None + yarnAppStatus.yarnApplication.get("currentTime") should not be None + yarnAppStatus.yarnApplication.get("oparatingTime") should not be None + } // "start one" should "not throw exception" in { // val tController = createController() diff --git a/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientSpec.scala b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientSpec.scala new file mode 100644 index 0000000..2b9c445 --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-client/src/test/scala/us/jubat/yarn/client/YarnClientSpec.scala @@ -0,0 +1,115 @@ +// Jubatus: Online machine learning framework for distributed environment +// Copyright (C) 2014-2015 Preferred Networks and Nippon 
Telegraph and Telephone Corporation. +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License version 2.1 as published by the Free Software Foundation. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +package us.jubat.yarn.client + +import org.scalatest._ +import us.jubat.yarn.common.{Location, LearningMachineType, ServerConfig, ProxyConfig, Mixer} +import org.apache.hadoop.fs.Path + +class YarnClientSpec extends FlatSpec with Matchers { + val zookeeper = new Location("localhost", 2181) + val configString = """{"method":"AROW","parameter":{"regularization_weight":1.0}}""" + val manageLocation = new Location("localhost", 9300) + val basePath = new Path("hdfs:///jubatus-on-yarn") + + "submitApplicationMaster ()" should "success recource config" in { + + val yarnClient = new DefaultYarnClient() + + try { + // デフォルト + var resource = Resource() + yarnClient.submitApplicationMaster("TestApp1", "model1", LearningMachineType.Classifier, List(zookeeper), + configString, resource, 3, manageLocation, basePath) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [entrypoint] [jar] [class] 128 [ap] [host] [port] [nodes] 0 256 1 128 \"\" \"\" [model] [type] [zookeeper] [config] [basepath] [stdout] [stderr] + // memory: 32+128 = 160 + // virtualCores: 1 + + resource = Resource(1, 512, 2, 256, 128, 3, 256, List("1"), List("1", "2")) + yarnClient.submitApplicationMaster("TestApp1", "model1", LearningMachineType.Classifier, List(zookeeper), + configString, 
resource, 3, manageLocation, basePath) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [entrypoint] [jar] [class] 256 [ap] [host] [port] [nodes] 1 512 2 256 "1" "1,2" [model] [type] [zookeeper] [config] [basepath] [stdout] [stderr] + // memory: 128+256 = 384 + // virtualCores: 3 + } catch { + case e: Throwable => + e.printStackTrace() + fail() + } + } + + it should "success server config" in { + + val yarnClient = new DefaultYarnClient() + + try { + // デフォルト + var serverConfig = ServerConfig() + var jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", serverConfig, ProxyConfig(), basePath) + yarnClient.submitApplicationMaster(jubaClusterConfig, manageLocation) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [ .... ] 2 10 linear_mixer 16 512 10 10 [ proxy ] [stdout] [stderr] + + // 値指定あり + serverConfig = ServerConfig(3, 30, Mixer.Random, 30, 1024, 15, 20) + jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", serverConfig, ProxyConfig(), basePath) + yarnClient.submitApplicationMaster(jubaClusterConfig, manageLocation) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [ .... ] 3 30 random_mixer 30 1024 15 20 [ proxy ] [stdout] [stderr] + } catch { + case e: Throwable => + e.printStackTrace() + fail() + } + } + + it should "success proxy config" in { + + val yarnClient = new DefaultYarnClient() + + try { + // デフォルト + var proxyConfig = ProxyConfig() + var jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", ServerConfig(), proxyConfig, basePath) + yarnClient.submitApplicationMaster(jubaClusterConfig, manageLocation) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [ .... 
] 4 10 10 10 60 0 [stdout] [stderr] + + // 値指定あり + proxyConfig = ProxyConfig(2, 20, 30, 40, 100, 16) + jubaClusterConfig = JubatusClusterConfiguration("model1", LearningMachineType.Classifier, List(zookeeper), + configString, null, Resource(), 2, "TestApp1", ServerConfig(), proxyConfig, basePath) + yarnClient.submitApplicationMaster(jubaClusterConfig, manageLocation) + + // ログ(submit ApplicationMaster)を目視確認 + // command: bash [ .... ] 2 20 30 40 100 16 [stdout] [stderr] + } catch { + case e: Throwable => + e.printStackTrace() + fail() + } + } +} \ No newline at end of file
diff --git a/jubatusonyarn/jubatus-on-yarn-common/src/main/scala/us/jubat/yarn/common/Utils.scala b/jubatusonyarn/jubatus-on-yarn-common/src/main/scala/us/jubat/yarn/common/Utils.scala
index 96283cc..189da5d 100644
--- a/jubatusonyarn/jubatus-on-yarn-common/src/main/scala/us/jubat/yarn/common/Utils.scala
+++ b/jubatusonyarn/jubatus-on-yarn-common/src/main/scala/us/jubat/yarn/common/Utils.scala
@@ -110,3 +110,80 @@ object Predef extends scala.LowPriorityImplicits {
     if (!aRequirement) throw new IllegalStateException(aMessage.toString)
   }
 }
+
+/** Mixers that a Jubatus server can be configured with, identified by name. */
+object Mixer {
+  case object Linear extends Mixer("linear_mixer")
+  case object Random extends Mixer("random_mixer")
+  case object Broadcast extends Mixer("broadcast_mixer")
+  case object Skip extends Mixer("skip_mixer")
+
+  /**
+   * Looks a mixer up by its `name`.
+   *
+   * @param aValue mixer name, e.g. "linear_mixer"
+   * @return the mixer whose `name` equals `aValue`
+   * @throws IllegalArgumentException if `aValue` (null included) matches no mixer
+   */
+  def valueOf(aValue: String): Mixer = {
+    aValue match {
+      case Mixer.Linear.name => Linear
+      case Mixer.Random.name => Random
+      case Mixer.Broadcast.name => Broadcast
+      case Mixer.Skip.name => Skip
+      // Report the offending value instead of failing with an opaque scala.MatchError.
+      case tUnknown => throw new IllegalArgumentException(s"unknown mixer: $tUnknown")
+    }
+  }
+}
+
+/**
+ * Base class of the mixers declared in the companion object.
+ *
+ * @param name textual identifier of the mixer; `Mixer.valueOf` matches against it
+ */
+sealed abstract class Mixer(val name: String)
+
+/** Values [[ServerConfig]] falls back to for fields that are not given explicitly. */
+object ServerConfig {
+  val defaultThread: Int = 2
+  val defaultTimeout: Int = 10
+  val defaultMixer: Mixer = Mixer.Linear
+  val defaultIntervalSec: Int = 16
+  val defaultIntervalCount: Int = 512
+  val defaultZookeeperTimeout: Int = 10
+  val defaultInterconnectTimeout: Int = 10
+}
+
+/**
+ * Settings for a Jubatus server; every field defaults to the matching `ServerConfig.default*`.
+ *
+ * NOTE(review): the field names suggest the server options --thread, --timeout, --mixer,
+ * --interval_sec, --interval_count, --zookeeper_timeout and --interconnect_timeout; units are
+ * not visible here, confirm against the Jubatus documentation.
+ */
+case class ServerConfig(thread: Int = ServerConfig.defaultThread, timeout: Int = ServerConfig.defaultTimeout,
+    mixer: Mixer = ServerConfig.defaultMixer, intervalSec: Int = ServerConfig.defaultIntervalSec,
+    intervalCount: Int = ServerConfig.defaultIntervalCount, zookeeperTimeout: Int = ServerConfig.defaultZookeeperTimeout,
+    interconnectTimeout: Int = ServerConfig.defaultInterconnectTimeout)
+
+/** Values [[ProxyConfig]] falls back to for fields that are not given explicitly. */
+object ProxyConfig {
+  val defaultThread: Int = 4
+  val defaultTimeout: Int = 10
+  val defaultZookeeperTimeout: Int = 10
+  val defaultInterconnectTimeout: Int = 10
+  val defaultPoolExpire: Int = 60
+  val defaultPoolSize: Int = 0
+}
+
+/**
+ * Settings for a Jubatus proxy; every field defaults to the matching `ProxyConfig.default*`.
+ *
+ * NOTE(review): units of the timeout/expire fields are not visible here, confirm against the
+ * Jubatus documentation.
+ */
+case class ProxyConfig(thread: Int = ProxyConfig.defaultThread, timeout: Int = ProxyConfig.defaultTimeout,
+    zookeeperTimeout: Int = ProxyConfig.defaultZookeeperTimeout, interconnectTimeout: Int = ProxyConfig.defaultInterconnectTimeout,
+    poolExpire: Int = ProxyConfig.defaultPoolExpire, poolSize: Int = ProxyConfig.defaultPoolSize)
+
diff --git a/jubatusonyarn/jubatus-on-yarn-container/src/main/scala/us/jubat/yarn/container/ContainerController.scala b/jubatusonyarn/jubatus-on-yarn-container/src/main/scala/us/jubat/yarn/container/ContainerController.scala index 5cfcc71..cc3a3fb 100644 --- a/jubatusonyarn/jubatus-on-yarn-container/src/main/scala/us/jubat/yarn/container/ContainerController.scala +++ b/jubatusonyarn/jubatus-on-yarn-container/src/main/scala/us/jubat/yarn/container/ContainerController.scala @@ -100,21 +100,24 @@ class ContainerController(name: String, seq: Int, applicationMasterLocation: Loc val tLocalPathString = s"${tIpPort}_${tType}_$aId.jubatus" - // CRCファイルが存在する場合は削除する。 - { - val tCRC = new File(tDir, s".$tLocalPathString.crc") - if(tCRC.exists()) tCRC.delete() - } + try { + // CRCファイルが存在する場合は削除する。 + { + val tCRC = new File(tDir, s".$tLocalPathString.crc") + if(tCRC.exists()) tCRC.delete() + } - // ローカルファイルをコピー - { - val tHdfs = FileSystem.get(new YarnConfiguration()) - val tFromLocal = new Path(tDir, tLocalPathString) - val tToHdfs = new Path(aPathPrefix, s"$aId/$seq.jubatus") - tHdfs.copyFromLocalFile(true,
true, tFromLocal, tToHdfs) + // ローカルファイルをコピー + { + val tHdfs = FileSystem.get(new YarnConfiguration()) + val tFromLocal = new Path(tDir, tLocalPathString) + val tToHdfs = new Path(aPathPrefix, s"$aId/$seq.jubatus") + tHdfs.copyFromLocalFile(true, true, tFromLocal, tToHdfs) + } + } finally { + mStatus = ControllerStatus.Wait + logger.debug(s"save: mStatus:$mStatus") } - - mStatus = ControllerStatus.Wait } /** @@ -127,9 +130,13 @@ class ContainerController(name: String, seq: Int, applicationMasterLocation: Loc def load(aPathPrefix: String, aId: String): Unit = { requireState(mStatus == ControllerStatus.Wait) mStatus = ControllerStatus.Loading - val (tIpPort, tDir, tType) = getJubatusConfig - FileSystem.get(new YarnConfiguration()).copyToLocalFile(false, new Path(aPathPrefix, s"$aId/$seq.jubatus"), new Path(tDir, s"${tIpPort}_${tType}_$aId.jubatus")) - mStatus = ControllerStatus.Wait + try { + val (tIpPort, tDir, tType) = getJubatusConfig + FileSystem.get(new YarnConfiguration()).copyToLocalFile(false, new Path(aPathPrefix, s"$aId/$seq.jubatus"), new Path(tDir, s"${tIpPort}_${tType}_$aId.jubatus")) + } finally { + mStatus = ControllerStatus.Wait + logger.debug(s"load: mStatus:$mStatus") + } } /** diff --git a/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/core-site.xml.dist b/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/core-site.xml.dist new file mode 100644 index 0000000..68ab82a --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/core-site.xml.dist @@ -0,0 +1,22 @@ + + + + + + + + fs.defaultFS + hdfs://[host]:[port] + + + hadoop.proxyuser.mapred.groups + * + + + hadoop.proxyuser.mapred.hosts + * + + \ No newline at end of file diff --git a/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/yarn-site.xml.dist b/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/yarn-site.xml.dist new file mode 100644 index 0000000..59e4df2 --- /dev/null +++ 
b/jubatusonyarn/jubatus-on-yarn-container/src/test/resources/yarn-site.xml.dist @@ -0,0 +1,53 @@ + + + + + + + yarn.nodemanager.aux-services + mapreduce_shuffle + + + + yarn.nodemanager.aux-services.mapreduce_shuffle.class + org.apache.hadoop.mapred.ShuffleHandler + + + + yarn.log-aggregation-enable + true + + + + List of directories to store localized files in. + yarn.nodemanager.local-dirs + file:///var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir + + + + Where to store container logs. + yarn.nodemanager.log-dirs + file:///var/log/hadoop-yarn/containers + + + + Classpath for typical applications. + yarn.application.classpath + + $HADOOP_CONF_DIR, + $HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*, + $HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*, + $HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*, + $HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/* + + + \ No newline at end of file diff --git a/jubatusonyarn/jubatus-on-yarn-container/src/test/scala/us/jubat/yarn/container/ContainerControllerSpec.scala b/jubatusonyarn/jubatus-on-yarn-container/src/test/scala/us/jubat/yarn/container/ContainerControllerSpec.scala new file mode 100644 index 0000000..bac4c9f --- /dev/null +++ b/jubatusonyarn/jubatus-on-yarn-container/src/test/scala/us/jubat/yarn/container/ContainerControllerSpec.scala @@ -0,0 +1,199 @@ +// Jubatus: Online machine learning framework for distributed environment +// Copyright (C) 2014-2015 Preferred Networks and Nippon Telegraph and Telephone Corporation. +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License version 2.1 as published by the Free Software Foundation. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. 
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+package us.jubat.yarn.container
+
+import org.scalatest._
+import java.net.{InetSocketAddress, InetAddress}
+import us.jubat.yarn.common._
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.eclipse.jetty.server.Server
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Checks ContainerController.save() and load() against the file system returned by
+ * FileSystem.get(new YarnConfiguration()), with dummy servers standing in for the
+ * application master endpoint and the Jubatus server.
+ *
+ * NOTE(review): the hdfs:/// paths presumably need a reachable HDFS on the test host - confirm.
+ */
+class ContainerControllerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  // Port the dummy Jubatus server listens on.
+  private val JubatusPort = 9300
+
+  // Arguments handed to save() and load().
+  private val HdfsPrefix = "hdfs:///data/models/t1"
+  private val ModelId = "test001"
+
+  // HDFS side of the transfer: prefix/id/seq.jubatus, the controller being built with seq 0.
+  private val HdfsModel = new Path(HdfsPrefix, s"$ModelId/0.jubatus")
+  // Local side of the transfer; directory and name follow DummyJubatusServer's get_status reply.
+  private val LocalModel = new java.io.File("/tmp/192.168.122.231_9300_classifier_test001.jubatus")
+
+  // Assigned in beforeAll; still null if setup fails early, which afterAll has to tolerate.
+  private var controller: ContainerController = null
+  private var tAmJettyServer: JettyServer = null
+  private var tCnJettyServer: JettyServer = null
+  private var dmyJubaServer: DummyJubatusServer = null
+
+  /** Starts the dummy Jubatus server and both Jetty servers, then builds the controller under test. */
+  override def beforeAll(): Unit = {
+    dmyJubaServer = new DummyJubatusServer
+    dmyJubaServer.start(JubatusPort)
+
+    tAmJettyServer = new JettyServer("/v1", new DummyApplicationMasterServlet)
+    tAmJettyServer.start()
+
+    tCnJettyServer = new JettyServer("/v1", new ContainerServlet)
+    tCnJettyServer.start()
+
+    val tLocalHost = InetAddress.getLocalHost
+    val tJubatusConfig = new JubatusConfig(LearningMachineType.Classifier, "test",
+      new Location(tLocalHost, JubatusPort), 5, 999)
+    controller = new ContainerController("test", 0, new Location(tLocalHost, tAmJettyServer.getPort),
+      new Location(tLocalHost, tCnJettyServer.getPort), tJubatusConfig)
+  }
+
+  /** Stops every server that was actually started, so a failed setup is not masked by an NPE here. */
+  override protected def afterAll(): Unit = {
+    Option(dmyJubaServer).foreach(_.stop())
+    Option(tAmJettyServer).foreach(_.stop())
+    Option(tCnJettyServer).foreach(_.stop())
+  }
+
+  /** File system the assertions inspect; obtained the same way in every test. */
+  private def hdfs: FileSystem = FileSystem.get(new YarnConfiguration())
+
+  /**
+   * Runs a call that is expected to fail. A non-fatal exception is printed and swallowed so the
+   * test can go on to check the outcome; fatal errors are not caught by Try and propagate.
+   */
+  private def attempt(aBody: => Unit): Unit = Try(aBody) match {
+    case Failure(e) => e.printStackTrace()
+    case Success(_) =>
+  }
+
+  "save()" should "save success" in {
+    // Make sure there is a local model to upload and no stale copy on HDFS.
+    if (!LocalModel.exists()) LocalModel.createNewFile()
+    val tHdfs = hdfs
+    if (tHdfs.exists(HdfsModel)) tHdfs.delete(HdfsModel, false)
+
+    // No try/catch: an exception fails the test by itself and keeps its cause in the report.
+    controller.save(HdfsPrefix, ModelId)
+
+    tHdfs.exists(HdfsModel) shouldBe true
+  }
+
+  it should "save error exception" in {
+    // Without a local model file there is nothing to upload.
+    if (LocalModel.exists()) LocalModel.delete()
+    val tHdfs = hdfs
+    if (tHdfs.exists(HdfsModel)) tHdfs.delete(HdfsModel, false)
+
+    attempt(controller.save(HdfsPrefix, ModelId))
+
+    tHdfs.exists(HdfsModel) shouldBe false
+  }
+
+  "load()" should "load success" in {
+    if (LocalModel.exists()) LocalModel.delete()
+    val tHdfs = hdfs
+    val tSrcDir = new Path(HdfsPrefix, ModelId)
+    if (!tHdfs.exists(tSrcDir)) tHdfs.mkdirs(tSrcDir)
+    // create() hands back an open output stream; close it so the handle is not leaked.
+    if (!tHdfs.exists(HdfsModel)) tHdfs.create(HdfsModel).close()
+
+    controller.load(HdfsPrefix, ModelId)
+
+    LocalModel.exists() shouldBe true
+  }
+
+  it should "load error exception" in {
+    // Without a model on HDFS there is nothing to download.
+    if (LocalModel.exists()) LocalModel.delete()
+    val tHdfs = hdfs
+    if (tHdfs.exists(HdfsModel)) tHdfs.delete(HdfsModel, false)
+
+    attempt(controller.load(HdfsPrefix, ModelId))
+
+    LocalModel.exists() shouldBe false
+  }
+}
+
+/**
+ * Stand-in for the application master's REST endpoint: logs each PUT it receives and
+ * answers "OK".
+ */
+class DummyApplicationMasterServlet() extends RestServlet {
+
+  put("/:seq/status") {
+    logger.info(
+      s"""put("/:seq=${params("seq")}/status") is called.
+         |${request.body}
+       """.stripMargin)
+    "OK"
+  }
+
+  put("/:seq/location") {
+    logger.info(
+      s"""put("/:seq=${params("seq")}/location") is called.
+         |${request.body}
+       """.stripMargin)
+    "OK"
+  }
+}
+
+/**
+ * Small msgpack-rpc server that serves a canned get_status reply, used in place of a real
+ * Jubatus server.
+ */
+class DummyJubatusServer {
+  var server: org.msgpack.rpc.Server = null
+
+  /** Handler passed to server.serve(); NOTE(review): presumably dispatched by method name. */
+  class JubaServer {
+    /** Returns one entry keyed "192.168.122.231_9300" that carries "datadir" and "type". */
+    def get_status(): java.util.Map[String, java.util.Map[String, String]] = {
+      val tDetail = new java.util.HashMap[String, String]()
+      tDetail.put("datadir", "file:///tmp")
+      tDetail.put("type", "classifier")
+      val tStatus = new java.util.HashMap[String, java.util.Map[String, String]]()
+      tStatus.put("192.168.122.231_9300", tDetail)
+      tStatus
+    }
+  }
+
+  /**
+   * Starts serving JubaServer.
+   *
+   * @param id port number to listen on (it is passed straight to InetSocketAddress)
+   */
+  def start(id: Int): Unit = {
+    server = new org.msgpack.rpc.Server()
+    server.serve(new JubaServer())
+    server.listen(new InetSocketAddress(id))
+    println("*** DummyJubatusServer start ***")
+  }
+
+  /** Closes the server if start() created one; calling it before start() is harmless. */
+  def stop(): Unit = {
+    Option(server).foreach(_.close())
+    println("*** DummyJubatusServer stop ***")
+  }
+}
+