diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala index b64a658bef..e2bb1b5fca 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala @@ -77,14 +77,38 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P] cartesian.flatMap { case (m, qArray) => qArray.map { case (qx, q) => - (qx, Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) ) + (qx, + Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) ) } } } - def predictBase(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = { - predict(localBaseModel.asInstanceOf[M], q)(ec) - } + override def predictBaseAsync(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext) + : Future[P] = + predictAsync(localBaseModel.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(localBaseModel: Any, q: Q): P = + predict(localBaseModel.asInstanceOf[M], q) + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -93,7 +117,9 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P] * @param q An input query. * @return A prediction. */ - def predict(m: M, q: Q)(implicit ec: ExecutionContext): Future[P] + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(m: M, q: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how local diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala index 9612ba789f..07d0dbdb21 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala @@ -71,12 +71,34 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] */ def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = { qs.mapValues { q => - Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) + Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) } } - def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = - predict(bm.asInstanceOf[M], q)(ec) + override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + predictAsync(bm.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q) + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -85,7 +107,9 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] * @param query An input query. * @return A prediction. */ - def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(model: M, query: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala index 3a24c07ffe..2b619a5357 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala @@ -24,7 +24,7 @@ import org.apache.predictionio.workflow.PersistentModelManifest import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, blocking} /** Base class of a parallel algorithm. * @@ -74,9 +74,31 @@ abstract class PAlgorithm[PD, M, Q, P] def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = throw new NotImplementedError("batchPredict not implemented") - def predictBase(baseModel: Any, query: Q)(implicit ec: ExecutionContext): Future[P] = { - predict(baseModel.asInstanceOf[M], query)(ec) - } + override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + predictAsync(bm.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(baseModel: Any, query: Q): P = + predict(baseModel.asInstanceOf[M], query) + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -85,7 +107,9 @@ abstract class PAlgorithm[PD, M, Q, P] * @param query An input query. * @return A prediction. */ - def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(model: M, query: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how parallel diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala index f6703c1a64..2030deb8a3 100644 --- a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala @@ -26,7 +26,7 @@ import net.jodah.typetools.TypeResolver import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, blocking} /** :: DeveloperApi :: * Base trait with default custom query serializer, exposed to engine developer @@ -83,6 +83,19 @@ abstract class BaseAlgorithm[PD, M, Q, P] def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) : RDD[(Long, P)] + /** :: DeveloperApi :: + * Engine developers should not use this directly. Called by serving to + * perform a single prediction. + * + * @param bm Model + * @param q Query + * @param ec ExecutionContext to use for async operations + * @return Future of a Predicted result + */ + @DeveloperApi + def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking {predictBase(bm, q)}) + /** :: DeveloperApi :: * Engine developers should not use this directly. Called by serving to * perform a single prediction. @@ -92,7 +105,11 @@ abstract class BaseAlgorithm[PD, M, Q, P] * @return Predicted result */ @DeveloperApi - def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] + @deprecated(message = "override non blocking predictBaseAsync() instead", since = "0.14.0") + def predictBase(bm: Any, q: Q): P = + throw new NotImplementedError( + "predictBase() is deprecated, override predictBaseAsync() instead" + ) /** :: DeveloperApi :: * Engine developers should not use this directly. Prepare a model for diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala index 4bb6e04899..8b0bd64fb1 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala @@ -213,7 +213,7 @@ object BatchPredict extends Logging { // finally Serving.serve. val supplementedQuery = serving.supplementBase(query) val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) => - a.predictBase(m, supplementedQuery) + a.predictBaseAsync(m, supplementedQuery) }) // Notice that it is by design to call Serving.serve with the // *original* query. diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala index 12ea70a7a7..ed2b4f4ad2 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala @@ -506,7 +506,7 @@ class PredictionServer[Q, P]( val supplementedQuery = serving.supplementBase(query) val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) => - a.predictBase(m, supplementedQuery) + a.predictBaseAsync(m, supplementedQuery) }) // Notice that it is by design to call Serving.serve with the // *original* query. diff --git a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala index 8b17fe959f..31df0a83d3 100644 --- a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala +++ b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala @@ -205,7 +205,8 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo0.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: PAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -225,7 +226,7 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: PAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -248,7 +249,7 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: PAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -275,7 +276,8 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo3.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: PAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -284,11 +286,12 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo0(id: Int = 0) + class LAlgo0(id: Int = 0) extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd) - def predict(m: LAlgo0.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: LAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -297,11 +300,12 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo1(id: Int = 0) + class LAlgo1(id: Int = 0) extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd) - def predict(m: LAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: LAlgo1.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -315,11 +319,12 @@ object Engine0 { object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] } - class LAlgo2(params: LAlgo2.Params) + class LAlgo2(params: LAlgo2.Params) extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd) - def predict(m: LAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: LAlgo2.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(params.id, q, Some(m))) } } @@ -330,11 +335,12 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo3(params: LAlgo3.Params) + class LAlgo3(params: LAlgo3.Params) extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd) - def predict(m: LAlgo3.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: LAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(params.id, q, Some(m))) } } @@ -349,7 +355,8 @@ object Engine0 { def train(sc: SparkContext, pd: ProcessedData) : NAlgo0.Model = NAlgo0.Model(id, pd) - def predict(m: NAlgo0.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: NAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -363,7 +370,8 @@ object Engine0 { def train(sc: SparkContext, pd: ProcessedData) : NAlgo1.Model = NAlgo1.Model(id, pd) - def predict(m: NAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: NAlgo1.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(id, q, Some(m))) } } @@ -377,12 +385,12 @@ object Engine0 { object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] } - class NAlgo2(params: NAlgo2.Params) + class NAlgo2(params: NAlgo2.Params) extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] { def train(sc: SparkContext, pd: ProcessedData) : NAlgo2.Model = NAlgo2.Model(params.id, pd) - - def predict(m: NAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: NAlgo2.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(params.id, q, Some(m))) } } @@ -393,12 +401,13 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class NAlgo3(params: NAlgo3.Params) + class NAlgo3(params: NAlgo3.Params) extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] { def train(sc: SparkContext, pd: ProcessedData) : NAlgo3.Model = NAlgo3.Model(params.id, pd) - def predict(m: NAlgo3.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + override def predictAsync(m: NAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { Future.successful(Prediction(params.id, q, Some(m))) } } @@ -437,18 +446,18 @@ object Engine1 { case class DSP(v: Double) extends Params } -class Engine1 +class Engine1 extends BaseEngine[ Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] { def train( - sc: SparkContext, + sc: SparkContext, engineParams: EngineParams, engineInstanceId: String = "", params: WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]() def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams) - : Seq[(Engine1.EvalInfo, + : Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = { val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP] Seq( @@ -464,7 +473,7 @@ Engine1.Actual, Double] { override def header: String = "Metric0" def calculate( - sc: SparkContext, + sc: SparkContext, evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])]): Double = { evalDataSet.head._1.v @@ -481,7 +490,7 @@ Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) { override def header: String = "Metric1" def calculate( - sc: SparkContext, + sc: SparkContext, evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])]): Metric1.Result = { Metric1.Result(0, evalDataSet.head._1.v) diff --git a/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala b/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala index f634b167e3..c02b24bd6c 100644 --- a/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala +++ b/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala @@ -47,7 +47,7 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams) NaiveBayes.train(data.labeledPoints, ap.lambda) } - def predict(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val label = model.predict(Vectors.dense( Array(query.attr0, query.attr1, query.attr2) )) diff --git a/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala b/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala index ac45707d38..65e31813fb 100644 --- a/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala +++ b/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala @@ -47,7 +47,7 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams) NaiveBayes.train(data.labeledPoints, ap.lambda) } - def predict(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val label = model.predict(Vectors.dense( // MODIFIED Array(query.featureA, query.featureB, query.featureC, query.featureD) diff --git a/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala index 444cf0c48e..5f0d075cdd 100644 --- a/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala +++ b/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala @@ -238,7 +238,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) buyCountsRDD.collectAsMap.toMap } - def predict(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val userFeatures = model.userFeatures val productModels = model.productModels diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala index 2baf02c33c..6c24811953 100644 --- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala +++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala @@ -239,7 +239,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) buyCountsRDD.collectAsMap.toMap } - def predict(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val userFeatures = model.userFeatures val productModels = model.productModels diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala index 8af414dfb0..debf8b0e37 100644 --- a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala index 7d11c0a638..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala index 7d11c0a638..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala index 7d11c0a638..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala index 3578ca86f8..02ab817705 100644 --- a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala @@ -93,7 +93,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { Future.successful( // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala index 3dc8032905..0cef67c8bd 100644 --- a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala @@ -132,7 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala index 4204185af8..da7d0bdefb 100644 --- a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala @@ -104,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) topCooccurrences } - def predict(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // convert items to Int index val queryList: Set[Int] = query.items diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala index 77fc93370d..eaaff27bae 100644 --- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala @@ -126,7 +126,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val similarUserFeatures = model.similarUserFeatures diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala index 02b3e1374a..13d98ebc5b 100644 --- a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala @@ -132,7 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala index 6bb175bdc6..d3293127a5 100644 --- a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala @@ -104,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) topCooccurrences } - def predict(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // convert items to Int index val queryList: Set[Int] = query.items diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala index 3f636a29e4..d9d52d4bec 100644 --- a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala @@ -128,7 +128,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala index 6550a12372..f5a42b164c 100644 --- a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala @@ -140,7 +140,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala index 52ca48cdc6..3ab3910415 100644 --- a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala +++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap