diff --git a/ARMem/model.py b/ARMem/model.py index a78bfa4..99c4889 100644 --- a/ARMem/model.py +++ b/ARMem/model.py @@ -4,12 +4,15 @@ # AR_memory class Model(object): - def __init__(self, config): + def __init__(self, config, input_x=None, memories=None, targets=None): self.config = config self.global_step = tf.Variable(0, trainable=False, name="global_step") self.regularizer = layers.l2_regularizer(self.config.l2_lambda) self.sess = None self.saver = None + self.input_x = input_x + self.memories = memories + self.targets = targets self._build_model() def _build_model(self): @@ -54,9 +57,12 @@ def _build_model(self): self.initialize_session() def add_placeholder(self): - self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") - self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") - self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, + if self.input_x is None: + self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") + if self.targets is None: + self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") + if self.memories is None: + self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, name="memories") # self.targets = tf.placeholder(shape=[None], dtype=tf.int32, name="targets") self.dropout = tf.placeholder(dtype=tf.float32, name="dropout") diff --git a/flashbase-ml-pipeline/pom.xml b/flashbase-ml-pipeline/pom.xml index 85d4e37..e3cd39e 100644 --- a/flashbase-ml-pipeline/pom.xml +++ b/flashbase-ml-pipeline/pom.xml @@ -43,8 +43,8 @@ com.intel.analytics.zoo - analytics-zoo-bigdl_0.8.0-spark_${spark.version} - 0.5.1 + analytics-zoo-bigdl_0.9.1-spark_${spark.version} + 0.6.0 provided diff --git a/run_inference_mem_model_zoo.sh b/run_inference_mem_model_zoo.sh index 006dc35..71973fd 100644 --- a/run_inference_mem_model_zoo.sh +++ b/run_inference_mem_model_zoo.sh @@ -5,4 +5,7 @@ if [ -z "${ANALYTICS_ZOO_HOME}" ]; then fi # bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[4] inference_mem_model_zoo.py -bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[36] --driver-memory 32g inference_mem_model_zoo.py +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-python-with-zoo.sh \ + --master local[36] \ + --driver-memory 32g \ + inference_mem_model_zoo.py diff --git a/run_train_mem_model_zoo.sh b/run_train_mem_model_zoo.sh new file mode 100755 index 0000000..7529708 --- /dev/null +++ b/run_train_mem_model_zoo.sh @@ -0,0 +1,5 @@ + +${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ + --master local[4] \ + --driver-memory 20g \ + train_mem_model_zoo.py /home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv 2700 1000 diff --git a/scala-inference/bin/run-scala-inference.sh b/scala-inference/bin/run-scala-inference.sh index 32a46d8..75c0c35 100644 --- a/scala-inference/bin/run-scala-inference.sh +++ b/scala-inference/bin/run-scala-inference.sh @@ -8,7 +8,5 @@ fi full_path=$(realpath $0) dir_path=$(dirname $full_path) -bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false - - +bash $ANALYTICS_ZOO_HOME/bin/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py new file mode 100644 index 0000000..171893b --- /dev/null +++ b/train_mem_model_zoo.py @@ -0,0 +1,41 @@ +from zoo import init_nncontext +from zoo.tfpark import TFOptimizer, TFDataset +from bigdl.optim.optimizer import * +from data_utils import load_agg_selected_data_mem +from ARMem.config import Config +from ARMem.model import Model + + +if __name__ == "__main__": + + data_path = sys.argv[1] + batch_size = int(sys.argv[2]) + num_epochs = int(sys.argv[3]) + + config = Config() + config.data_path = data_path + config.latest_model=False + + # init or get SparkContext + sc = init_nncontext() + + # create train data + train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ + load_agg_selected_data_mem(data_path=config.data_path, + x_len=config.x_len, + y_len=config.y_len, + foresight=config.foresight, + cell_ids=config.test_cell_ids, + dev_ratio=config.dev_ratio, + test_len=config.test_len, + seed=config.seed) + + model_dir = config.model_dir + + dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=batch_size, val_tensors=[dev_x, dev_m, dev_y],) + + model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) + optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}) + + optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) +