From 47b5c92a213d67d81a056818f10d22c12f8ba37e Mon Sep 17 00:00:00 2001 From: yangw Date: Tue, 3 Sep 2019 21:52:35 +0800 Subject: [PATCH 01/16] add scala inference code --- export_for_scala.py | 50 ++++++++++ scala-inference/pom.xml | 111 ++++++++++++++++++++++ scala-inference/src/main/scala/Main.scala | 53 +++++++++++ 3 files changed, 214 insertions(+) create mode 100644 export_for_scala.py create mode 100644 scala-inference/pom.xml create mode 100644 scala-inference/src/main/scala/Main.scala diff --git a/export_for_scala.py b/export_for_scala.py new file mode 100644 index 0000000..a28d978 --- /dev/null +++ b/export_for_scala.py @@ -0,0 +1,50 @@ +import os + +from zoo import init_nncontext, Sample +import tensorflow as tf +import numpy as np +from data_utils import load_agg_selected_data_mem +from ARMem.config import Config +from ARMem.model import Model + +# to reproduce the results in test_mem_model.py +# please set PARALLELISM to 1 and BATCH_PER_THREAD to 1022 +from zoo.util.tf import export_tf + + +if __name__ == "__main__": + config = Config() + + config.latest_model=False + + model = Model(config) + + # init or get SparkContext + sc = init_nncontext() + + # create test data + _, _, test_x, _, _, test_y, _, _, test_m, test_dt = \ + load_agg_selected_data_mem(data_path=config.data_path, + x_len=config.x_len, + y_len=config.y_len, + foresight=config.foresight, + cell_ids=config.test_cell_ids, + dev_ratio=config.dev_ratio, + test_len=config.test_len, + seed=config.seed) + + test_x = np.concatenate([test_x] * 200, axis=0) + test_m = np.concatenate([test_m] * 200, axis=0) + + np.save("./data/test_x.npy", test_x) + np.save("./data/test_m.npy", test_m) + + model_dir = config.model_dir + + # export a TensorFlow model to frozen inference graph. + with tf.Session() as sess: + saver = tf.train.Saver() + saver.restore(sess, os.path.join(model_dir, config.model)) + + export_tf(sess, "./tfnet", inputs=[model.input_x, model.memories], outputs=[model.predictions]) + diff --git a/scala-inference/pom.xml b/scala-inference/pom.xml new file mode 100644 index 0000000..d8690b5 --- /dev/null +++ b/scala-inference/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.intel.analytics + mem-inference + 1.0-SNAPSHOT + + + 1.8 + 2.11 + 2.11.8 + 2.2.4 + 2.1 + 2.1.1 + compile + UTF-8 + UTF-8 + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-core_${scala.major.version} + ${spark.version} + + + + org.apache.spark + spark-sql_${scala.major.version} + ${spark.version} + + + org.apache.spark + spark-hive_${scala.major.version} + ${spark.version} + + + org.apache.spark + spark-mllib_${scala.major.version} + ${spark.version} + + + + com.intel.analytics.zoo + analytics-zoo-bigdl_0.8.0-spark_${spark.version} + 0.5.1 + provided + + + org.jetbrains.bio + npy + 0.3.3 + + + org.jetbrains.kotlin + kotlin-stdlib + 1.0.1 + + + + + + src/main/scala + + + org.scala-tools + maven-scala-plugin + + + + + compile + + + + + + maven-assembly-plugin + + + + Main + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/scala-inference/src/main/scala/Main.scala b/scala-inference/src/main/scala/Main.scala new file mode 100644 index 0000000..8436e72 --- /dev/null +++ b/scala-inference/src/main/scala/Main.scala @@ -0,0 +1,53 @@ +import java.nio.file.Paths + +import com.intel.analytics.bigdl.tensor.Tensor +import com.intel.analytics.bigdl.utils.T +import com.intel.analytics.zoo.pipeline.api.net.TFNet +import 
org.jetbrains.bio.npy.NpyFile + +object Main { + + def main(args: Array[String]): Unit = { + + val tfnetPath = args(0) + val testXPath = args(1) + val testMPath = args(2) + val batchSize = args(3).toInt + + println(s"tfnet path is ${tfnetPath}") + println(s"test_x path is ${testXPath}") + println(s"test_m path is ${testMPath}") + println(s"batch size is ${batchSize}") + + + val xPath: java.nio.file.Path = Paths.get(testXPath) + + val testX = NpyFile.read(xPath, Int.MaxValue) + + val mPath: java.nio.file.Path = Paths.get(testMPath) + + val testM = NpyFile.read(mPath, Int.MaxValue) + + val inputXData = Tensor[Float](testX.asFloatArray(), testX.getShape) + val inputMData = Tensor[Float](testM.asFloatArray(), testM.getShape) + + val length = testM.getShape()(0) + + val tfnet = TFNet(tfnetPath, TFNet.SessionConfig(0, 0)) + + val start = System.nanoTime() + val outputData = Tensor[Float](length, 8) + var i = 0 + while (i < Math.ceil(length / (1.0 * batchSize))) { + val input = T(inputXData.narrow(1, batchSize * i + 1, Math.min(batchSize, length - batchSize * i)), + inputMData.narrow(1, batchSize * i + 1, Math.min(batchSize, length - batchSize * i))) + + val result = tfnet.forward(input) + outputData.narrow(1, batchSize * i + 1, Math.min(batchSize, length - batchSize * i)).copy(result.toTensor) + i += 1 + } + + val end = System.nanoTime() + println(s"time is ${(end - start)/1.0e9}s") + } +} \ No newline at end of file From 79b51380f1465a2c1a0494ea1c708b1136ed9e4d Mon Sep 17 00:00:00 2001 From: yangw Date: Wed, 4 Sep 2019 16:23:41 +0800 Subject: [PATCH 02/16] add code and instructions --- export_for_scala.py | 9 ++-- scala-inference/README.md | 10 +++++ scala-inference/bin/run-python-export.sh | 14 +++++++ scala-inference/bin/run-scala-inference.sh | 14 +++++++ .../bin/spark-submit-scala-with-zoo.sh | 41 +++++++++++++++++++ 5 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 scala-inference/README.md create mode 100644 scala-inference/bin/run-python-export.sh create mode 100644 scala-inference/bin/run-scala-inference.sh create mode 100644 scala-inference/bin/spark-submit-scala-with-zoo.sh diff --git a/export_for_scala.py b/export_for_scala.py index a28d978..d4bf148 100644 --- a/export_for_scala.py +++ b/export_for_scala.py @@ -10,7 +10,8 @@ # to reproduce the results in test_mem_model.py # please set PARALLELISM to 1 and BATCH_PER_THREAD to 1022 from zoo.util.tf import export_tf - +import os +dir_path = os.path.dirname(os.path.realpath(__file__)) if __name__ == "__main__": config = Config() @@ -36,8 +37,8 @@ test_x = np.concatenate([test_x] * 200, axis=0) test_m = np.concatenate([test_m] * 200, axis=0) - np.save("./data/test_x.npy", test_x) - np.save("./data/test_m.npy", test_m) + np.save(os.path.join(dir_path, "data/test_x.npy"), test_x) + np.save(os.path.join(dir_path, "data/test_m.npy"), test_m) model_dir = config.model_dir @@ -46,5 +47,5 @@ saver = tf.train.Saver() saver.restore(sess, os.path.join(model_dir, config.model)) - export_tf(sess, "./tfnet", inputs=[model.input_x, model.memories], outputs=[model.predictions]) + export_tf(sess, os.path.join(dir_path, "tfnet"), inputs=[model.input_x, model.memories], outputs=[model.predictions]) diff --git a/scala-inference/README.md b/scala-inference/README.md new file mode 100644 index 0000000..0c86f5b --- /dev/null +++ b/scala-inference/README.md @@ -0,0 +1,10 @@ +## Steps to run scala inference code + +You will need Spark, Analytics-Zoo and TensorFlow as described in `inference_mem_model_zoo_readme.md` + +1. 
Go to the `scala-inference` directory and run `mvn package`; this will generate a few jar files in the `target` directory. + +2. Run `bash bin/run-python-export.sh` to export the TensorFlow model to a directory named `tfnet` under the project root +directory and export the preprocessed data into the `data` directory. + +3. Run `bash bin/run-scala-inference.sh` to run the inference benchmark in Scala. \ No newline at end of file diff --git a/scala-inference/bin/run-python-export.sh b/scala-inference/bin/run-python-export.sh new file mode 100644 index 0000000..ea3db72 --- /dev/null +++ b/scala-inference/bin/run-python-export.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +if [ -z "${ANALYTICS_ZOO_HOME}" ]; then + echo "please first download analytics zoo and set ANALYTICS_ZOO_HOME" + exit 1 +fi + +full_path=$(realpath $0) +dir_path=$(dirname $full_path) + +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh $dir_path/../../export_for_scala.py + + + diff --git a/scala-inference/bin/run-scala-inference.sh b/scala-inference/bin/run-scala-inference.sh new file mode 100644 index 0000000..b6492b2 --- /dev/null +++ b/scala-inference/bin/run-scala-inference.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +if [ -z "${ANALYTICS_ZOO_HOME}" ]; then + echo "please first download analytics zoo and set ANALYTICS_ZOO_HOME" + exit 1 +fi + +full_path=$(realpath $0) +dir_path=$(dirname $full_path) + +bash $dir_path/spark-submit-scala-with-zoo.sh --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 + + + diff --git a/scala-inference/bin/spark-submit-scala-with-zoo.sh b/scala-inference/bin/spark-submit-scala-with-zoo.sh new file mode 100644 index 0000000..daf4b88 --- /dev/null +++ b/scala-inference/bin/spark-submit-scala-with-zoo.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# Check environment variables +if [ -z "${ANALYTICS_ZOO_HOME}" ]; then + echo "Please set ANALYTICS_ZOO_HOME environment variable" + exit 1 +fi + +if [ -z "${SPARK_HOME}" ]; then + echo "Please set SPARK_HOME environment variable" + exit 1 +fi + +# setup paths +export ANALYTICS_ZOO_JAR=`find ${ANALYTICS_ZOO_HOME}/lib -type f -name "analytics-zoo*jar-with-dependencies.jar"` +export ANALYTICS_ZOO_PY_ZIP=`find ${ANALYTICS_ZOO_HOME}/lib -type f -name "analytics-zoo*python-api.zip"` +export ANALYTICS_ZOO_CONF=${ANALYTICS_ZOO_HOME}/conf/spark-analytics-zoo.conf +export PYTHONPATH=${ANALYTICS_ZOO_PY_ZIP}:${PYTHONPATH} + +# Check files
if [ ! -f ${ANALYTICS_ZOO_CONF} ]; then + echo "Cannot find ${ANALYTICS_ZOO_CONF}" + exit 1 +fi + +if [ ! -f ${ANALYTICS_ZOO_PY_ZIP} ]; then + echo "Cannot find ${ANALYTICS_ZOO_PY_ZIP}" + exit 1 +fi + +if [ ! 
-f ${ANALYTICS_ZOO_JAR} ]; then + echo "Cannot find ${ANALYTICS_ZOO_JAR}" + exit 1 +fi + +${SPARK_HOME}/bin/spark-submit \ + --properties-file ${ANALYTICS_ZOO_CONF} \ + --jars ${ANALYTICS_ZOO_JAR} \ + --conf spark.driver.extraClassPath=${ANALYTICS_ZOO_JAR} \ + --conf spark.executor.extraClassPath=${ANALYTICS_ZOO_JAR} \ + $* From 1f989f87652fb786cd1973fe9611bf2b7c5c2be8 Mon Sep 17 00:00:00 2001 From: yangw Date: Fri, 6 Sep 2019 10:38:26 +0800 Subject: [PATCH 03/16] add test for spark --- scala-inference/bin/run-scala-inference.sh | 2 +- scala-inference/src/main/scala/Main.scala | 87 +++++++++++++++++++--- 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/scala-inference/bin/run-scala-inference.sh b/scala-inference/bin/run-scala-inference.sh index b6492b2..079c493 100644 --- a/scala-inference/bin/run-scala-inference.sh +++ b/scala-inference/bin/run-scala-inference.sh @@ -8,7 +8,7 @@ fi full_path=$(realpath $0) dir_path=$(dirname $full_path) -bash $dir_path/spark-submit-scala-with-zoo.sh --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 +bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 64 true diff --git a/scala-inference/src/main/scala/Main.scala b/scala-inference/src/main/scala/Main.scala index 8436e72..93d6299 100644 --- a/scala-inference/src/main/scala/Main.scala +++ b/scala-inference/src/main/scala/Main.scala @@ -1,24 +1,16 @@ import java.nio.file.Paths +import com.intel.analytics.bigdl.models.utils.ModelBroadcast import com.intel.analytics.bigdl.tensor.Tensor import com.intel.analytics.bigdl.utils.T +import com.intel.analytics.zoo.common.NNContext import com.intel.analytics.zoo.pipeline.api.net.TFNet import org.jetbrains.bio.npy.NpyFile object Main { - def main(args: Array[String]): Unit = { - - val tfnetPath = args(0) - val testXPath = args(1) - val testMPath = args(2) - val batchSize = args(3).toInt - - println(s"tfnet path is ${tfnetPath}") - println(s"test_x path is ${testXPath}") - println(s"test_m path is ${testMPath}") - println(s"batch size is ${batchSize}") + def testForLocal(tfnetPath: String, testXPath: String, testMPath: String, batchSize: Int) = { val xPath: java.nio.file.Path = Paths.get(testXPath) @@ -49,5 +41,78 @@ object Main { val end = System.nanoTime() println(s"time is ${(end - start)/1.0e9}s") + + } + + def testForSpark(tfnetPath: String, testXPath: String, testMPath: String, batchSize: Int) = { + + val sc = NNContext.initNNContext() + + val xPath: java.nio.file.Path = Paths.get(testXPath) + + val testX = NpyFile.read(xPath, Int.MaxValue) + + val mPath: java.nio.file.Path = Paths.get(testMPath) + + val testM = NpyFile.read(mPath, Int.MaxValue) + + val length = testM.getShape()(0) + + val tfnet = TFNet(tfnetPath) + + val btfnet = ModelBroadcast[Float]().broadcast(sc, tfnet) + + val batchedX = testX.asFloatArray().grouped(batchSize * 10 * 8).toSeq + + val batchedM = testM.asFloatArray().grouped(batchSize * 77 * 8).toSeq + + val localInput = batchedX.zip(batchedM).map { case (x, m) => + val inputXData = Tensor[Float](x, Array(x.length/10/8, 10, 8)) + val inputMData = Tensor[Float](m, Array(m.length/77/8, 77, 8)) + val input = T(inputXData, inputMData) + input + } + + val inputRdd = sc.parallelize(localInput, 4).cache() + inputRdd.count() + + + val modelRdd = inputRdd.mapPartitions { part => + val localTFNet = btfnet.value() + 
Iterator.single(localTFNet) + }.cache() + + modelRdd.count() + + val results = inputRdd.zipPartitions(modelRdd) { case (dataIter, modelIter) => + val model = modelIter.next() + val result = dataIter.toArray.map(model.forward) + result.toIterator + } + + var start = System.nanoTime() + results.collect() + var end = System.nanoTime() + println(s"time is ${(end - start)/1.0e9}s") + } + + def main(args: Array[String]): Unit = { + + val tfnetPath = args(0) + val testXPath = args(1) + val testMPath = args(2) + val batchSize = args(3).toInt + val islocal = args(4).toBoolean + + println(s"tfnet path is ${tfnetPath}") + println(s"test_x path is ${testXPath}") + println(s"test_m path is ${testMPath}") + println(s"batch size is ${batchSize}") + + if (islocal) { + testForLocal(tfnetPath, testXPath, testMPath, batchSize) + } else { + testForSpark(tfnetPath, testXPath, testMPath, batchSize) + } } } \ No newline at end of file From 2f0bcdf4dafc6168bb8d20491fd08c14da4a0ab8 Mon Sep 17 00:00:00 2001 From: yangw Date: Mon, 9 Sep 2019 11:13:47 +0800 Subject: [PATCH 04/16] add pure inference time tracking --- scala-inference/bin/run-scala-inference.sh | 2 +- scala-inference/src/main/scala/Main.scala | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/scala-inference/bin/run-scala-inference.sh b/scala-inference/bin/run-scala-inference.sh index 079c493..32a46d8 100644 --- a/scala-inference/bin/run-scala-inference.sh +++ b/scala-inference/bin/run-scala-inference.sh @@ -8,7 +8,7 @@ fi full_path=$(realpath $0) dir_path=$(dirname $full_path) -bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 64 true +bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false diff --git a/scala-inference/src/main/scala/Main.scala b/scala-inference/src/main/scala/Main.scala index 93d6299..049b126 100644 --- a/scala-inference/src/main/scala/Main.scala +++ b/scala-inference/src/main/scala/Main.scala @@ -84,16 +84,22 @@ object Main { modelRdd.count() - val results = inputRdd.zipPartitions(modelRdd) { case (dataIter, modelIter) => + val resultsRDD = inputRdd.zipPartitions(modelRdd) { case (dataIter, modelIter) => + val start = System.nanoTime() val model = modelIter.next() val result = dataIter.toArray.map(model.forward) - result.toIterator + val iter = result.toIterator + val end = System.nanoTime() + println(s"pure inference time in partition is ${(end - start)/1.0e9}s") + iter ++ Array(Tensor.scalar[Float](end - start)) } var start = System.nanoTime() - results.collect() + val results = resultsRDD.collect() var end = System.nanoTime() - println(s"time is ${(end - start)/1.0e9}s") + println(s"spark job time is ${(end - start)/1.0e9}s") + val eachPartitionTime = results.map(_.toTensor[Float]).filter(_.isScalar).map(_.value()) + println(s"Max pure inference time in each partition ${eachPartitionTime.max/1.0e9}") } def main(args: Array[String]): Unit = { @@ -115,4 +121,4 @@ object Main { testForSpark(tfnetPath, testXPath, testMPath, batchSize) } } -} \ No newline at end of file +} From 546a61f4d4189a972cc6569d8ec67b42ebbe2270 Mon Sep 17 00:00:00 2001 From: yangw Date: Mon, 9 Sep 2019 13:35:03 +0800 Subject: [PATCH 05/16] fix --- scala-inference/src/main/scala/Main.scala | 3 --- 1 file changed, 3 deletions(-) diff --git 
a/scala-inference/src/main/scala/Main.scala b/scala-inference/src/main/scala/Main.scala index 049b126..280f985 100644 --- a/scala-inference/src/main/scala/Main.scala +++ b/scala-inference/src/main/scala/Main.scala @@ -94,10 +94,7 @@ object Main { iter ++ Array(Tensor.scalar[Float](end - start)) } - var start = System.nanoTime() val results = resultsRDD.collect() - var end = System.nanoTime() - println(s"spark job time is ${(end - start)/1.0e9}s") val eachPartitionTime = results.map(_.toTensor[Float]).filter(_.isScalar).map(_.value()) println(s"Max pure inference time in each partition ${eachPartitionTime.max/1.0e9}") } From 384b9dbf2ad9d88e4d468d27fcd48c167fed99ef Mon Sep 17 00:00:00 2001 From: yangw Date: Tue, 10 Sep 2019 11:28:25 +0800 Subject: [PATCH 06/16] address comments --- scala-inference/pom.xml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/scala-inference/pom.xml b/scala-inference/pom.xml index d8690b5..8832792 100644 --- a/scala-inference/pom.xml +++ b/scala-inference/pom.xml @@ -13,13 +13,21 @@ 2.11 2.11.8 2.2.4 - 2.1 - 2.1.1 + 2.3 + 2.3.1 compile UTF-8 UTF-8 + + + spring-lib-release + Spring Lib Release Repository + https://repo.spring.io/libs-release + + + org.scala-lang From 9bd442773d288a39b5ca7c59e6f92da95d34110a Mon Sep 17 00:00:00 2001 From: yangw Date: Wed, 11 Sep 2019 11:22:21 +0800 Subject: [PATCH 07/16] add mkl options --- .../bin/spark-submit-scala-with-zoo.sh | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/scala-inference/bin/spark-submit-scala-with-zoo.sh b/scala-inference/bin/spark-submit-scala-with-zoo.sh index daf4b88..62f13f6 100644 --- a/scala-inference/bin/spark-submit-scala-with-zoo.sh +++ b/scala-inference/bin/spark-submit-scala-with-zoo.sh @@ -17,6 +17,33 @@ export ANALYTICS_ZOO_PY_ZIP=`find ${ANALYTICS_ZOO_HOME}/lib -type f -name "analy export ANALYTICS_ZOO_CONF=${ANALYTICS_ZOO_HOME}/conf/spark-analytics-zoo.conf export PYTHONPATH=${ANALYTICS_ZOO_PY_ZIP}:${PYTHONPATH} +if [ -z "${KMP_AFFINITY}" ]; then + export KMP_AFFINITY=granularity=fine,compact,1,0 +fi + +if [ -z "${OMP_NUM_THREADS}" ]; then + if [ -z "${ZOO_NUM_MKLTHREADS}" ]; then + export OMP_NUM_THREADS=1 + else + if [ `echo $ZOO_NUM_MKLTHREADS | tr '[A-Z]' '[a-z]'` == "all" ]; then + export OMP_NUM_THREADS=`nproc` + else + export OMP_NUM_THREADS=${ZOO_NUM_MKLTHREADS} + fi + fi +fi + +if [ -z "${KMP_BLOCKTIME}" ]; then + export KMP_BLOCKTIME=0 +fi + +# verbose for OpenMP +if [[ $* == *"verbose"* ]]; then + export KMP_SETTINGS=1 + export KMP_AFFINITY=${KMP_AFFINITY},verbose +fi + + # Check files if [ ! 
-f ${ANALYTICS_ZOO_CONF} ]; then echo "Cannot find ${ANALYTICS_ZOO_CONF}" From 6033c857b30b8ff12d76558a2ff5916e41e3d398 Mon Sep 17 00:00:00 2001 From: yangw Date: Wed, 30 Oct 2019 10:47:50 +0800 Subject: [PATCH 08/16] add training --- ARMem/config.py | 2 +- ARMem/model.py | 14 ++++-- data_utils.py | 6 +-- run_inference_mem_model_zoo.sh | 2 +- scala-inference/bin/run-scala-inference.sh | 2 +- train_mem_model_zoo.py | 57 ++++++++++++++++++++++ 6 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 train_mem_model_zoo.py diff --git a/ARMem/config.py b/ARMem/config.py index fc8daa3..4ffb872 100644 --- a/ARMem/config.py +++ b/ARMem/config.py @@ -13,7 +13,7 @@ def __init__(self): self.ar_g = 1 # data params - self.data_path = '../data/aggregated_5min_scaled.csv' + self.data_path = '/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv' self.nfeatures = 8 # number of col_list in "../config_preprocess.py" self.x_len = self.nsteps self.y_len = 1 diff --git a/ARMem/model.py b/ARMem/model.py index a78bfa4..99c4889 100644 --- a/ARMem/model.py +++ b/ARMem/model.py @@ -4,12 +4,15 @@ # AR_memory class Model(object): - def __init__(self, config): + def __init__(self, config, input_x=None, memories=None, targets=None): self.config = config self.global_step = tf.Variable(0, trainable=False, name="global_step") self.regularizer = layers.l2_regularizer(self.config.l2_lambda) self.sess = None self.saver = None + self.input_x = input_x + self.memories = memories + self.targets = targets self._build_model() def _build_model(self): @@ -54,9 +57,12 @@ def _build_model(self): self.initialize_session() def add_placeholder(self): - self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") - self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") - self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, + if self.input_x is None: + self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") + if self.targets is None: + self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") + if self.memories is None: + self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, name="memories") # self.targets = tf.placeholder(shape=[None], dtype=tf.int32, name="targets") self.dropout = tf.placeholder(dtype=tf.float32, name="dropout") diff --git a/data_utils.py b/data_utils.py index 5c5a5f7..0f0b4b2 100644 --- a/data_utils.py +++ b/data_utils.py @@ -16,7 +16,7 @@ def batch_loader(iterable, batch_size, shuffle=False): def load_agg_data( - data_path='../data/aggregated_5min_scaled.csv', + data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv', x_len=10, y_len=1, ncells=20, @@ -125,7 +125,7 @@ def load_agg_data( def load_agg_selected_data_mem( - data_path='../data/aggregated_5min_scaled.csv', + data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv', x_len=10, y_len=1, mem_len=7, @@ -302,7 +302,7 @@ def _time_concat(arg): return train_x, dev_x, te_x, train_y, dev_y, te_y, train_m, dev_m, te_m, test_dt -def load_agg_data_all(data_path='../data/aggregated_data_5min_scaled.csv', ncells=20, test_len=7): +def load_agg_data_all(data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_data_5min_scaled.csv', ncells=20, test_len=7): 
data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) diff --git a/run_inference_mem_model_zoo.sh b/run_inference_mem_model_zoo.sh index 006dc35..248d4ce 100644 --- a/run_inference_mem_model_zoo.sh +++ b/run_inference_mem_model_zoo.sh @@ -5,4 +5,4 @@ if [ -z "${ANALYTICS_ZOO_HOME}" ]; then fi # bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[4] inference_mem_model_zoo.py -bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[36] --driver-memory 32g inference_mem_model_zoo.py +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-python-with-zoo.sh --master local[36] --driver-memory 32g train_mem_model_zoo.py diff --git a/scala-inference/bin/run-scala-inference.sh b/scala-inference/bin/run-scala-inference.sh index 32a46d8..4ae3d2a 100644 --- a/scala-inference/bin/run-scala-inference.sh +++ b/scala-inference/bin/run-scala-inference.sh @@ -8,7 +8,7 @@ fi full_path=$(realpath $0) dir_path=$(dirname $full_path) -bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py new file mode 100644 index 0000000..384e802 --- /dev/null +++ b/train_mem_model_zoo.py @@ -0,0 +1,57 @@ +import tensorflow as tf +from tensorflow.contrib import layers +import os +from utils import make_date_dir, find_latest_dir + +import sys +from zoo.pipeline.api.net import TFNet +from zoo import init_nncontext, Sample +from zoo.tfpark import TFOptimizer, TFDataset +from bigdl.optim.optimizer import * +import tensorflow as tf +import numpy as np +from data_utils import load_agg_selected_data_mem +from ARMem.config import Config +from ARMem.model import Model + +# to reproduce the results in test_mem_model.py +# please set PARALLELISM to 1 and BATCH_PER_THREAD to 1022 +PARALLELISM=4 +BATCH_PER_THREAD=3200 + + +if __name__ == "__main__": + config = Config() + + config.latest_model=False + + # init or get SparkContext + sc = init_nncontext() + + # create test data + train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ + load_agg_selected_data_mem(data_path=config.data_path, + x_len=config.x_len, + y_len=config.y_len, + foresight=config.foresight, + cell_ids=config.test_cell_ids, + dev_ratio=config.dev_ratio, + test_len=config.test_len, + seed=config.seed) + + model_dir = config.model_dir + + dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=2700, val_tensors=[dev_x, dev_m, dev_y],) + + model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) + optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}) + + optimizer.optimize(end_trigger=MaxEpoch(15000)) + + + + + + + + From 96e673b1ff56e357296de5a68a84f3c58e95bf31 Mon Sep 17 00:00:00 2001 From: jenniew Date: Thu, 7 Nov 2019 13:53:56 -0800 Subject: [PATCH 09/16] add scala training --- ARMem/config.py | 2 +- data_utils.py | 2 +- .../r2/ml/FlashBaseMLPipelineTrain.scala | 354 ++++++++++++++++++ 3 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala diff --git a/ARMem/config.py 
b/ARMem/config.py index 4ffb872..0e5df5a 100644 --- a/ARMem/config.py +++ b/ARMem/config.py @@ -13,7 +13,7 @@ def __init__(self): self.ar_g = 1 # data params - self.data_path = '/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv' + self.data_path = '/home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv' self.nfeatures = 8 # number of col_list in "../config_preprocess.py" self.x_len = self.nsteps self.y_len = 1 diff --git a/data_utils.py b/data_utils.py index 0f0b4b2..b443c48 100644 --- a/data_utils.py +++ b/data_utils.py @@ -125,7 +125,7 @@ def load_agg_data( def load_agg_selected_data_mem( - data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv', + data_path='/home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv', x_len=10, y_len=1, mem_len=7, diff --git a/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala b/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala new file mode 100644 index 0000000..da7ce32 --- /dev/null +++ b/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala @@ -0,0 +1,354 @@ +package com.skt.spark.r2.ml + +import com.intel.analytics.bigdl.dataset.{ArraySample, Sample} +import com.intel.analytics.bigdl.models.utils.ModelBroadcast +import com.intel.analytics.bigdl.nn.{CrossEntropyCriterion, MSECriterion, TimeDistributedCriterion} +import com.intel.analytics.bigdl.nn.abstractnn.Activity +import com.intel.analytics.bigdl.optim._ +import com.intel.analytics.bigdl.tensor.Tensor +import com.intel.analytics.bigdl.utils.{T, Table} +import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric._ +import com.intel.analytics.zoo.common.NNContext +import com.intel.analytics.zoo.pipeline.api.net.TFNet +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.types._ + +import scala.collection.{immutable, mutable} + +object FlashBaseMLPipelineTrain { + + import DateUtil._ + + var sqlContext: SQLContext = _ + var sparkContext: SparkContext = _ + + def main(args: Array[String]) { + val tfNetPath = args(0) + val fbHost = args(1) + val fbPort = args(2).toInt + val batchSize = args(3).toInt + + sparkContext = NNContext.initNNContext("fb-ml-pipeline") + sqlContext = new SQLContext(sparkContext) + + val initialTime = "20190920094500" + val normR2Df = normalizedDF(createR2DataFrame(fbHost, fbPort)) + + var (start, end) = (0L, 0L) + start = System.nanoTime() + val cachedInputQueue = cacheInputFeaturesQueue(normR2Df, initialTime) + end = System.nanoTime() + println(s"The time of building Input-Queue is ${(end - start) / 1.0e9}s") + + start = System.nanoTime() + val cachedMemoryQueue = cacheMemoryFeaturesQueue(normR2Df, beforeDays(afterMinutes(initialTime, 5), 1)) + end = System.nanoTime() + println(s"The time of building Memory-Queue is ${(end - start) / 1.0e9}s") + + var avg = 0L + + start = System.nanoTime() + val trainRDD = buildSampleRDD( + buildInputFeatures(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue), + buildMemoryFeatures(normR2Df, beforeDays(afterMinutes(initialTime, 10), 1), cachedMemoryQueue), + buildInputTargets(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue)) + end = System.nanoTime() + // avg += (end - start) + // println(s"Try #${i + 1} : The time of pre-processing is ${(end - start) / 1.0e9}s") + + // println(s"AVG time of pre-processing is ${(avg / 5) / 1.0e9}s") + // avg = 
0L + + start = System.nanoTime() + val model = TFNet(tfNetPath) + + val optimMethod = new Adam[Float]() + + val optimizer = Optimizer( + model = model, + sampleRDD = trainRDD, + criterion = MSECriterion[Float](), + batchSize = 1000 + ) + + optimizer + .setOptimMethod(optimMethod) + .setEndWhen(Trigger.maxEpoch(10)) + .optimize() + // val btfnet = ModelBroadcast[Float]().broadcast(sparkContext, TFNet(tfNetPath)) + // end = System.nanoTime() + // println(s"The time of broadcasting model is ${(end - start) / 1.0e9}s") + + // for (i <- 0 until 5) { + // start = System.nanoTime() + // buildInferenceRDD( + // buildTensorRDD( + // buildInputFeatures(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue), + // buildMemoryFeatures(normR2Df, beforeDays(afterMinutes(initialTime, 10), 1), cachedMemoryQueue), + // batchSize), + // btfnet + // ).collect() + // end = System.nanoTime() + // avg += (end - start) + // println(s"Try #${i + 1} : The time of pre-processing and inference is ${(end - start) / 1.0e9}s") + // } + // println(s"AVG time of pre-processing and inference is ${(avg / 5) / 1.0e9}s") + } + + /** + * Cache a Map of ('cellID' -> 'buffer of recent 50 min features sorted by evt_dtm') + * into each partitions. + * Features are partitioned by unique CellId and distributed into Spark partitions. + */ + def cacheInputFeaturesQueue( + normR2Df: DataFrame, + time: String) + : RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]] = { + val filter = buildTimeFilter("evt_dtm", time, 45, 1) + val cachedInput = normR2Df.where(filter).rdd.map(row => (row.getInt(1), row)) + .aggregateByKey(new mutable.ArrayBuffer[(String, Array[Float])]())( + (buf, row) => { + val features = (for (i <- 2 until row.size) yield row.getFloat(i)).toArray + buf.append((row.getString(0), features)) + buf + }, (b1, b2) => b1 ++= b2) + .mapPartitions(it => Iterator.single(it.map { + case (cellId, featureBuf) => (cellId, featureBuf.sortBy(_._1).map(_._2)) + }.toMap), + preservesPartitioning = true) + .cache() + cachedInput.count() + + cachedInput + } + + /** + * Build a inputRDD(rdd of a Map of ('cellID' -> 'input Features')). + * It is built by concatenating new 5 min features with last 45 minutes features of cached + * input-features in each partitions. + */ + def buildInputFeatures( + normR2Df: DataFrame, + time: String, + cachedInputFeatures: RDD[Map[Int, mutable.ArrayBuffer[Array[Float]]]]) + : RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]] = { + val filter = buildTimeFilter("evt_dtm", time, 0, 1) + normR2Df.where(filter).rdd + .map(row => (row.getInt(1), row)) + .partitionBy(cachedInputFeatures.partitioner.get) + .zipPartitions(cachedInputFeatures, preservesPartitioning = true) { + (newRowIter, cached) => + if (newRowIter.isEmpty) { + cached + } else { + val cachedMap = cached.next() + val newMap = new mutable.HashMap[Int, mutable.ArrayBuffer[Array[Float]]]() + newRowIter.foreach { + newRow => + val buffer = cachedMap(newRow._1) + val features = (for (i <- 2 until newRow._2.size) yield newRow._2.getFloat(i)).toArray + newMap.put(newRow._1, buffer.drop(1) += features) + } + Iterator.single(newMap.toMap) + } + } + } + + /** + * Build a targetRDD(rdd of a Map of ('cellID' -> 'target features')). + * It is built by new 5 min features in each partitions. 
+ */ + def buildInputTargets( + normR2Df: DataFrame, + time: String, + cachedInputFeatures: RDD[Map[Int, mutable.ArrayBuffer[Array[Float]]]]) + : RDD[immutable.Map[Int, Array[Float]]] = { + val filter = buildTimeFilter("evt_dtm", time, 0, 1) + normR2Df.where(filter).rdd + .map(row => (row.getInt(1), row)) + .partitionBy(cachedInputFeatures.partitioner.get) + .mapPartitions({ + it: Iterator[(Int, Row)] => + // val cachedMap = cached.next() + val newMap = new mutable.HashMap[Int, Array[Float]]() + it.foreach { + newRow => + // val buffer = cachedMap(newRow._1) + val features = (for (i <- 2 until newRow._2.size) yield newRow._2.getFloat(i)).toArray + newMap.put(newRow._1, features) + } + Iterator.single(newMap.toMap) + } + ) + } + + /** + * Cache a Map of ('cellID' -> '7 days x buffer of 55 min features sorted by evt_dtm') into each + * partitions. + * Features are partitioned by unique CellId and distributed into Spark partitions. + */ + def cacheMemoryFeaturesQueue( + normR2Df: DataFrame, + time: String) + : RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]] = { + val filter = buildTimeFilter("evt_dtm", time, 50, 7) + val cachedMemory = normR2Df.where(filter).rdd.map(row => (row.getInt(1), row)) + .aggregateByKey(new mutable.ArrayBuffer[(String, Array[Float])]())( + (buf, row) => { + val features = (for (i <- 2 until row.size) yield row.getFloat(i)).toArray + buf.append((row.getString(0), features)) // Tuple of (DT, Features) + buf + }, (b1, b2) => b1 ++= b2) + .mapPartitions({ + it: Iterator[(Int, mutable.ArrayBuffer[(String, Array[Float])])] => + Iterator.single[Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]]( + it.map { + case (cellId, fs) => (cellId, fs.sortBy(_._1).map(_._2).grouped(11).toArray) + }.toMap) + }, preservesPartitioning = true) + .cache() + + cachedMemory.count() + cachedMemory + } + + /** + * Build a memoryRDD(rdd of a Map of ('cellID' -> 'memory features')). + * It is built by concatenating new 7days x 5 min features with 7days x last 50 minutes features + * of cached memory feature in each partitions. + */ + def buildMemoryFeatures( + normR2Df: DataFrame, + time: String, + cachedMemory: RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]]) + : RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]] = { + val filter = buildTimeFilter("evt_dtm", time, 0, 7) + normR2Df.where(filter).rdd + .map(row => (row.getInt(1), row)) + .partitionBy(cachedMemory.partitioner.get) + .zipPartitions(cachedMemory, preservesPartitioning = true) { + (newRowIter, cachedMapIter) => + if (newRowIter.isEmpty) { + cachedMapIter + } else { + val cachedMap = cachedMapIter.next() + val newMemoryMap = new mutable.HashMap[Int, Array[mutable.ArrayBuffer[Array[Float]]]]() + newRowIter.toArray.groupBy(_._1).values.iterator.foreach { + newRows: Array[(Int, Row)] => + val cellId = newRows(0)._1 + val memories = cachedMap(cellId) + val newFeatures = newRows.map(_._2).sortBy(r => r.getString(0)).map { + row => (for (i <- 2 until row.size) yield row.getFloat(i)).toArray + } + val newMemory = Array.ofDim[mutable.ArrayBuffer[Array[Float]]](memories.length) + for (i <- memories.indices) { + newMemory(i) = memories(i).drop(1) + newMemory(i) += newFeatures(i) + } + newMemoryMap.put(cellId, newMemory) + } + Iterator.single(newMemoryMap.toMap) + } + } + } + + /** + * Build TensorRDD with inputRDD and memoryRDD. + * Input features and memory features are zipped and grouped with batchSize in each partitions. 
+ */ + def buildSampleRDD( + inputRDD: RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]], + memoryRDD: RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]], + targetRDD: RDD[immutable.Map[Int, Array[Float]]] + ): RDD[Sample[Float]] = { + inputRDD.zipPartitions(memoryRDD, targetRDD, preservesPartitioning = true) { + (iterInput, iterMemory, iterTarget) => + val inputMap = iterInput.next() + val memoryMap = iterMemory.next() + val targetMap = iterTarget.next() + val pairedInputXMLabel = inputMap.map { + case (cellId: Int, inputFs: mutable.ArrayBuffer[Array[Float]]) => + val memoryFs = memoryMap(cellId) + val labelFs = targetMap(cellId) + (inputFs.flatten.toArray, memoryFs.flatMap(fs => fs.flatten), labelFs) + }.toArray + val xData = pairedInputXMLabel.flatMap(_._1) + val xTensor = Tensor[Float](xData, Array(xData.length / 10 / 8, 10, 8)) + val mData = pairedInputXMLabel.flatMap(_._2) + val mTensor = Tensor[Float](mData, Array(mData.length / 77 / 8, 77, 8)) + val label = pairedInputXMLabel.flatMap(_._3) + val labelTensor = Tensor[Float](label, Array(label.length / 8, 1, 8)) + val sample = ArraySample[Float](Array(xTensor, mTensor), labelTensor) + Iterator.single(sample) + } + } + + /** + * Create inferenceRDD which applies tfModel to the tensorRDD. + */ + def buildInferenceRDD(tensorRDD: RDD[Table], btfnet: ModelBroadcast[Float]): RDD[Activity] = { + tensorRDD.mapPartitions( + iterTable => { + iterTable.toArray.map(btfnet.value().forward).toIterator + }, preservesPartitioning = true) + } + + /** + * Create DataFrame which has FrashBase as data source. + */ + def createR2DataFrame(fbHost: String, fbPort: Int): DataFrame = { + val params = Map("table" -> "1", + "host" -> fbHost, + "port" -> fbPort.toString, + "partitions" -> "evt_dtm random", + "mode" -> "nvkvs", + "group_query_enabled" -> "no", + "group_size" -> "44", + "query_result_partition_cnt_limit" -> "400000000", + "query_result_task_row_cnt_limit" -> "10000000", + "query_result_total_row_cnt_limit" -> "2147483647", + "at_least_one_partition_enabled" -> "no") + + val fields = "evt_dtm,uniq_id,rsrp,rsrq,dl_prb_usage_rate,sinr,ue_tx_power,phr,ue_conn_tot_cnt,cqi,random" + .split(',') + .map { + case "evt_dtm" => StructField("evt_dtm", StringType) + case name@("uniq_id" | "random") => StructField(name, IntegerType) + case fieldName => StructField(fieldName, FloatType) + } + + val schema = StructType(fields) + val r2Df = sqlContext.read.format("r2") + .options(params) + .schema(schema) + .load() + .drop("random") + r2Df + } + + /** + * Build DataFrame which applies normalization to each features. 
+ */ + def normalizedDF(dfToNorm: DataFrame): DataFrame = { + val minMaxMap = Map[String, (Float, Float)]( + "rsrp" -> (-121.0f, 0.0f), + "rsrq" -> (-20.0f, 0.0f), + "dl_prb_usage_rate" -> (1.0f, 99.57666778564453f), + "sinr" -> (-3.676666736602783f, 20.5f), + "ue_tx_power" -> (-10.943333625793457f, 23.0f), + "phr" -> (0.5f, 52.91666793823242f), + "ue_conn_tot_cnt" -> (0.0f, 144.63333129882812f), + "cqi" -> (1.9620689153671265f, 14.984615325927734f) + ) + + var df = dfToNorm + for (col <- minMaxMap) { + val colName = col._1 + val (min, max) = col._2 + df = df.withColumn(colName, (((df(colName) - min) * 2.0f / (max - min)) + -1.0f).cast(FloatType)) + } + df + } +} From 413a32b0d96f69de73cd4e8c9c4dad80314b7bc4 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:23:51 -0800 Subject: [PATCH 10/16] add train code of zoo --- .../r2/ml/FlashBaseMLPipelineTrain.scala | 354 ------------------ train_mem_model_zoo.py | 29 +- 2 files changed, 10 insertions(+), 373 deletions(-) delete mode 100644 flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala diff --git a/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala b/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala deleted file mode 100644 index da7ce32..0000000 --- a/flashbase-ml-pipeline/src/main/scala/com/skt/spark/r2/ml/FlashBaseMLPipelineTrain.scala +++ /dev/null @@ -1,354 +0,0 @@ -package com.skt.spark.r2.ml - -import com.intel.analytics.bigdl.dataset.{ArraySample, Sample} -import com.intel.analytics.bigdl.models.utils.ModelBroadcast -import com.intel.analytics.bigdl.nn.{CrossEntropyCriterion, MSECriterion, TimeDistributedCriterion} -import com.intel.analytics.bigdl.nn.abstractnn.Activity -import com.intel.analytics.bigdl.optim._ -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.utils.{T, Table} -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric._ -import com.intel.analytics.zoo.common.NNContext -import com.intel.analytics.zoo.pipeline.api.net.TFNet -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.sql.types._ - -import scala.collection.{immutable, mutable} - -object FlashBaseMLPipelineTrain { - - import DateUtil._ - - var sqlContext: SQLContext = _ - var sparkContext: SparkContext = _ - - def main(args: Array[String]) { - val tfNetPath = args(0) - val fbHost = args(1) - val fbPort = args(2).toInt - val batchSize = args(3).toInt - - sparkContext = NNContext.initNNContext("fb-ml-pipeline") - sqlContext = new SQLContext(sparkContext) - - val initialTime = "20190920094500" - val normR2Df = normalizedDF(createR2DataFrame(fbHost, fbPort)) - - var (start, end) = (0L, 0L) - start = System.nanoTime() - val cachedInputQueue = cacheInputFeaturesQueue(normR2Df, initialTime) - end = System.nanoTime() - println(s"The time of building Input-Queue is ${(end - start) / 1.0e9}s") - - start = System.nanoTime() - val cachedMemoryQueue = cacheMemoryFeaturesQueue(normR2Df, beforeDays(afterMinutes(initialTime, 5), 1)) - end = System.nanoTime() - println(s"The time of building Memory-Queue is ${(end - start) / 1.0e9}s") - - var avg = 0L - - start = System.nanoTime() - val trainRDD = buildSampleRDD( - buildInputFeatures(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue), - buildMemoryFeatures(normR2Df, beforeDays(afterMinutes(initialTime, 10), 1), cachedMemoryQueue), - 
buildInputTargets(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue)) - end = System.nanoTime() - // avg += (end - start) - // println(s"Try #${i + 1} : The time of pre-processing is ${(end - start) / 1.0e9}s") - - // println(s"AVG time of pre-processing is ${(avg / 5) / 1.0e9}s") - // avg = 0L - - start = System.nanoTime() - val model = TFNet(tfNetPath) - - val optimMethod = new Adam[Float]() - - val optimizer = Optimizer( - model = model, - sampleRDD = trainRDD, - criterion = MSECriterion[Float](), - batchSize = 1000 - ) - - optimizer - .setOptimMethod(optimMethod) - .setEndWhen(Trigger.maxEpoch(10)) - .optimize() - // val btfnet = ModelBroadcast[Float]().broadcast(sparkContext, TFNet(tfNetPath)) - // end = System.nanoTime() - // println(s"The time of broadcasting model is ${(end - start) / 1.0e9}s") - - // for (i <- 0 until 5) { - // start = System.nanoTime() - // buildInferenceRDD( - // buildTensorRDD( - // buildInputFeatures(normR2Df, afterMinutes(initialTime, 5), cachedInputQueue), - // buildMemoryFeatures(normR2Df, beforeDays(afterMinutes(initialTime, 10), 1), cachedMemoryQueue), - // batchSize), - // btfnet - // ).collect() - // end = System.nanoTime() - // avg += (end - start) - // println(s"Try #${i + 1} : The time of pre-processing and inference is ${(end - start) / 1.0e9}s") - // } - // println(s"AVG time of pre-processing and inference is ${(avg / 5) / 1.0e9}s") - } - - /** - * Cache a Map of ('cellID' -> 'buffer of recent 50 min features sorted by evt_dtm') - * into each partitions. - * Features are partitioned by unique CellId and distributed into Spark partitions. - */ - def cacheInputFeaturesQueue( - normR2Df: DataFrame, - time: String) - : RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]] = { - val filter = buildTimeFilter("evt_dtm", time, 45, 1) - val cachedInput = normR2Df.where(filter).rdd.map(row => (row.getInt(1), row)) - .aggregateByKey(new mutable.ArrayBuffer[(String, Array[Float])]())( - (buf, row) => { - val features = (for (i <- 2 until row.size) yield row.getFloat(i)).toArray - buf.append((row.getString(0), features)) - buf - }, (b1, b2) => b1 ++= b2) - .mapPartitions(it => Iterator.single(it.map { - case (cellId, featureBuf) => (cellId, featureBuf.sortBy(_._1).map(_._2)) - }.toMap), - preservesPartitioning = true) - .cache() - cachedInput.count() - - cachedInput - } - - /** - * Build a inputRDD(rdd of a Map of ('cellID' -> 'input Features')). - * It is built by concatenating new 5 min features with last 45 minutes features of cached - * input-features in each partitions. - */ - def buildInputFeatures( - normR2Df: DataFrame, - time: String, - cachedInputFeatures: RDD[Map[Int, mutable.ArrayBuffer[Array[Float]]]]) - : RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]] = { - val filter = buildTimeFilter("evt_dtm", time, 0, 1) - normR2Df.where(filter).rdd - .map(row => (row.getInt(1), row)) - .partitionBy(cachedInputFeatures.partitioner.get) - .zipPartitions(cachedInputFeatures, preservesPartitioning = true) { - (newRowIter, cached) => - if (newRowIter.isEmpty) { - cached - } else { - val cachedMap = cached.next() - val newMap = new mutable.HashMap[Int, mutable.ArrayBuffer[Array[Float]]]() - newRowIter.foreach { - newRow => - val buffer = cachedMap(newRow._1) - val features = (for (i <- 2 until newRow._2.size) yield newRow._2.getFloat(i)).toArray - newMap.put(newRow._1, buffer.drop(1) += features) - } - Iterator.single(newMap.toMap) - } - } - } - - /** - * Build a targetRDD(rdd of a Map of ('cellID' -> 'target features')). 
- * It is built by new 5 min features in each partitions. - */ - def buildInputTargets( - normR2Df: DataFrame, - time: String, - cachedInputFeatures: RDD[Map[Int, mutable.ArrayBuffer[Array[Float]]]]) - : RDD[immutable.Map[Int, Array[Float]]] = { - val filter = buildTimeFilter("evt_dtm", time, 0, 1) - normR2Df.where(filter).rdd - .map(row => (row.getInt(1), row)) - .partitionBy(cachedInputFeatures.partitioner.get) - .mapPartitions({ - it: Iterator[(Int, Row)] => - // val cachedMap = cached.next() - val newMap = new mutable.HashMap[Int, Array[Float]]() - it.foreach { - newRow => - // val buffer = cachedMap(newRow._1) - val features = (for (i <- 2 until newRow._2.size) yield newRow._2.getFloat(i)).toArray - newMap.put(newRow._1, features) - } - Iterator.single(newMap.toMap) - } - ) - } - - /** - * Cache a Map of ('cellID' -> '7 days x buffer of 55 min features sorted by evt_dtm') into each - * partitions. - * Features are partitioned by unique CellId and distributed into Spark partitions. - */ - def cacheMemoryFeaturesQueue( - normR2Df: DataFrame, - time: String) - : RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]] = { - val filter = buildTimeFilter("evt_dtm", time, 50, 7) - val cachedMemory = normR2Df.where(filter).rdd.map(row => (row.getInt(1), row)) - .aggregateByKey(new mutable.ArrayBuffer[(String, Array[Float])]())( - (buf, row) => { - val features = (for (i <- 2 until row.size) yield row.getFloat(i)).toArray - buf.append((row.getString(0), features)) // Tuple of (DT, Features) - buf - }, (b1, b2) => b1 ++= b2) - .mapPartitions({ - it: Iterator[(Int, mutable.ArrayBuffer[(String, Array[Float])])] => - Iterator.single[Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]]( - it.map { - case (cellId, fs) => (cellId, fs.sortBy(_._1).map(_._2).grouped(11).toArray) - }.toMap) - }, preservesPartitioning = true) - .cache() - - cachedMemory.count() - cachedMemory - } - - /** - * Build a memoryRDD(rdd of a Map of ('cellID' -> 'memory features')). - * It is built by concatenating new 7days x 5 min features with 7days x last 50 minutes features - * of cached memory feature in each partitions. - */ - def buildMemoryFeatures( - normR2Df: DataFrame, - time: String, - cachedMemory: RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]]) - : RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]] = { - val filter = buildTimeFilter("evt_dtm", time, 0, 7) - normR2Df.where(filter).rdd - .map(row => (row.getInt(1), row)) - .partitionBy(cachedMemory.partitioner.get) - .zipPartitions(cachedMemory, preservesPartitioning = true) { - (newRowIter, cachedMapIter) => - if (newRowIter.isEmpty) { - cachedMapIter - } else { - val cachedMap = cachedMapIter.next() - val newMemoryMap = new mutable.HashMap[Int, Array[mutable.ArrayBuffer[Array[Float]]]]() - newRowIter.toArray.groupBy(_._1).values.iterator.foreach { - newRows: Array[(Int, Row)] => - val cellId = newRows(0)._1 - val memories = cachedMap(cellId) - val newFeatures = newRows.map(_._2).sortBy(r => r.getString(0)).map { - row => (for (i <- 2 until row.size) yield row.getFloat(i)).toArray - } - val newMemory = Array.ofDim[mutable.ArrayBuffer[Array[Float]]](memories.length) - for (i <- memories.indices) { - newMemory(i) = memories(i).drop(1) - newMemory(i) += newFeatures(i) - } - newMemoryMap.put(cellId, newMemory) - } - Iterator.single(newMemoryMap.toMap) - } - } - } - - /** - * Build TensorRDD with inputRDD and memoryRDD. - * Input features and memory features are zipped and grouped with batchSize in each partitions. 
- */ - def buildSampleRDD( - inputRDD: RDD[immutable.Map[Int, mutable.ArrayBuffer[Array[Float]]]], - memoryRDD: RDD[immutable.Map[Int, Array[mutable.ArrayBuffer[Array[Float]]]]], - targetRDD: RDD[immutable.Map[Int, Array[Float]]] - ): RDD[Sample[Float]] = { - inputRDD.zipPartitions(memoryRDD, targetRDD, preservesPartitioning = true) { - (iterInput, iterMemory, iterTarget) => - val inputMap = iterInput.next() - val memoryMap = iterMemory.next() - val targetMap = iterTarget.next() - val pairedInputXMLabel = inputMap.map { - case (cellId: Int, inputFs: mutable.ArrayBuffer[Array[Float]]) => - val memoryFs = memoryMap(cellId) - val labelFs = targetMap(cellId) - (inputFs.flatten.toArray, memoryFs.flatMap(fs => fs.flatten), labelFs) - }.toArray - val xData = pairedInputXMLabel.flatMap(_._1) - val xTensor = Tensor[Float](xData, Array(xData.length / 10 / 8, 10, 8)) - val mData = pairedInputXMLabel.flatMap(_._2) - val mTensor = Tensor[Float](mData, Array(mData.length / 77 / 8, 77, 8)) - val label = pairedInputXMLabel.flatMap(_._3) - val labelTensor = Tensor[Float](label, Array(label.length / 8, 1, 8)) - val sample = ArraySample[Float](Array(xTensor, mTensor), labelTensor) - Iterator.single(sample) - } - } - - /** - * Create inferenceRDD which applies tfModel to the tensorRDD. - */ - def buildInferenceRDD(tensorRDD: RDD[Table], btfnet: ModelBroadcast[Float]): RDD[Activity] = { - tensorRDD.mapPartitions( - iterTable => { - iterTable.toArray.map(btfnet.value().forward).toIterator - }, preservesPartitioning = true) - } - - /** - * Create DataFrame which has FrashBase as data source. - */ - def createR2DataFrame(fbHost: String, fbPort: Int): DataFrame = { - val params = Map("table" -> "1", - "host" -> fbHost, - "port" -> fbPort.toString, - "partitions" -> "evt_dtm random", - "mode" -> "nvkvs", - "group_query_enabled" -> "no", - "group_size" -> "44", - "query_result_partition_cnt_limit" -> "400000000", - "query_result_task_row_cnt_limit" -> "10000000", - "query_result_total_row_cnt_limit" -> "2147483647", - "at_least_one_partition_enabled" -> "no") - - val fields = "evt_dtm,uniq_id,rsrp,rsrq,dl_prb_usage_rate,sinr,ue_tx_power,phr,ue_conn_tot_cnt,cqi,random" - .split(',') - .map { - case "evt_dtm" => StructField("evt_dtm", StringType) - case name@("uniq_id" | "random") => StructField(name, IntegerType) - case fieldName => StructField(fieldName, FloatType) - } - - val schema = StructType(fields) - val r2Df = sqlContext.read.format("r2") - .options(params) - .schema(schema) - .load() - .drop("random") - r2Df - } - - /** - * Build DataFrame which applies normalization to each features. 
- */ - def normalizedDF(dfToNorm: DataFrame): DataFrame = { - val minMaxMap = Map[String, (Float, Float)]( - "rsrp" -> (-121.0f, 0.0f), - "rsrq" -> (-20.0f, 0.0f), - "dl_prb_usage_rate" -> (1.0f, 99.57666778564453f), - "sinr" -> (-3.676666736602783f, 20.5f), - "ue_tx_power" -> (-10.943333625793457f, 23.0f), - "phr" -> (0.5f, 52.91666793823242f), - "ue_conn_tot_cnt" -> (0.0f, 144.63333129882812f), - "cqi" -> (1.9620689153671265f, 14.984615325927734f) - ) - - var df = dfToNorm - for (col <- minMaxMap) { - val colName = col._1 - val (min, max) = col._2 - df = df.withColumn(colName, (((df(colName) - min) * 2.0f / (max - min)) + -1.0f).cast(FloatType)) - } - df - } -} diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index 384e802..4999319 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -1,34 +1,25 @@ -import tensorflow as tf -from tensorflow.contrib import layers -import os -from utils import make_date_dir, find_latest_dir - -import sys -from zoo.pipeline.api.net import TFNet -from zoo import init_nncontext, Sample +from zoo import init_nncontext from zoo.tfpark import TFOptimizer, TFDataset from bigdl.optim.optimizer import * -import tensorflow as tf -import numpy as np from data_utils import load_agg_selected_data_mem from ARMem.config import Config from ARMem.model import Model -# to reproduce the results in test_mem_model.py -# please set PARALLELISM to 1 and BATCH_PER_THREAD to 1022 -PARALLELISM=4 -BATCH_PER_THREAD=3200 - if __name__ == "__main__": - config = Config() + data_path = sys.argv[1] + batch_size = int(sys.argv[2]) + num_epochs = int(sys.argv[3]) + + config = Config() + config.data_path = data_path config.latest_model=False # init or get SparkContext sc = init_nncontext() - # create test data + # create train data train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ load_agg_selected_data_mem(data_path=config.data_path, x_len=config.x_len, @@ -41,12 +32,12 @@ model_dir = config.model_dir - dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=2700, val_tensors=[dev_x, dev_m, dev_y],) + dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=batch_size, val_tensors=[dev_x, dev_m, dev_y],) model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}) - optimizer.optimize(end_trigger=MaxEpoch(15000)) + optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) From 143b00191ea056b492e409353e8a10eb4b0b1588 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:36:06 -0800 Subject: [PATCH 11/16] restore data path --- data_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data_utils.py b/data_utils.py index b443c48..ad15402 100644 --- a/data_utils.py +++ b/data_utils.py @@ -16,7 +16,7 @@ def batch_loader(iterable, batch_size, shuffle=False): def load_agg_data( - data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_5min_scaled.csv', + data_path='../data/aggregated_5min_scaled.csv', x_len=10, y_len=1, ncells=20, @@ -125,7 +125,7 @@ def load_agg_data( def load_agg_selected_data_mem( - data_path='/home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv', + data_path='../data/aggregated_5min_scaled.csv', x_len=10, y_len=1, mem_len=7, From 2366e03eea3bdbd5bc6a6f51f79df0996b7cfa85 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:37:31 -0800 Subject: [PATCH 12/16] restore data 
path --- data_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_utils.py b/data_utils.py index ad15402..5c5a5f7 100644 --- a/data_utils.py +++ b/data_utils.py @@ -302,7 +302,7 @@ def _time_concat(arg): return train_x, dev_x, te_x, train_y, dev_y, te_y, train_m, dev_m, te_m, test_dt -def load_agg_data_all(data_path='/home/yang/sources/ARMemNet-BigDL/data/aggregated_data_5min_scaled.csv', ncells=20, test_len=7): +def load_agg_data_all(data_path='../data/aggregated_data_5min_scaled.csv', ncells=20, test_len=7): data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) From cad01c7d4c5f59f7fe30cc404a707aada3728d0c Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:41:36 -0800 Subject: [PATCH 13/16] update run inference --- ARMem/config.py | 2 +- run_inference_mem_model_zoo.sh | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ARMem/config.py b/ARMem/config.py index 0e5df5a..fc8daa3 100644 --- a/ARMem/config.py +++ b/ARMem/config.py @@ -13,7 +13,7 @@ def __init__(self): self.ar_g = 1 # data params - self.data_path = '/home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv' + self.data_path = '../data/aggregated_5min_scaled.csv' self.nfeatures = 8 # number of col_list in "../config_preprocess.py" self.x_len = self.nsteps self.y_len = 1 diff --git a/run_inference_mem_model_zoo.sh b/run_inference_mem_model_zoo.sh index 248d4ce..71973fd 100644 --- a/run_inference_mem_model_zoo.sh +++ b/run_inference_mem_model_zoo.sh @@ -5,4 +5,7 @@ if [ -z "${ANALYTICS_ZOO_HOME}" ]; then fi # bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[4] inference_mem_model_zoo.py -bash $ANALYTICS_ZOO_HOME/bin/spark-submit-python-with-zoo.sh --master local[36] --driver-memory 32g train_mem_model_zoo.py +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-python-with-zoo.sh \ + --master local[36] \ + --driver-memory 32g \ + inference_mem_model_zoo.py From a64d701023a1386d0f21ecc14f690d4f768fd167 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:42:37 -0800 Subject: [PATCH 14/16] update --- train_mem_model_zoo.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index 4999319..171893b 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -39,10 +39,3 @@ optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) - - - - - - - From a1546186a488bc5563cdfb3a0a285e24b6437201 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 15:46:33 -0800 Subject: [PATCH 15/16] add train script --- run_train_mem_model_zoo.sh | 5 +++++ 1 file changed, 5 insertions(+) create mode 100755 run_train_mem_model_zoo.sh diff --git a/run_train_mem_model_zoo.sh b/run_train_mem_model_zoo.sh new file mode 100755 index 0000000..7529708 --- /dev/null +++ b/run_train_mem_model_zoo.sh @@ -0,0 +1,5 @@ + +${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ + --master local[4] \ + --driver-memory 20g \ + train_mem_model_zoo.py /home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv 2700 1000 From 434f8b5801c6bacc10afcda7da9eedfd285b5ae0 Mon Sep 17 00:00:00 2001 From: jenniew Date: Mon, 11 Nov 2019 16:04:09 -0800 Subject: [PATCH 16/16] update analytics zoo version in pom file --- flashbase-ml-pipeline/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flashbase-ml-pipeline/pom.xml b/flashbase-ml-pipeline/pom.xml index 85d4e37..e3cd39e 100644 --- a/flashbase-ml-pipeline/pom.xml +++ b/flashbase-ml-pipeline/pom.xml @@ -43,8 
+43,8 @@ com.intel.analytics.zoo - analytics-zoo-bigdl_0.8.0-spark_${spark.version} - 0.5.1 + analytics-zoo-bigdl_0.9.1-spark_${spark.version} + 0.6.0 provided
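Note (reviewer sketch, not part of the patches above): the tensor shapes that the exported `tfnet` graph expects are easy to lose track of across `export_for_scala.py` and `Main.scala`. The minimal Scala sketch below shows a single forward pass through the exported model using only calls that already appear in this series (`TFNet(path)`, `forward` on a `T(...)` table). The object name, the batch size of 2 and the random inputs are invented for illustration; the shapes (`input_x` of (batch, 10, 8), `memories` of (batch, 77, 8), predictions of (batch, 8)) are taken from the code in the patches.

```scala
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.utils.T
import com.intel.analytics.zoo.pipeline.api.net.TFNet

object ForwardSketch {
  def main(args: Array[String]): Unit = {
    // args(0): path to the folder written by export_tf, e.g. "./tfnet"
    val tfnet = TFNet(args(0))

    // Dummy batch of 2 samples with the shapes the frozen graph expects.
    val inputX   = Tensor[Float](2, 10, 8).rand()   // recent 10 steps x 8 features
    val memories = Tensor[Float](2, 77, 8).rand()   // 7 days x 11 steps = 77 memory rows

    // Multi-input models take a Table; the result is a (2, 8) prediction tensor.
    val predictions = tfnet.forward(T(inputX, memories)).toTensor[Float]
    println(s"predictions size: ${predictions.size().mkString(" x ")}")
  }
}
```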
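Note (reviewer sketch, not part of the patches above): the batching loop in `Main.scala` relies on BigDL's `Tensor.narrow(dim, start, length)`, which uses 1-based indices and returns a view of the underlying storage rather than a copy. The self-contained sketch below, with illustrative sizes and an invented object name, shows how that loop carves out fixed-size batches plus a shorter final batch.

```scala
import com.intel.analytics.bigdl.tensor.Tensor

object NarrowBatchSketch {
  def main(args: Array[String]): Unit = {
    val length = 10                          // pretend dataset of 10 samples
    val batchSize = 4
    val data = Tensor[Float](length, 10, 8).rand()

    var i = 0
    while (i < Math.ceil(length / (1.0 * batchSize))) {
      val start = batchSize * i + 1          // narrow uses 1-based indices
      val size  = Math.min(batchSize, length - batchSize * i)
      val batch = data.narrow(1, start, size)      // a view, no data copy
      println(s"batch $i rows: ${batch.size(1)}")  // prints 4, 4, 2
      i += 1
    }
  }
}
```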
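Note (reviewer sketch, not part of the patches above): `normalizedDF` in the FlashBase pipeline code scales each feature column into [-1, 1] using fixed per-column bounds. The plain-Scala sketch below (no Spark; the object name and the sample value are invented for illustration) spells out that formula; the rsrp bounds are the ones listed in the code's minMaxMap.

```scala
object MinMaxSketch {
  /** Maps v from [min, max] onto [-1, 1], mirroring the withColumn expression
    * ((col - min) * 2 / (max - min)) - 1 used in normalizedDF. */
  def scaleToUnitRange(v: Float, min: Float, max: Float): Float =
    ((v - min) * 2.0f / (max - min)) - 1.0f

  def main(args: Array[String]): Unit = {
    val (rsrpMin, rsrpMax) = (-121.0f, 0.0f)              // bounds from minMaxMap
    println(scaleToUnitRange(-60.5f, rsrpMin, rsrpMax))   // ~0.0, i.e. mid-range
  }
}
```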