Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) add API to specify server/proxy configuration #13

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jubatusonyarn/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ val SCALA_VERSION = "2.10.4"
val VERSION = "1.1"

val JUBATUS_DEPENDENCIES = Seq(
("us.jubat" % "jubatus" % "0.7.1").exclude("org.jboss.netty", "netty"),
("us.jubat" % "jubatus" % "0.8.0").exclude("org.jboss.netty", "netty"),
"org.apache.hadoop" % "hadoop-common" % "2.3.0-cdh5.1.3" % "provided",
"org.apache.hadoop" % "hadoop-hdfs" % "2.3.0-cdh5.1.3" % "provided",
"org.apache.hadoop" % "hadoop-yarn-client" % "2.3.0-cdh5.1.3" % "provided"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class ApplicationMaster extends HasLogger {
private def containerJarPath(aBasePath: Path): Path = new Path(aBasePath, "container/jubatus-on-yarn-container.jar")
private val containerJarName: String = "jubatus-on-yarn-container.jar"
private val containerMainClass: String = "us.jubat.yarn.container.ContainerApp"
private val containerMemory: Int = 128

private val mYarnConfig = new YarnConfiguration()

// 終了までブロックする
def run(aParams: ApplicationMasterParams, aApplicationMasterPort: Int): FinalApplicationStatus = {
logger.debug(s"ApplicationMasterParams (${aParams.toString()})")

val tHandler = new ApplicationMasterHandler(aParams, aApplicationMasterPort)

val tResourceManager = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](500, tHandler)
Expand All @@ -56,9 +57,18 @@ class ApplicationMaster extends HasLogger {
tPriority.setPriority(aParams.priority)

val tResource = Records.newRecord(classOf[Resource])
tResource.setMemory(aParams.memory + containerMemory)
tResource.setMemory(aParams.memory + aParams.containerMemory)
tResource.setVirtualCores(aParams.virtualCores)

var containerNodes: Array[String] = null
var containerRacks: Array[String] = null
if (!aParams.containerNodes.isEmpty()) {
containerNodes = aParams.containerNodes.split(",").toArray[String]
}
if (!aParams.containerRacks.isEmpty()) {
containerRacks = aParams.containerRacks.split(",").toArray[String]
}

// コンテナを起動
(1 to aParams.nodes).foreach { _ =>
logger.info(
Expand All @@ -67,7 +77,9 @@ class ApplicationMaster extends HasLogger {
+ s"\tmemory: ${tResource.getMemory}\n"
+ s"\tvirtualCores: ${tResource.getVirtualCores}"
)
tResourceManager.addContainerRequest(new ContainerRequest(tResource, null, null, tPriority))
val containerReq = new ContainerRequest(tResource, containerNodes, containerRacks, tPriority)
tResourceManager.addContainerRequest(containerReq)
logger.debug(s"ContainerRequest( Nodes:${containerReq.getNodes}, Racks:${containerReq.getRacks}")
}

// コンテナの終了を待機
Expand Down Expand Up @@ -140,7 +152,7 @@ class ApplicationMaster extends HasLogger {
s"bash $entryScriptName"
+ s" $containerJarName"
+ s" $containerMainClass"
+ s" $containerMemory"
+ s" ${aParams.containerMemory}"

// jar にわたす
+ s" ${aParams.applicationName}" // --application-name
Expand All @@ -152,6 +164,13 @@ class ApplicationMaster extends HasLogger {
+ s" ${aParams.learningMachineName}" // --name
+ s" ${aParams.learningMachineType}" // juba{*}
+ s" ${aParams.zooKeepers}" // --zookeeper
+ s" ${aParams.thread}" // --thread
+ s" ${aParams.timeout}" // --timeout
+ s" ${aParams.mixer}" // --mixer
+ s" ${aParams.intervalSec}" // --interval_sec
+ s" ${aParams.intervalCount}" // --interval_count
+ s" ${aParams.zookeeperTimeout}" // --zookeeper_timeout
+ s" ${aParams.interconnectTimeout}" // --interconnect_timeout

+ s" 1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout"
+ s" 2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ class ApplicationMasterParams {
@org.kohsuke.args4j.Option(name = "--virtual-cores")
var virtualCores: Int = 1

@org.kohsuke.args4j.Option(name = "--container-memory")
var containerMemory: Int = 128

@org.kohsuke.args4j.Option(name = "--container-nodes")
var containerNodes: String = ""

@org.kohsuke.args4j.Option(name = "--container-racks")
var containerRacks: String = ""


@org.kohsuke.args4j.Option(name = "--learning-machine-name")
var learningMachineName: String = ""
Expand Down Expand Up @@ -148,4 +157,38 @@ class ApplicationMasterParams {

@org.kohsuke.args4j.Option(name = "--base-path")
var basePath: String = ""

@org.kohsuke.args4j.Option(name = "--thread")
var thread: Int = 2

@org.kohsuke.args4j.Option(name = "--timeout")
var timeout: Int = 10

@org.kohsuke.args4j.Option(name = "--mixer")
var mixer: String = "linear_mixer"

@org.kohsuke.args4j.Option(name = "--interval_sec")
var intervalSec: Int = 16

@org.kohsuke.args4j.Option(name = "--interval_count")
var intervalCount: Int = 512

@org.kohsuke.args4j.Option(name = "--zookeeper_timeout")
var zookeeperTimeout: Int = 10

@org.kohsuke.args4j.Option(name = "--interconnect_timeout")
var interconnectTimeout: Int = 10

override def toString(): String = {
val text = s"""applicationName: $applicationName, nodes: $nodes, priority: $priority,
memory: $memory, virtualCores: $virtualCores, containerMemory: $containerMemory,
containerNodes: $containerNodes, containerRacks: $containerRacks, learningMachineName: $learningMachineName,
learningMachineType: $learningMachineType, zooKeepers: $zooKeepers, managementAddress: $managementAddress,
managementPort: $managementPort, applicationMasterNodeAddress: $applicationMasterNodeAddress,
jubatusProxyPort: $jubatusProxyPort, jubatusProxyProcessId: $jubatusProxyProcessId,
thread: $thread, timeout: $timeout, mixer: $mixer, intervalSec: $intervalSec, intervalCount: $intervalCount,
zookeeperTimeout: $zookeeperTimeout, interconnectTimeout: $interconnectTimeout
""".stripMargin.trim
text
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ APPLICATION_MASTER_PORT="${7}"
LEARNING_MACHINE_NAME="${8}"
LEARNING_MACHINE_TYPE="${9}"
ZOOKEEPER="${10}"
THREAD="${11}"
TIMEOUT="${12}"
MIXER="${13}"
INTERVAL_SEC="${14}"
INTERVAL_COUNT="${15}"
ZOOKEEPER_TIMEOUT="${16}"
INTERCONNECT_TIMEOUT="${17}"

IP_ADDRESS=`grep $(hostname) /etc/hosts | awk '{print $1}'`
LISTEN_IF=`netstat -ie | grep -B1 ${IP_ADDRESS} | head -n1 | awk '{print $1}'`
Expand All @@ -27,8 +34,10 @@ for i in `seq 10`; do
fi
done

echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} >> /tmp/Container 2>&1
juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} &
echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \
--name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT}
juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \
--name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT} &
JUBATUS_SERVER_PROCESS_ID=$!

# jubatus server の起動待機
Expand All @@ -37,7 +46,7 @@ for i in `seq 10`; do
continue 2
fi

echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status >> /tmp/Container 2>&1
echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status
if (jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status \
| awk '/active '${LEARNING_MACHINE_NAME}' members:/ {flag=1; next} /active/ {flag=0} flag==1 {print}' \
| grep "^${IP_ADDRESS}_${JUBATUS_SERVER_PORT}$"); then
Expand All @@ -57,11 +66,11 @@ fi
echo $JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \
--application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \
--container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} >> /tmp/Container 2>&1
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE}
$JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \
--application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \
--container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} >> /tmp/Container 2>&1
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE}
EXIT_CODE=$?
ps ${JUBATUS_SERVER_PROCESS_ID} && kill ${JUBATUS_SERVER_PROCESS_ID}
exit $EXIT_CODE
Loading