diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala index 27d1d14e96..e2bb1b5fca 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala @@ -24,6 +24,9 @@ import org.apache.predictionio.workflow.PersistentModelManifest import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, blocking} +import scala.language.postfixOps import scala.reflect._ /** Base class of a local algorithm. @@ -72,13 +75,40 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P] val glomQs: RDD[Array[(Long, Q)]] = qs.glom() val cartesian: RDD[(M, Array[(Long, Q)])] = mRDD.cartesian(glomQs) cartesian.flatMap { case (m, qArray) => - qArray.map { case (qx, q) => (qx, predict(m, q)) } + qArray.map { + case (qx, q) => + (qx, + Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) ) + } } } - def predictBase(localBaseModel: Any, q: Q): P = { + override def predictBaseAsync(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext) + : Future[P] = + predictAsync(localBaseModel.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(localBaseModel: Any, q: Q): P = predict(localBaseModel.asInstanceOf[M], q) - } + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -87,7 +117,9 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P] * @param q An input query. * @return A prediction. 
*/ - def predict(m: M, q: Q): P + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(m: M, q: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how local diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala index c617d2c50a..07d0dbdb21 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala @@ -25,6 +25,9 @@ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, blocking} +import scala.language.postfixOps import scala.reflect._ /** Base class of a parallel-to-local algorithm. @@ -67,10 +70,35 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] * @return Batch of predicted results */ def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = { - qs.mapValues { q => predict(m, q) } + qs.mapValues { q => + Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) + } } - def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q) + override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + predictAsync(bm.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q) + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -79,7 +107,9 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] * @param query An input query. * @return A prediction. 
*/ - def predict(model: M, query: Q): P + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(model: M, query: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala index 55f8363fdb..2b619a5357 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala @@ -24,6 +24,8 @@ import org.apache.predictionio.workflow.PersistentModelManifest import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.concurrent.{ExecutionContext, Future, blocking} + /** Base class of a parallel algorithm. * * A parallel algorithm can be run in parallel on a cluster and produces a @@ -72,9 +74,31 @@ abstract class PAlgorithm[PD, M, Q, P] def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = throw new NotImplementedError("batchPredict not implemented") - def predictBase(baseModel: Any, query: Q): P = { + override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + predictAsync(bm.asInstanceOf[M], q)(ec) + + @deprecated(message = + "this method is just here for backward compatibility, predictBaseAsync() is called now", + since = "0.14.0") + override def predictBase(baseModel: Any, query: Q): P = predict(baseModel.asInstanceOf[M], query) - } + + /** Implement this method to produce a Future of a prediction in a non blocking way + * from a query and trained model. + * + * This method is implemented to just delegate to blocking predict() for + * backward compatibility reasons. + * Definitely overwrite it to implement your blocking prediction method, and leave + * the old blocking predict() as it is (throwing an exception), it won't be called from + * now on. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @param ec ExecutionContext to use for async operations + * @return A Future of a prediction. + */ + def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking(predict(model, query))) /** Implement this method to produce a prediction from a query and trained * model. @@ -83,7 +107,9 @@ abstract class PAlgorithm[PD, M, Q, P] * @param query An input query. * @return A prediction. 
*/ - def predict(model: M, query: Q): P + @deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0") + def predict(model: M, query: Q): P = + throw new NotImplementedError("predict() is deprecated, override predictAsync() instead") /** :: DeveloperApi :: * Engine developers should not use this directly (read on to see how parallel diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala index 8b9edc147b..2030deb8a3 100644 --- a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala @@ -26,6 +26,8 @@ import net.jodah.typetools.TypeResolver import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.concurrent.{ExecutionContext, Future, blocking} + /** :: DeveloperApi :: * Base trait with default custom query serializer, exposed to engine developer * via [[org.apache.predictionio.controller.CustomQuerySerializer]] @@ -81,6 +83,19 @@ abstract class BaseAlgorithm[PD, M, Q, P] def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) : RDD[(Long, P)] + /** :: DeveloperApi :: + * Engine developers should not use this directly. Called by serving to + * perform a single prediction. + * + * @param bm Model + * @param q Query + * @param ec ExecutionContext to use for async operations + * @return Future of a Predicted result + */ + @DeveloperApi + def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = + Future.successful(blocking {predictBase(bm, q)}) + /** :: DeveloperApi :: * Engine developers should not use this directly. Called by serving to * perform a single prediction. @@ -90,7 +105,11 @@ abstract class BaseAlgorithm[PD, M, Q, P] * @return Predicted result */ @DeveloperApi - def predictBase(bm: Any, q: Q): P + @deprecated(message = "override non blocking predictBaseAsync() instead", since = "0.14.0") + def predictBase(bm: Any, q: Q): P = + throw new NotImplementedError( + "predictBase() is deprecated, override predictBaseAsync() instead" + ) /** :: DeveloperApi :: * Engine developers should not use this directly. Prepare a model for diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala index 69525b11cf..8b0bd64fb1 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala @@ -32,7 +32,12 @@ import org.apache.predictionio.workflow.CleanupFunctions import org.apache.spark.rdd.RDD import org.json4s._ import org.json4s.native.JsonMethods._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.concurrent.blocking +import scala.concurrent.{Await, Future} import scala.language.existentials +import scala.concurrent.ExecutionContext.Implicits.global case class BatchPredictConfig( inputFilePath: String = "batchpredict-input.json", @@ -207,23 +212,26 @@ object BatchPredict extends Logging { // Deploy logic. First call Serving.supplement, then Algo.predict, // finally Serving.serve. val supplementedQuery = serving.supplementBase(query) - // TODO: Parallelize the following. 
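// Illustrative sketch, not part of the patch: the change just below swaps the removed
// sequential per-algorithm loop for a fan-out over the new predictBaseAsync(). Written
// out on its own it looks like this; `predictAll` and its parameters are hypothetical
// names, and the 60-minute timeout mirrors the one chosen in BatchPredict.
import org.apache.predictionio.core.BaseAlgorithm
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

def predictAll[Q, P](
    algorithms: Seq[BaseAlgorithm[_, _, Q, P]],
    models: Seq[Any],
    supplementedQuery: Q): Seq[P] = {
  // One Future per algorithm, collapsed from Seq[Future[P]] into Future[Seq[P]].
  val predictionsFuture: Future[Seq[P]] =
    Future.sequence(algorithms.zip(models).map { case (a, m) =>
      a.predictBaseAsync(m, supplementedQuery)
    })
  // A Spark transformation still needs a concrete value per record, so the batch
  // path blocks here with a generous timeout.
  Await.result(predictionsFuture, 60.minutes)
}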
- val predictions = algorithms.zip(models).map { case (a, m) => - a.predictBase(m, supplementedQuery) - } + val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) => + a.predictBaseAsync(m, supplementedQuery) + }) // Notice that it is by design to call Serving.serve with the // *original* query. - val prediction = serving.serveBase(query, predictions) - // Combine query with prediction, so the batch results are - // self-descriptive. - val predictionJValue = JsonExtractor.toJValue( - jsonExtractorOption, - Map("query" -> query, - "prediction" -> prediction), - algorithms.head.querySerializer, - algorithms.head.gsonTypeAdapterFactories) - // Return JSON string - compact(render(predictionJValue)) + val predFutureRdds = predictionsFuture.map { + predictions => + val prediction = serving.serveBase(query, predictions) + // Combine query with prediction, so the batch results are + // self-descriptive. + val predictionJValue = JsonExtractor.toJValue( + jsonExtractorOption, + Map("query" -> query, + "prediction" -> prediction), + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories) + // Return JSON string + compact(render(predictionJValue)) + } + Await.result(predFutureRdds, 60 minutes) } predictionsRDD.saveAsTextFile(config.outputFilePath) diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala index 5642114f8b..469a8aa55b 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala @@ -51,6 +51,7 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import org.apache.predictionio.akkahttpjson4s.Json4sSupport._ import org.apache.predictionio.configuration.SSLConfiguration +import org.json4s import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} @@ -487,7 +488,6 @@ class PredictionServer[Q, P]( try { val servingStartTime = DateTime.now val jsonExtractorOption = args.jsonExtractor - val queryTime = DateTime.now // Extract Query from Json val query = JsonExtractor.extract( jsonExtractorOption, @@ -504,107 +504,67 @@ class PredictionServer[Q, P]( // Deploy logic. First call Serving.supplement, then Algo.predict, // finally Serving.serve. val supplementedQuery = serving.supplementBase(query) - // TODO: Parallelize the following. - val predictions = algorithms.zip(models).map { case (a, m) => - a.predictBase(m, supplementedQuery) - } + + val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) => + a.predictBaseAsync(m, supplementedQuery) + }) // Notice that it is by design to call Serving.serve with the // *original* query. 
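// Illustrative sketch, not part of the patch: both the batch path and this serving path
// now go through predictBaseAsync(), so on the template side the corresponding change is
// to override predictAsync(). The types and scoring logic below (MyPreparedData, MyModel,
// MyQuery, MyPrediction) are hypothetical placeholders. Templates that still override only
// the deprecated predict() keep working, because the default predictAsync() bridges to it
// via Future.successful(blocking(predict(model, query))).
import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.spark.SparkContext
import scala.concurrent.{ExecutionContext, Future}

class MyPreparedData(val ratings: Seq[(String, Double)])
case class MyModel(bias: Double)
case class MyQuery(user: String)
case class MyPrediction(score: Double)

class MyAlgorithm
  extends P2LAlgorithm[MyPreparedData, MyModel, MyQuery, MyPrediction] {

  def train(sc: SparkContext, pd: MyPreparedData): MyModel =
    MyModel(bias = pd.ratings.map(_._2).sum)

  // The new non-blocking entry point: note the `override`, the implicit
  // ExecutionContext, and the Future-wrapped result.
  override def predictAsync(model: MyModel, query: MyQuery)
      (implicit ec: ExecutionContext): Future[MyPrediction] =
    Future(MyPrediction(model.bias + query.user.length))
}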
- val prediction = serving.serveBase(query, predictions) - val predictionJValue = JsonExtractor.toJValue( - jsonExtractorOption, - prediction, - algorithms.head.querySerializer, - algorithms.head.gsonTypeAdapterFactories) - /** Handle feedback to Event Server - * Send the following back to the Event Server - * - appId - * - engineInstanceId - * - query - * - prediction - * - prId - */ - val result = if (feedbackEnabled) { - implicit val formats = - algorithms.headOption map { alg => - alg.querySerializer - } getOrElse { - Utils.json4sDefaultFormats - } - // val genPrId = Random.alphanumeric.take(64).mkString - def genPrId: String = Random.alphanumeric.take(64).mkString - val newPrId = prediction match { - case id: WithPrId => - val org = id.prId - if (org.isEmpty) genPrId else org - case _ => genPrId - } - - // also save Query's prId as prId of this pio_pr predict events - val queryPrId = - query match { - case id: WithPrId => - Map("prId" -> id.prId) - case _ => - Map.empty - } - val data = Map( - // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId, - "event" -> "predict", - "eventTime" -> queryTime.toString(), - "entityType" -> "pio_pr", // prediction result - "entityId" -> newPrId, - "properties" -> Map( - "engineInstanceId" -> engineInstance.id, - "query" -> query, - "prediction" -> prediction)) ++ queryPrId - // At this point args.accessKey should be Some(String). - val accessKey = args.accessKey.getOrElse("") - val f: Future[Int] = Future { - scalaj.http.Http( - s"http://${args.eventServerIp}:${args.eventServerPort}/" + - s"events.json?accessKey=$accessKey").postData( - write(data)).header( - "content-type", "application/json").asString.code - } - f onComplete { - case Success(code) => { - if (code != 201) { - log.error(s"Feedback event failed. Status code: $code." - + s"Data: ${write(data)}.") + val pluginResultFuture = predictionsFuture.map { + predictions => + val prediction = serving.serveBase(query, predictions) + val predictionJValue = JsonExtractor.toJValue( + jsonExtractorOption, + prediction, + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories) + /** Handle feedback to Event Server + * Send the following back to the Event Server + * - appId + * - engineInstanceId + * - query + * - prediction + * - prId + */ + val result: json4s.JValue = if (feedbackEnabled) { + sendFeedback(prediction, query, predictionJValue) + } else predictionJValue + + val pluginResult = + pluginContext.outputBlockers.values.foldLeft(result) { case (r, p) => + p.process(engineInstance, queryJValue, r, pluginContext) } - } - case Failure(t) => { - log.error(s"Feedback event failed: ${t.getMessage}") } - } - // overwrite prId in predictedResult - // - if it is WithPrId, - // then overwrite with new prId - // - if it is not WithPrId, no prId injection - if (prediction.isInstanceOf[WithPrId]) { - predictionJValue merge parse(s"""{"prId" : "$newPrId"}""") - } else { - predictionJValue - } - } else predictionJValue - - val pluginResult = - pluginContext.outputBlockers.values.foldLeft(result) { case (r, p) => - p.process(engineInstance, queryJValue, r, pluginContext) - } - pluginsActorRef ! (engineInstance, queryJValue, result) - - // Bookkeeping - val servingEndTime = DateTime.now - lastServingSec = - (servingEndTime.getMillis - servingStartTime.getMillis) / 1000.0 - avgServingSec = - ((avgServingSec * requestCount) + lastServingSec) / - (requestCount + 1) - requestCount += 1 - - complete(compact(render(pluginResult))) + pluginsActorRef ! 
(engineInstance, queryJValue, result) + + // Bookkeeping + val servingEndTime = DateTime.now + lastServingSec = + (servingEndTime.getMillis - servingStartTime.getMillis) / 1000.0 + avgServingSec = + ((avgServingSec * requestCount) + lastServingSec) / + (requestCount + 1) + requestCount += 1 + + pluginResult + } + onComplete(pluginResultFuture) { + case Success(pluginResult) => + val responseBody = compact(render(pluginResult)) + log.info(s"Recommendation for query $queryString is: $responseBody") + complete(responseBody) + case Failure(t) => + val msg = s"Query:\n$queryString\n\nStack Trace:\n" + + s"${ExceptionUtils.getStackTrace(t)}\n\n" + log.error(msg) + args.logUrl map { url => + remoteLog( + url, + args.logPrefix.getOrElse(""), + msg) + } + complete(StatusCodes.InternalServerError, msg) + } } catch { case e: MappingException => val msg = s"Query:\n$queryString\n\nStack Trace:\n" + @@ -703,4 +663,69 @@ class PredictionServer[Q, P]( myRoute } + + def sendFeedback(prediction: P, query: Q, predictionJValue: JsonAST.JValue): json4s.JValue = { + val queryTime = DateTime.now + implicit val formats = + algorithms.headOption map { alg => + alg.querySerializer + } getOrElse { + Utils.json4sDefaultFormats + } + // val genPrId = Random.alphanumeric.take(64).mkString + def genPrId: String = Random.alphanumeric.take(64).mkString + val newPrId = prediction match { + case id: WithPrId => + val org = id.prId + if (org.isEmpty) genPrId else org + case _ => genPrId + } + + // also save Query's prId as prId of this pio_pr predict events + val queryPrId = + query match { + case id: WithPrId => + Map("prId" -> id.prId) + case _ => + Map.empty + } + val data = Map( + // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId, + "event" -> "predict", + "eventTime" -> queryTime.toString(), + "entityType" -> "pio_pr", // prediction result + "entityId" -> newPrId, + "properties" -> Map( + "engineInstanceId" -> engineInstance.id, + "query" -> query, + "prediction" -> prediction)) ++ queryPrId + // At this point args.accessKey should be Some(String). + val accessKey = args.accessKey.getOrElse("") + val f: Future[Int] = Future { + scalaj.http.Http( + s"http://${args.eventServerIp}:${args.eventServerPort}/" + + s"events.json?accessKey=$accessKey").postData( + write(data)).header( + "content-type", "application/json").asString.code + } + f onComplete { + case Success(code) => { + if (code != 201) { + log.error(s"Feedback event failed. Status code: $code." 
+ + s"Data: ${write(data)}.") + } + } + case Failure(t) => { + log.error(s"Feedback event failed: ${t.getMessage}") } + } + // overwrite prId in predictedResult + // - if it is WithPrId, + // then overwrite with new prId + // - if it is not WithPrId, no prId injection + if (prediction.isInstanceOf[WithPrId]) { + predictionJValue merge parse(s"""{"prId" : "$newPrId"}""") + } else { + predictionJValue + } + } } diff --git a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala index c53e98e827..31df0a83d3 100644 --- a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala +++ b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala @@ -19,13 +19,14 @@ package org.apache.predictionio.controller import org.apache.predictionio.controller.{Params => PIOParams} import org.apache.predictionio.core._ - import grizzled.slf4j.Logger import org.apache.predictionio.workflow.WorkflowParams import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD +import scala.concurrent.{ExecutionContext, Future} + object Engine0 { @transient lazy val logger = Logger[this.type] @@ -204,8 +205,9 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: PAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -224,8 +226,8 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: PAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -247,8 +249,8 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo2.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: PAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -274,8 +276,9 @@ object Engine0 { qs.mapValues(q => Prediction(id, q, Some(m))) } - def predict(m: PAlgo3.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: PAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -283,12 +286,13 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo0(id: Int = 0) + class LAlgo0(id: Int = 0) extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd) - def predict(m: LAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: LAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -296,12 +300,13 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo1(id: Int = 0) + class LAlgo1(id: Int = 0) extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd) - def predict(m: LAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: LAlgo1.Model, q: Query) + (implicit ec: 
ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -314,12 +319,13 @@ object Engine0 { object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] } - class LAlgo2(params: LAlgo2.Params) + class LAlgo2(params: LAlgo2.Params) extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd) - def predict(m: LAlgo2.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) + override def predictAsync(m: LAlgo2.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(params.id, q, Some(m))) } } @@ -329,12 +335,13 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class LAlgo3(params: LAlgo3.Params) + class LAlgo3(params: LAlgo3.Params) extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] { def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd) - def predict(m: LAlgo3.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) + override def predictAsync(m: LAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(params.id, q, Some(m))) } } @@ -348,8 +355,9 @@ object Engine0 { def train(sc: SparkContext, pd: ProcessedData) : NAlgo0.Model = NAlgo0.Model(id, pd) - def predict(m: NAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: NAlgo0.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -362,8 +370,9 @@ object Engine0 { def train(sc: SparkContext, pd: ProcessedData) : NAlgo1.Model = NAlgo1.Model(id, pd) - def predict(m: NAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) + override def predictAsync(m: NAlgo1.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(id, q, Some(m))) } } @@ -376,13 +385,13 @@ object Engine0 { object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] } - class NAlgo2(params: NAlgo2.Params) + class NAlgo2(params: NAlgo2.Params) extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] { def train(sc: SparkContext, pd: ProcessedData) : NAlgo2.Model = NAlgo2.Model(params.id, pd) - - def predict(m: NAlgo2.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) + override def predictAsync(m: NAlgo2.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(params.id, q, Some(m))) } } @@ -392,13 +401,14 @@ object Engine0 { case class Model(id: Int, pd: ProcessedData) } - class NAlgo3(params: NAlgo3.Params) + class NAlgo3(params: NAlgo3.Params) extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] { def train(sc: SparkContext, pd: ProcessedData) : NAlgo3.Model = NAlgo3.Model(params.id, pd) - def predict(m: NAlgo3.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) + override def predictAsync(m: NAlgo3.Model, q: Query) + (implicit ec: ExecutionContext): Future[Prediction] = { + Future.successful(Prediction(params.id, q, Some(m))) } } @@ -436,18 +446,18 @@ object Engine1 { case class DSP(v: Double) extends Params } -class Engine1 +class Engine1 extends BaseEngine[ Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] { def train( - sc: SparkContext, + sc: SparkContext, engineParams: EngineParams, engineInstanceId: String = "", params: 
WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]() def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams) - : Seq[(Engine1.EvalInfo, + : Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = { val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP] Seq( @@ -463,7 +473,7 @@ Engine1.Actual, Double] { override def header: String = "Metric0" def calculate( - sc: SparkContext, + sc: SparkContext, evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])]): Double = { evalDataSet.head._1.v @@ -480,7 +490,7 @@ Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) { override def header: String = "Metric1" def calculate( - sc: SparkContext, + sc: SparkContext, evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])]): Metric1.Result = { Metric1.Result(0, evalDataSet.head._1.v) diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala index a73ee80c11..d081e1748f 100644 --- a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala +++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala @@ -54,6 +54,19 @@ object LEventStore { /** Reads events of the specified entity. May use this in Algorithm's predict() * or Serving logic to have fast event store access. * + * Note that this method uses `scala.concurrent.ExecutionContext.Implicits.global` + * internally. Since this is a thread pool which has a number of threads equal to + * available processors, parallelism is limited up to the number of processors. + * + * If this limitation become bottleneck of resource usage, you can increase the + * number of threads by declaring following VM options before calling "pio deploy": + * + *
+    * export JAVA_OPTS="$JAVA_OPTS \
+    *   -Dscala.concurrent.context.numThreads=1000 \
+    *   -Dscala.concurrent.context.maxThreads=1000"
+    * 
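// Illustrative sketch, not part of the patch: calling the asynchronous event store access
// from inside a predictAsync() implementation, so the pool described above never blocks on
// event store I/O. The parameter names follow the new findByEntityAsync signature in this
// change (entityType and entityId are now Option[String]); the Future[Iterator[Event]]
// result type and the defaults for the remaining parameters are assumptions based on the
// blocking findByEntity wrapper, and "user"/"view" are example values.
import org.apache.predictionio.data.store.LEventStore
import scala.concurrent.{ExecutionContext, Future}

def recentViewCount(appName: String, userId: String)
    (implicit ec: ExecutionContext): Future[Int] =
  LEventStore.findByEntityAsync(
    appName = appName,
    entityType = Some("user"),
    entityId = Some(userId),
    eventNames = Some(Seq("view"))
  ).map(_.size)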
+ * * @param appName return events of this app * @param entityType return events of this entityType * @param entityId return events of this entityId @@ -92,8 +105,8 @@ object LEventStore { Await.result(findByEntityAsync( appName = appName, - entityType = entityType, - entityId = entityId, + entityType = Some(entityType), + entityId = Some(entityId), channelName = channelName, eventNames = eventNames, targetEntityType = targetEntityType, @@ -129,8 +142,8 @@ object LEventStore { */ def findByEntityAsync( appName: String, - entityType: String, - entityId: String, + entityType: Option[String], + entityId: Option[String], channelName: Option[String] = None, eventNames: Option[Seq[String]] = None, targetEntityType: Option[Option[String]] = None, @@ -147,8 +160,8 @@ object LEventStore { channelId = channelId, startTime = startTime, untilTime = untilTime, - entityType = Some(entityType), - entityId = Some(entityId), + entityType = entityType, + entityId = entityId, eventNames = eventNames, targetEntityType = targetEntityType, targetEntityId = targetEntityId, diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala index 6f39feb269..8c797876e5 100644 --- a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala +++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala @@ -53,6 +53,19 @@ object LJavaEventStore { /** Reads events of the specified entity. May use this in Algorithm's predict() * or Serving logic to have fast event store access. * + * Note that this method uses `scala.concurrent.ExecutionContext.Implicits.global` + * internally. Since this is a thread pool which has a number of threads equal to + * available processors, parallelism is limited up to the number of processors. + * + * If this limitation become bottleneck of resource usage, you can increase the + * number of threads by declaring following VM options before calling "pio deploy": + * + *
+    * export JAVA_OPTS="$JAVA_OPTS \
+    *   -Dscala.concurrent.context.numThreads=1000 \
+    *   -Dscala.concurrent.context.maxThreads=1000"
+    * 
+ * * @param appName return events of this app * @param entityType return events of this entityType * @param entityId return events of this entityId @@ -130,8 +143,8 @@ object LJavaEventStore { */ def findByEntityAsync( appName: String, - entityType: String, - entityId: String, + entityType: Option[String], + entityId: Option[String], channelName: Option[String], eventNames: Option[java.util.List[String]], targetEntityType: Option[Option[String]], diff --git a/docker/pio/Dockerfile b/docker/pio/Dockerfile index a63183e340..51555fbd91 100644 --- a/docker/pio/Dockerfile +++ b/docker/pio/Dockerfile @@ -18,7 +18,7 @@ FROM openjdk:8 ARG PIO_GIT_URL=https://github.com/apache/predictionio.git -ARG PIO_TAG=v0.13.0 +ARG PIO_TAG=v0.14.0 ENV SCALA_VERSION=2.11.12 ENV SPARK_VERSION=2.2.2 ENV HADOOP_VERSION=2.7.7 diff --git a/examples/scala-parallel-classification/add-algorithm/build.sbt b/examples/scala-parallel-classification/add-algorithm/build.sbt index e929f9e6f6..34dd03c077 100644 --- a/examples/scala-parallel-classification/add-algorithm/build.sbt +++ b/examples/scala-parallel-classification/add-algorithm/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-classification" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala b/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala index 0ac5e5b478..c02b24bd6c 100644 --- a/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala +++ b/examples/scala-parallel-classification/add-algorithm/src/main/scala/NaiveBayesAlgorithm.scala @@ -19,14 +19,14 @@ package org.apache.predictionio.examples.classification import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params - import org.apache.spark.mllib.classification.NaiveBayes import org.apache.spark.mllib.classification.NaiveBayesModel import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.SparkContext - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class AlgorithmParams( lambda: Double ) extends Params @@ -47,11 +47,11 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams) NaiveBayes.train(data.labeledPoints, ap.lambda) } - def predict(model: NaiveBayesModel, query: Query): PredictedResult = { + def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val label = model.predict(Vectors.dense( Array(query.attr0, query.attr1, query.attr2) )) - PredictedResult(label) + Future.successful(PredictedResult(label)) } } diff --git a/examples/scala-parallel-classification/reading-custom-properties/build.sbt b/examples/scala-parallel-classification/reading-custom-properties/build.sbt index e929f9e6f6..34dd03c077 100644 --- a/examples/scala-parallel-classification/reading-custom-properties/build.sbt +++ b/examples/scala-parallel-classification/reading-custom-properties/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-classification" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % 
"0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala b/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala index 6625551268..65e31813fb 100644 --- a/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala +++ b/examples/scala-parallel-classification/reading-custom-properties/src/main/scala/NaiveBayesAlgorithm.scala @@ -19,14 +19,14 @@ package org.apache.predictionio.examples.classification import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params - import org.apache.spark.mllib.classification.NaiveBayes import org.apache.spark.mllib.classification.NaiveBayesModel import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.SparkContext - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class AlgorithmParams( lambda: Double ) extends Params @@ -47,12 +47,12 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams) NaiveBayes.train(data.labeledPoints, ap.lambda) } - def predict(model: NaiveBayesModel, query: Query): PredictedResult = { + def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val label = model.predict(Vectors.dense( // MODIFIED Array(query.featureA, query.featureB, query.featureC, query.featureD) )) - PredictedResult(label) + Future.successful(PredictedResult(label)) } } diff --git a/examples/scala-parallel-ecommercerecommendation/adjust-score/build.sbt b/examples/scala-parallel-ecommercerecommendation/adjust-score/build.sbt index 7899d60199..55f20905b1 100644 --- a/examples/scala-parallel-ecommercerecommendation/adjust-score/build.sbt +++ b/examples/scala-parallel-ecommercerecommendation/adjust-score/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-ecommercerecommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala index d63b09086c..5f0d075cdd 100644 --- a/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala +++ b/examples/scala-parallel-ecommercerecommendation/adjust-score/src/main/scala/ECommAlgorithm.scala @@ -22,16 +22,15 @@ import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.store.LEventStore - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.rdd.RDD - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import 
scala.concurrent.ExecutionContext.Implicits.global @@ -239,7 +238,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) buyCountsRDD.collectAsMap.toMap } - def predict(model: ECommModel, query: Query): PredictedResult = { + def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val userFeatures = model.userFeatures val productModels = model.productModels @@ -322,7 +321,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) ) } - new PredictedResult(itemScores) + Future.successful(new PredictedResult(itemScores)) } /** Generate final blackList based on other constraints */ diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/build.sbt b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/build.sbt index 7899d60199..55f20905b1 100644 --- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/build.sbt +++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-ecommercerecommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala index e2c7224834..6c24811953 100644 --- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala +++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala @@ -22,16 +22,15 @@ import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.store.LEventStore - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.rdd.RDD - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global @@ -240,7 +239,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) buyCountsRDD.collectAsMap.toMap } - def predict(model: ECommModel, query: Query): PredictedResult = { + def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val userFeatures = model.userFeatures val productModels = model.productModels @@ -311,7 +310,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams) ) } - new PredictedResult(itemScores) + Future.successful(new PredictedResult(itemScores)) } /** Generate final blackList based on other constraints */ diff --git a/examples/scala-parallel-recommendation/blacklist-items/build.sbt b/examples/scala-parallel-recommendation/blacklist-items/build.sbt index bc124a1e64..abb84d0876 100644 --- a/examples/scala-parallel-recommendation/blacklist-items/build.sbt +++ b/examples/scala-parallel-recommendation/blacklist-items/build.sbt @@ -20,5 +20,5 @@ name := 
"template-scala-parallel-recommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala index d500d673e6..debf8b0e37 100644 --- a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.apache.predictionio.examples.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap @@ -103,10 +103,10 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) val itemScores = model .recommendProductsWithFilter(userInt, query.num, blackList) // MODIFIED .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) }.getOrElse{ logger.info(s"No prediction for unknown user ${query.user}.") - PredictedResult(Array.empty) + Future.successful(PredictedResult(Array.empty)) } } diff --git a/examples/scala-parallel-recommendation/customize-data-prep/build.sbt b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt index bc124a1e64..abb84d0876 100644 --- a/examples/scala-parallel-recommendation/customize-data-prep/build.sbt +++ b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala index 65f2f15ba4..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala +++ 
b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.apache.predictionio.examples.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap @@ -101,10 +101,10 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) // index. Convert it to String ID for returning PredictedResult val itemScores = model.recommendProducts(userInt, query.num) .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) }.getOrElse{ logger.info(s"No prediction for unknown user ${query.user}.") - PredictedResult(Array.empty) + Future.successful(PredictedResult(Array.empty)) } } diff --git a/examples/scala-parallel-recommendation/customize-serving/build.sbt b/examples/scala-parallel-recommendation/customize-serving/build.sbt index bc124a1e64..abb84d0876 100644 --- a/examples/scala-parallel-recommendation/customize-serving/build.sbt +++ b/examples/scala-parallel-recommendation/customize-serving/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala index 65f2f15ba4..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.apache.predictionio.examples.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: 
ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap @@ -101,10 +101,10 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) // index. Convert it to String ID for returning PredictedResult val itemScores = model.recommendProducts(userInt, query.num) .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) }.getOrElse{ logger.info(s"No prediction for unknown user ${query.user}.") - PredictedResult(Array.empty) + Future.successful(PredictedResult(Array.empty)) } } diff --git a/examples/scala-parallel-recommendation/reading-custom-events/build.sbt b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt index bc124a1e64..abb84d0876 100644 --- a/examples/scala-parallel-recommendation/reading-custom-events/build.sbt +++ b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala index 65f2f15ba4..b25dbad4e1 100644 --- a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.apache.predictionio.examples.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap @@ -101,10 +101,10 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) // index. 
Convert it to String ID for returning PredictedResult val itemScores = model.recommendProducts(userInt, query.num) .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) }.getOrElse{ logger.info(s"No prediction for unknown user ${query.user}.") - PredictedResult(Array.empty) + Future.successful(PredictedResult(Array.empty)) } } diff --git a/examples/scala-parallel-recommendation/train-with-view-event/build.sbt b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt index bc124a1e64..abb84d0876 100644 --- a/examples/scala-parallel-recommendation/train-with-view-event/build.sbt +++ b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala index 234aa0d33a..02ab817705 100644 --- a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.apache.predictionio.examples.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -93,20 +93,21 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { - // Convert String ID to Int index for Mllib - model.userStringIntMap.get(query.user).map { userInt => - // create inverse view of itemStringIntMap - val itemIntStringMap = model.itemStringIntMap.inverse - // recommendProducts() returns Array[MLlibRating], which uses item Int - // index. Convert it to String ID for returning PredictedResult - val itemScores = model.recommendProducts(userInt, query.num) - .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - PredictedResult(itemScores) - }.getOrElse{ - logger.info(s"No prediction for unknown user ${query.user}.") - PredictedResult(Array.empty) - } + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { + Future.successful( + // Convert String ID to Int index for Mllib + model.userStringIntMap.get(query.user).map { userInt => + // create inverse view of itemStringIntMap + val itemIntStringMap = model.itemStringIntMap.inverse + // recommendProducts() returns Array[MLlibRating], which uses item Int + // index. 
Convert it to String ID for returning PredictedResult + val itemScores = model.recommendProducts(userInt, query.num) + .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) + PredictedResult(itemScores) + }.getOrElse{ + logger.info(s"No prediction for unknown user ${query.user}.") + PredictedResult(Array.empty) + }) } // This function is used by the evaluation module, where a batch of queries is sent to this engine diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt index e72b6f2195..7f271e762c 100644 --- a/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-similarproduct" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala index 64d570c3de..0cef67c8bd 100644 --- a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala @@ -20,15 +20,14 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} case class ALSAlgorithmParams( rank: Int, @@ -133,8 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query): PredictedResult = { - + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index @@ -191,7 +189,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) } private diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala index 76307e7fa1..da7d0bdefb 100644 --- a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala @@ -20,10 +20,11 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.concurrent.{ExecutionContext, 
Future} + case class CooccurrenceAlgorithmParams( n: Int // top co-occurrence ) extends Params @@ -103,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) topCooccurrences } - def predict(model: CooccurrenceModel, query: Query): PredictedResult = { + def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // convert items to Int index val queryList: Set[Int] = query.items @@ -146,7 +147,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) ) } - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) } diff --git a/examples/scala-parallel-similarproduct/recommended-user/build.sbt b/examples/scala-parallel-similarproduct/recommended-user/build.sbt index f5b86f5edd..30a61fc0ae 100644 --- a/examples/scala-parallel-similarproduct/recommended-user/build.sbt +++ b/examples/scala-parallel-similarproduct/recommended-user/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-recommendeduser" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala index fd84284333..eaaff27bae 100644 --- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.{ALS, Rating => MLlibRating} import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} case class ALSAlgorithmParams( rank: Int, @@ -125,7 +126,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val similarUserFeatures = model.similarUserFeatures @@ -181,7 +182,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - PredictedResult(similarUserScores) + Future.successful(PredictedResult(similarUserScores)) } private diff --git a/examples/scala-parallel-similarproduct/return-item-properties/build.sbt b/examples/scala-parallel-similarproduct/return-item-properties/build.sbt index e72b6f2195..7f271e762c 100644 --- a/examples/scala-parallel-similarproduct/return-item-properties/build.sbt +++ b/examples/scala-parallel-similarproduct/return-item-properties/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-similarproduct" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala index 3bf3402231..13d98ebc5b 100644 --- 
a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala @@ -20,15 +20,14 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} case class ALSAlgorithmParams( rank: Int, @@ -133,8 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query): PredictedResult = { - + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index @@ -196,7 +194,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) } private diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala index 470d87d5c4..d3293127a5 100644 --- a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala @@ -20,10 +20,11 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.concurrent.{ExecutionContext, Future} + case class CooccurrenceAlgorithmParams( n: Int // top co-occurrence ) extends Params @@ -103,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) topCooccurrences } - def predict(model: CooccurrenceModel, query: Query): PredictedResult = { + def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // convert items to Int index val queryList: Set[Int] = query.items diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt b/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt index e72b6f2195..7f271e762c 100644 --- a/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-similarproduct" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala index 50c26b5272..d9d52d4bec 100644 --- 
a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala @@ -20,15 +20,14 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} case class ALSAlgorithmParams( rank: Int, @@ -129,7 +128,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query): PredictedResult = { + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures @@ -187,7 +186,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) } private diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt b/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt index e72b6f2195..7f271e762c 100644 --- a/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt @@ -20,5 +20,5 @@ name := "template-scala-parallel-similarproduct" organization := "org.apache.predictionio" scalaVersion := "2.11.8" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala index 507343e0fc..f5a42b164c 100644 --- a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala @@ -20,15 +20,14 @@ package org.apache.predictionio.examples.similarproduct import org.apache.predictionio.controller.P2LAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - import grizzled.slf4j.Logger import scala.collection.mutable.PriorityQueue +import scala.concurrent.{ExecutionContext, Future} case class ALSAlgorithmParams( rank: Int, @@ -141,8 +140,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - def predict(model: ALSModel, query: Query): PredictedResult = { - + def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { val productFeatures = model.productFeatures // convert items to Int index @@ -199,7 +197,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) ) } - PredictedResult(itemScores) + Future.successful(PredictedResult(itemScores)) } private diff --git 
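Every template conversion above follows the same mechanical pattern: the blocking predict() override is removed, a predictAsync() taking an implicit ExecutionContext is added, and the synchronously computed result is wrapped in Future.successful. Below is a minimal sketch of that pattern for a hypothetical P2LAlgorithm-based template; PreparedData, MyModel, Query, ItemScore and PredictedResult are stand-in types for illustration and are not part of this patch. Because the base class now ships a default predictAsync() delegating to the deprecated predict(), the new method is written as an override here.

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.spark.SparkContext
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical engine types, for illustration only; a real template defines its own.
case class PreparedData(ratings: Map[String, Double])
case class Query(user: String, num: Int)
case class ItemScore(item: String, score: Double)
case class PredictedResult(itemScores: Array[ItemScore])
class MyModel(val scores: Map[String, Double]) extends Serializable

class MyAlgorithm
  extends P2LAlgorithm[PreparedData, MyModel, Query, PredictedResult] {

  def train(sc: SparkContext, data: PreparedData): MyModel =
    new MyModel(data.ratings)

  // 0.13.x form (removed): def predict(model: MyModel, query: Query): PredictedResult
  // 0.14.0 form: the result is still computed synchronously, so wrapping it in
  // Future.successful is all that is needed.
  override def predictAsync(model: MyModel, query: Query)(
      implicit ec: ExecutionContext): Future[PredictedResult] = {
    val itemScores = model.scores.toArray
      .sortBy { case (_, score) => -score }
      .take(query.num)
      .map { case (item, score) => ItemScore(item, score) }
    Future.successful(PredictedResult(itemScores))
  }
}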
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 15f223f81a..488d4b0505 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -20,7 +20,6 @@ package org.apache.predictionio.data.storage.elasticsearch import java.io.IOException import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -32,8 +31,10 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write - import grizzled.slf4j.Logging +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.concurrent.{Await, ExecutionContext, Future} /** Elasticsearch implementation of AccessKeys. */ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String) @@ -90,30 +91,32 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin } def getAll(): Seq[AccessKey] = { - try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json))) - } catch { - case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("match_all" -> List.empty)) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[AccessKey](client, internalIndex, estype, compact(render(json))) + .recover { + case e: IOException => + error("Failed to access to /$internalIndex/$estype/_search", e) + Nil + }, 1 minute) } def getByAppid(appid: Int): Seq[AccessKey] = { - try { - val json = - ("query" -> - ("term" -> - ("appid" -> appid))) - ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json))) - } catch { + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[AccessKey](client, internalIndex, estype, compact(render(json))) + .recover { case e: IOException => error("Failed to access to /$internalIndex/$estype/_search", e) Nil - } + }, 1 minute) } def update(accessKey: AccessKey): Unit = { diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index cb17af8ebc..b34a0c5793 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -20,7 +20,6 @@ package org.apache.predictionio.data.storage.elasticsearch import java.io.IOException import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -32,8 +31,10 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write - import grizzled.slf4j.Logging +import scala.concurrent.duration._ +import 
scala.language.postfixOps +import scala.concurrent.Await /** Elasticsearch implementation of Items. */ class ESApps(client: RestClient, config: StorageClientConfig, index: String) @@ -127,16 +128,18 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) } def getAll(): Seq[App] = { - try { - val json = - ("query" -> - ("match_all" -> Nil)) - ESUtils.getAll[App](client, internalIndex, estype, compact(render(json))) - } catch { - case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("match_all" -> Nil)) + import scala.concurrent.ExecutionContext.Implicits.global + + Await.result(ESUtils + .getAll[App](client, internalIndex, estype, compact(render(json))) + .recover { + case e: IOException => + error("Failed to access to /$internalIndex/$estype/_search", e) + Nil + }, 1 minute) } def update(app: App): Unit = { diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index 63b108f107..7d36dad548 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -20,7 +20,6 @@ package org.apache.predictionio.data.storage.elasticsearch import java.io.IOException import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -32,8 +31,10 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write - import grizzled.slf4j.Logging +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.concurrent.Await class ESChannels(client: RestClient, config: StorageClientConfig, index: String) extends Channels with Logging { @@ -97,17 +98,18 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) } def getByAppid(appid: Int): Seq[Channel] = { - try { - val json = - ("query" -> - ("term" -> - ("appid" -> appid))) - ESUtils.getAll[Channel](client, internalIndex, estype, compact(render(json))) - } catch { - case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[Channel](client, internalIndex, estype, compact(render(json))) + .recover { + case e: IOException => + error(s"Failed to access to /$internalIndex/$estype/_search", e) + Nil + }, 1 minute) } def update(channel: Channel): Boolean = { diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 02f7b98248..8db59f9107 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -20,7 +20,6 @@ package org.apache.predictionio.data.storage.elasticsearch import java.io.IOException import 
scala.collection.JavaConverters.mapAsJavaMapConverter - import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -33,8 +32,10 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write - import grizzled.slf4j.Logging +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.concurrent.Await class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String) extends EngineInstances with Logging { @@ -133,44 +134,47 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: } def getAll(): Seq[EngineInstance] = { - try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) - } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("match_all" -> List.empty)) + import scala.concurrent.ExecutionContext.Implicits.global + + Await.result(ESUtils + .getAll[EngineInstance](client, index, estype, compact(render(json))) + .recover { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + }, 1 minute) } def getCompleted( engineId: String, engineVersion: String, engineVariant: String): Seq[EngineInstance] = { - try { - val json = - ("query" -> - ("bool" -> - ("must" -> List( - ("term" -> - ("status" -> "COMPLETED")), - ("term" -> - ("engineId" -> engineId)), - ("term" -> - ("engineVersion" -> engineVersion)), - ("term" -> - ("engineVariant" -> engineVariant)))))) ~ - ("sort" -> List( - ("startTime" -> - ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) - } catch { - case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("bool" -> + ("must" -> List( + ("term" -> + ("status" -> "COMPLETED")), + ("term" -> + ("engineId" -> engineId)), + ("term" -> + ("engineVersion" -> engineVersion)), + ("term" -> + ("engineVariant" -> engineVariant)))))) ~ + ("sort" -> List( + ("startTime" -> + ("order" -> "desc")))) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[EngineInstance](client, index, estype, compact(render(json))) + .recover { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil + }, 1 minute) } def getLatestCompleted( diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 03b851d496..7baa0c60f5 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -20,7 +20,6 @@ package org.apache.predictionio.data.storage.elasticsearch import java.io.IOException import scala.collection.JavaConverters._ - import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -34,8 +33,10 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write - import grizzled.slf4j.Logging +import 
scala.concurrent.duration._ +import scala.language.postfixOps +import scala.concurrent.Await class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String) extends EvaluationInstances with Logging { @@ -107,33 +108,35 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind } def getAll(): Seq[EvaluationInstance] = { - try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) - } catch { - case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("match_all" -> List.empty)) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) + .recover { + case e: IOException => + error("Failed to access to /$internalIndex/$estype/_search", e) + Nil + }, 1 minute) } def getCompleted(): Seq[EvaluationInstance] = { - try { - val json = - ("query" -> - ("term" -> - ("status" -> "EVALCOMPLETED"))) ~ - ("sort" -> - ("startTime" -> - ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) - } catch { - case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) - Nil - } + val json = + ("query" -> + ("term" -> + ("status" -> "EVALCOMPLETED"))) ~ + ("sort" -> + ("startTime" -> + ("order" -> "desc"))) + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(ESUtils + .getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) + .recover { + case e: IOException => + error("Failed to access to /$internalIndex/$estype/_search", e) + Nil + }, 1 minute) } def update(i: EvaluationInstance): Unit = { diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index f275ec9210..eedbd417a2 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -37,6 +37,7 @@ import org.json4s.native.Serialization.write import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging import org.apache.http.message.BasicHeader +import org.apache.predictionio.data.storage.elasticsearch.ScalaRestClient.ExtendedScalaRestClient class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String) extends LEvents with Logging { @@ -107,7 +108,6 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { - Future { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype try { @@ -127,33 +127,39 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~ ("properties" -> write(event.properties.toJObject)) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val futureResponse = client.performRequestFuture( "POST", s"/$index/$estype/$id", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + Map("refresh" -> 
ESUtils.getEventDataRefresh(config)), entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => id - case "updated" => id - case _ => - error(s"[$result] Failed to update $index/$estype/$id") + + futureResponse.map { + response => + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => id + case "updated" => id + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + "" + } + }.recover { + case t => + error(s"Failed to update $index/$estype/", t) "" } } catch { case e: IOException => error(s"Failed to update $index/$estype/", e) - "" + Future.successful("") } - } } override def futureInsertBatch( events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = { - Future { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype try { @@ -187,34 +193,40 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd }.mkString("", "\n", "\n") val entity = new StringEntity(json) - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", "/_bulk", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + Map("refresh" -> ESUtils.getEventDataRefresh(config)), entity, new BasicHeader("Content-Type", "application/x-ndjson")) - val responseJson = parse(EntityUtils.toString(response.getEntity)) - val items = (responseJson \ "items").asInstanceOf[JArray] + responseFuture.map { + response => + val responseJson = parse(EntityUtils.toString(response.getEntity)) + val items = (responseJson \ "items").asInstanceOf[JArray] - items.arr.map { case value: JObject => - val result = (value \ "index" \ "result").extract[String] - val id = (value \ "index" \ "_id").extract[String] + items.arr.map { case value: JObject => + val result = (value \ "index" \ "result").extract[String] + val id = (value \ "index" \ "_id").extract[String] - result match { - case "created" => id - case "updated" => id - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - "" - } + result match { + case "created" => id + case "updated" => id + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + "" + } + } + }.recover { + case t => + error(s"Failed to update $index/$estype/", t) + Nil } } catch { case e: IOException => error(s"Failed to update $index/$estype/", e) - Nil + Future.successful(Nil) } - } } private def exists(client: RestClient, estype: String, id: Int): Boolean = { @@ -245,7 +257,6 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { - Future { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype try { @@ -254,32 +265,37 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", s"/$index/$estype/_search", - Map.empty[String, String].asJava, + Map.empty[String, String], entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "hits" \ "total").extract[Long] match { - case 0 => None - 
case _ => - val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] - val result = (results.head \ "_source").extract[Event] - Some(result) + responseFuture.map { + response => + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[Event] + Some(result) + } + }.recover { + case t => + error("Failed to access to /$index/$estype/_search", t) + None } } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) - None + Future.successful(None) } - } } override def futureDelete( eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { - Future { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype try { @@ -288,19 +304,25 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", s"/$index/$estype/_delete_by_query", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + Map("refresh" -> ESUtils.getEventDataRefresh(config)), entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "deleted").extract[Int] > 0 + responseFuture.map { + response => + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "deleted").extract[Int] > 0 + }.recover { + case t => + error(s"Failed to delete $index/$estype:$eventId", t) + false + } } catch { case e: IOException => error(s"Failed to delete $index/$estype:$eventId", e) - false + Future.successful(false) } - } } override def futureFind( @@ -316,23 +338,26 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd limit: Option[Int] = None, reversed: Option[Boolean] = None) (implicit ec: ExecutionContext): Future[Iterator[Event]] = { - Future { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype try { val query = ESUtils.createEventQuery( startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, reversed) - limit.getOrElse(20) match { - case -1 => ESUtils.getEventAll(client, index, estype, query).toIterator - case size => ESUtils.getEvents(client, index, estype, query, size).toIterator + val eventsFuture = limit.getOrElse(20) match { + case -1 => ESUtils.getEventAll(client, index, estype, query).map(_.toIterator) + case size => ESUtils.getEvents(client, index, estype, query, size).map(_.toIterator) + } + eventsFuture.recover { + case t => + error(t.getMessage) + Iterator.empty } } catch { case e: IOException => error(e.getMessage) - Iterator.empty + Future.successful(Iterator.empty) } - } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index cd9aa53a7c..8f439582ce 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -19,7 +19,6 @@ package org.apache.predictionio.data.storage.elasticsearch 
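The Elasticsearch metadata stores above (ESAccessKeys, ESApps, ESChannels, ESEngineInstances, ESEvaluationInstances) keep their synchronous Seq-returning signatures even though ESUtils.getAll now returns a Future: each call site attaches a recover that maps IOException to Nil, preserving the old try/catch behaviour, and then blocks with Await.result under a one-minute timeout. Reduced to a stand-in future so the shape stands on its own, the bridging pattern looks like this (fetchAll and the timeout value are illustrative):

import java.io.IOException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SyncFacadeSketch {
  // Stand-in for ESUtils.getAll: any future that may fail with IOException.
  def fetchAll(query: String)(implicit ec: ExecutionContext): Future[Seq[String]] =
    Future.successful(Seq("doc-1", "doc-2"))

  // The synchronous facade kept for existing callers: recover to Nil on I/O
  // failure, then block for at most one minute, as the stores above do.
  def getAll(query: String): Seq[String] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Await.result(
      fetchAll(query).recover { case _: IOException => Nil },
      1.minute)
  }
}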
import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ - import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity import org.apache.http.nio.entity.NStringEntity @@ -36,6 +35,10 @@ import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.http.HttpHost import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.elasticsearch.ScalaRestClient.ExtendedScalaRestClient + +import scala.concurrent.{ExecutionContext, Future} + object ESUtils { val scrollLife = "1m" @@ -86,8 +89,8 @@ object ESUtils { estype: String, query: String, size: Int)( - implicit formats: Formats): Seq[Event] = { - getDocList(client, index, estype, query, size).map(x => toEvent(x)) + implicit formats: Formats, ec: ExecutionContext): Future[Seq[Event]] = { + getDocList(client, index, estype, query, size).map(docs => docs.map(x => toEvent(x))) } def getDocList( @@ -96,16 +99,19 @@ object ESUtils { estype: String, query: String, size: Int)( - implicit formats: Formats): Seq[JValue] = { + implicit formats: Formats, ec: ExecutionContext): Future[Seq[JValue]] = { val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", s"/$index/$estype/_search", Map("size" -> s"${size}"), entity) - val responseJValue = parse(EntityUtils.toString(response.getEntity)) - val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]] - hits.map(h => (h \ "_source")) + responseFuture.map { + response => + val responseJValue = parse(EntityUtils.toString(response.getEntity)) + val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]] + hits.map(h => (h \ "_source")) + } } def getAll[T: Manifest]( @@ -113,8 +119,9 @@ object ESUtils { index: String, estype: String, query: String)( - implicit formats: Formats): Seq[T] = { - getDocAll(client, index, estype, query).map(x => x.extract[T]) + implicit formats: Formats, ec: ExecutionContext): Future[Seq[T]] = { + getDocAll(client, index, estype, query) + .map(docs => docs.map(x => x.extract[T])) } def getEventAll( @@ -122,8 +129,8 @@ object ESUtils { index: String, estype: String, query: String)( - implicit formats: Formats): Seq[Event] = { - getDocAll(client, index, estype, query).map(x => toEvent(x)) + implicit formats: Formats, ec: ExecutionContext): Future[Seq[Event]] = { + getDocAll(client, index, estype, query).map(docs => docs.map(x => toEvent(x))) } def getDocAll( @@ -131,7 +138,7 @@ object ESUtils { index: String, estype: String, query: String)( - implicit formats: Formats): Seq[JValue] = { + implicit formats: Formats, ec: ExecutionContext): Future[Seq[JValue]] = { @scala.annotation.tailrec def scroll(scrollId: String, hits: Seq[JValue], results: Seq[JValue]): Seq[JValue] = { @@ -152,15 +159,18 @@ object ESUtils { } val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", s"/$index/$estype/_search", Map("scroll" -> scrollLife), entity) - val responseJValue = parse(EntityUtils.toString(response.getEntity)) - scroll((responseJValue \ "_scroll_id").extract[String], - (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], - Nil) + responseFuture.map { + response => + val responseJValue = parse(EntityUtils.toString(response.getEntity)) + scroll((responseJValue \ "_scroll_id").extract[String], + (responseJValue \ 
"hits" \ "hits").extract[Seq[JValue]], + Nil) + } } def createIndex( diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ScalaRestClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ScalaRestClient.scala new file mode 100644 index 0000000000..08a23f3f71 --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ScalaRestClient.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch + +import org.apache.http.{Header, HttpEntity} +import org.elasticsearch.client.{Response, ResponseListener, RestClient} + +import scala.collection.JavaConverters._ +import scala.concurrent.{Future, Promise} + +object ScalaRestClient { + + implicit class ExtendedScalaRestClient(restClient: RestClient) { + + def performRequestFuture(method: String, endpoint: String, params: Map[String, String], + entity: HttpEntity, headers: Header*): Future[Response] = { + val promise: Promise[Response] = Promise() + val responseListener = new ResponseListener { + override def onSuccess(response: Response): Unit = promise.success(response) + override def onFailure(exception: Exception): Unit = promise.failure(exception) + } + restClient.performRequestAsync( + method, endpoint, params.asJava, entity, responseListener, headers: _*) + promise.future + } + } +} diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala index 4b0ad9a5a2..d7b006bdbb 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala @@ -21,7 +21,6 @@ package org.apache.predictionio.data.storage.hbase import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.EventValidation import org.apache.predictionio.data.storage.DataMap - import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Scan @@ -32,30 +31,28 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.filter.BinaryComparator import org.apache.hadoop.hbase.filter.QualifierFilter import org.apache.hadoop.hbase.filter.SkipFilter - import org.json4s.DefaultFormats import org.json4s.JObject -import org.json4s.native.Serialization.{ read, write } - +import org.json4s.native.Serialization.{read, write} import org.joda.time.DateTime import org.joda.time.DateTimeZone - import org.apache.commons.codec.binary.Base64 import 
java.security.MessageDigest - import java.util.UUID +import org.apache.hadoop.hbase.TableName + /* common utility function for accessing EventsStore in HBase */ object HBEventsUtil { implicit val formats = DefaultFormats - def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = { - channelId.map { ch => + def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): TableName = { + TableName.valueOf(channelId.map { ch => s"${namespace}:events_${appId}_${ch}" }.getOrElse { s"${namespace}:events_${appId}" - } + }) } // column names for "e" column family diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala index e95e7e82b1..5a66caffad 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala @@ -18,6 +18,8 @@ package org.apache.predictionio.data.storage.hbase +import java.util.concurrent.Executors + import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.LEvents @@ -26,23 +28,26 @@ import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.HTableDescriptor import org.apache.hadoop.hbase.NamespaceDescriptor -import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client._ import org.joda.time.DateTime import scala.collection.JavaConversions._ import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.blocking class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String) extends LEvents with Logging { + private val blockingThreadPoolExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) + // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer def resultToEvent(result: Result, appId: Int): Event = HBEventsUtil.resultToEvent(result, appId) - def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface = + def getTable(appId: Int, channelId: Option[Int] = None): Table = client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId)) override @@ -56,7 +61,7 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace client.admin.createNamespace(nameDesc) } - val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId)) + val tableName = HBEventsUtil.tableName(namespace, appId, channelId) if (!client.admin.tableExists(tableName)) { info(s"The table ${tableName.getNameAsString()} doesn't exist yet." 
+ " Creating now...") @@ -70,7 +75,7 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { - val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId)) + val tableName = HBEventsUtil.tableName(namespace, appId, channelId) try { if (client.admin.tableExists(tableName)) { info(s"Removing table ${tableName.getNameAsString()}...") @@ -100,13 +105,14 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { Future { - val table = getTable(appId, channelId) - val (put, rowKey) = HBEventsUtil.eventToPut(event, appId) - table.put(put) - table.flushCommits() - table.close() - rowKey.toString - } + blocking { + val table = getTable(appId, channelId) + val (put, rowKey) = HBEventsUtil.eventToPut(event, appId) + table.put(put) + table.close() + rowKey.toString + } + }(blockingThreadPoolExecutor) } override @@ -114,13 +120,14 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = { Future { - val table = getTable(appId, channelId) - val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip - table.put(puts) - table.flushCommits() - table.close() - rowKeys.map(_.toString) - } + blocking { + val table = getTable(appId, channelId) + val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip + table.put(puts) + table.close() + rowKeys.map(_.toString) + } + }(blockingThreadPoolExecutor) } override @@ -128,20 +135,22 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { Future { - val table = getTable(appId, channelId) - val rowKey = RowKey(eventId) - val get = new Get(rowKey.toBytes) - - val result = table.get(get) - table.close() - - if (!result.isEmpty()) { - val event = resultToEvent(result, appId) - Some(event) - } else { - None + blocking { + val table = getTable(appId, channelId) + val rowKey = RowKey(eventId) + val get = new Get(rowKey.toBytes) + + val result = table.get(get) + table.close() + + if (!result.isEmpty()) { + val event = resultToEvent(result, appId) + Some(event) + } else { + None + } } - } + }(blockingThreadPoolExecutor) } override @@ -149,13 +158,15 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { Future { - val table = getTable(appId, channelId) - val rowKey = RowKey(eventId) - val exists = table.exists(new Get(rowKey.toBytes)) - table.delete(new Delete(rowKey.toBytes)) - table.close() - exists - } + blocking { + val table = getTable(appId, channelId) + val rowKey = RowKey(eventId) + val exists = table.exists(new Get(rowKey.toBytes)) + table.delete(new Delete(rowKey.toBytes)) + table.close() + exists + } + }(blockingThreadPoolExecutor) } override @@ -173,37 +184,39 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace reversed: Option[Boolean] = None)(implicit ec: ExecutionContext): Future[Iterator[Event]] = { Future { - - require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)), - "the parameter 
reversed can only be used with both entityType and entityId specified.") - - val table = getTable(appId, channelId) - - val scan = HBEventsUtil.createScan( - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = eventNames, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - reversed = reversed) - val scanner = table.getScanner(scan) - table.close() - - val eventsIter = scanner.iterator() - - // Get all events if None or Some(-1) - val results: Iterator[Result] = limit match { - case Some(-1) => eventsIter - case None => eventsIter - case Some(x) => eventsIter.take(x) + blocking { + require(!(reversed.contains(true) && (entityType.isEmpty || entityId.isEmpty)), + "the parameter reversed can only be used with both entityType and entityId specified.") + + val table = getTable(appId, channelId) + + val scan = HBEventsUtil.createScan( + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventNames, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + reversed = reversed) + val scanner = table.getScanner(scan) + table.close() + + val eventsIter = scanner.iterator() + + // Get all events if None or Some(-1) + val results: Iterator[Result] = limit match { + case Some(-1) => eventsIter + case None => eventsIter + case Some(x) => eventsIter.take(x) + } + + val eventsIt = results.map { + resultToEvent(_, appId) + } + + eventsIt } - - val eventsIt = results.map { resultToEvent(_, appId) } - - eventsIt - } + }(blockingThreadPoolExecutor) } - } diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala index 7324fa68e9..1afea36f14 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala @@ -61,7 +61,7 @@ class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, - HBEventsUtil.tableName(namespace, appId, channelId)) + HBEventsUtil.tableName(namespace, appId, channelId).getNameAsString) val scan = HBEventsUtil.createScan( startTime = startTime, @@ -95,7 +95,7 @@ class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String val conf = HBaseConfiguration.create() conf.set(TableOutputFormat.OUTPUT_TABLE, - HBEventsUtil.tableName(namespace, appId, channelId)) + HBEventsUtil.tableName(namespace, appId, channelId).getNameAsString) conf.setClass("mapreduce.outputformat.class", classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]]) @@ -117,9 +117,9 @@ class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String eventIds.foreachPartition{ iter => val conf = HBaseConfiguration.create() conf.set(TableOutputFormat.OUTPUT_TABLE, - tableName) + tableName.getNameAsString) - val table = new HTable(conf, tableName) + val table = client.connection.getTable(tableName) iter.foreach { id => val rowKey = HBEventsUtil.RowKey(id) val delete = new Delete(rowKey.b) diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala index 1720410150..ff9086d79c 100644 --- 
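The HBLEvents changes above route every HBase call through a private fixed-size pool (blockingThreadPoolExecutor) and mark the body with blocking, rather than executing on the caller-supplied ExecutionContext; slow HBase I/O therefore cannot starve the context that the event server uses for request handling. Stripped of the HBase specifics, the pattern is as follows (readFromStore is a stand-in, and the pool size is arbitrary apart from matching the value of 10 chosen above):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, blocking}

class BlockingStoreSketch {
  // Dedicated pool for blocking storage calls, kept separate from the caller's ec.
  private val blockingThreadPoolExecutor =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  // Stand-in for a blocking table read.
  private def readFromStore(id: String): Option[String] = Some(s"row-$id")

  // The caller still passes an ExecutionContext for API compatibility, but the
  // work itself runs on the dedicated pool.
  def futureGet(id: String)(implicit ec: ExecutionContext): Future[Option[String]] =
    Future {
      blocking {
        readFromStore(id)
      }
    }(blockingThreadPoolExecutor)
}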
a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala @@ -20,27 +20,23 @@ package org.apache.predictionio.data.storage.hbase import org.apache.predictionio.data.storage.BaseStorageClient import org.apache.predictionio.data.storage.StorageClientConfig - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.MasterNotRunningException import org.apache.hadoop.hbase.ZooKeeperConnectionException -import org.apache.hadoop.hbase.client.HConnectionManager -import org.apache.hadoop.hbase.client.HConnection -import org.apache.hadoop.hbase.client.HBaseAdmin - +import org.apache.hadoop.hbase.client._ import grizzled.slf4j.Logging case class HBClient( - val conf: Configuration, - val connection: HConnection, - val admin: HBaseAdmin + conf: Configuration, + connection: Connection, + admin: Admin ) class StorageClient(val config: StorageClientConfig) extends BaseStorageClient with Logging { - val conf = HBaseConfiguration.create() + val conf: Configuration = HBaseConfiguration.create() if (config.test) { // use fewer retries and shorter timeout for test mode @@ -73,12 +69,12 @@ class StorageClient(val config: StorageClientConfig) } } - val connection = HConnectionManager.createConnection(conf) + val connection: Connection = ConnectionFactory.createConnection(conf) val client = HBClient( conf = conf, connection = connection, - admin = new HBaseAdmin(connection) + admin = connection.getAdmin ) override diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala index 795cf7e290..0ee8c0eabd 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala @@ -19,24 +19,17 @@ package org.apache.predictionio.data.storage.hbase.upgrade import org.apache.predictionio.annotation.Experimental - import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.EventValidation import org.apache.predictionio.data.storage.DataMap - -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.client.HConnection -import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.client.{Connection, HConnection, Result, Scan} import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.util.Bytes - import org.joda.time.DateTime import org.joda.time.DateTimeZone - import org.json4s.DefaultFormats import org.json4s.JObject -import org.json4s.native.Serialization.{ read, write } - +import org.json4s.native.Serialization.{read, write} import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConversions._ @@ -48,7 +41,7 @@ object HB_0_8_0 { implicit val formats = DefaultFormats def getByAppId( - connection: HConnection, + connection: Connection, namespace: String, appId: Int): Iterator[Event] = { val tableName = TableName.valueOf(namespace, "events") diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala index 1759561207..11eed00fd0 100644 --- 
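Threaded through the HBase files above is a second, independent change: a migration off the deprecated HConnection/HBaseAdmin/HTable classes onto the HBase 1.x Connection/Admin/Table API, with table names passed around as TableName values rather than strings. A minimal sketch of how those objects are obtained and released under the new API (the table name and row key are placeholders, not values used by this patch):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Get, Table}

object HBaseClientSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create() // reads hbase-site.xml from the classpath
    val connection: Connection = ConnectionFactory.createConnection(conf)
    val admin: Admin = connection.getAdmin

    val tableName = TableName.valueOf("pio:events_1") // hypothetical namespace:table
    if (admin.tableExists(tableName)) {
      val table: Table = connection.getTable(tableName)
      try {
        val result = table.get(new Get("some-row-key".getBytes("UTF-8")))
        println(s"row empty? ${result.isEmpty}")
      } finally {
        table.close()
      }
    }
    admin.close()
    connection.close()
  }
}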
a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala @@ -67,7 +67,6 @@ object Upgrade { newTable.put(puts.toList) } - newTable.flushCommits() newTable.close() println("Done.") } diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala index b4230ccd11..0ffa18786d 100644 --- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala @@ -32,6 +32,7 @@ import scalikejdbc._ import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.blocking /** JDBC implementation of [[LEvents]] */ class JDBCLEvents( @@ -103,10 +104,11 @@ class JDBCLEvents( override def futureInsert(event: Event, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[String] = Future { - DB localTx { implicit session => - val id = event.eventId.getOrElse(JDBCUtils.generateId) - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" + blocking { + DB localTx { implicit session => + val id = event.eventId.getOrElse(JDBCUtils.generateId) + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" insert into $tableName values( $id, ${event.event}, @@ -123,34 +125,36 @@ class JDBCLEvents( ${event.creationTime.getZone.getID} ) """.update().apply() - id + id + } } } override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[Seq[String]] = Future { - DB localTx { implicit session => - val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId)) - val params = events.zip(ids).map { case (event, id) => - Seq( - 'id -> id, - 'event -> event.event, - 'entityType -> event.entityType, - 'entityId -> event.entityId, - 'targetEntityType -> event.targetEntityType, - 'targetEntityId -> event.targetEntityId, - 'properties -> write(event.properties.toJObject), - 'eventTime -> event.eventTime, - 'eventTimeZone -> event.eventTime.getZone.getID, - 'tags -> (if(event.tags.nonEmpty) Some(event.tags.mkString(",")) else None), - 'prId -> event.prId, - 'creationTime -> event.creationTime, - 'creationTimeZone -> event.creationTime.getZone.getID - ) - } + blocking { + DB localTx { implicit session => + val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId)) + val params = events.zip(ids).map { case (event, id) => + Seq( + 'id -> id, + 'event -> event.event, + 'entityType -> event.entityType, + 'entityId -> event.entityId, + 'targetEntityType -> event.targetEntityType, + 'targetEntityId -> event.targetEntityId, + 'properties -> write(event.properties.toJObject), + 'eventTime -> event.eventTime, + 'eventTimeZone -> event.eventTime.getZone.getID, + 'tags -> (if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None), + 'prId -> event.prId, + 'creationTime -> event.creationTime, + 'creationTimeZone -> event.creationTime.getZone.getID + ) + } - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" insert into $tableName values( {id}, {event}, @@ -168,15 +172,17 @@ class JDBCLEvents( ) 
""".batchByName(params: _*).apply() - ids + ids + } } } override def futureGet(eventId: String, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[Option[Event]] = Future { - DB readOnly { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" + blocking { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" select id, event, @@ -194,17 +200,20 @@ class JDBCLEvents( from $tableName where id = $eventId """.map(resultToEvent).single().apply() + } } } override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[Boolean] = Future { - DB localTx { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" + blocking { + DB localTx { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" delete from $tableName where id = $eventId """.update().apply() - true + true + } } } @@ -221,30 +230,32 @@ class JDBCLEvents( limit: Option[Int] = None, reversed: Option[Boolean] = None )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future { - DB readOnly { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - val whereClause = sqls.toAndConditionOpt( - startTime.map(x => sqls"eventTime >= $x"), - untilTime.map(x => sqls"eventTime < $x"), - entityType.map(x => sqls"entityType = $x"), - entityId.map(x => sqls"entityId = $x"), - eventNames.map(x => - sqls.toOrConditionOpt(x.map(y => - Some(sqls"event = $y") - ): _*) - ).getOrElse(None), - targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y") + blocking { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + val whereClause = sqls.toAndConditionOpt( + startTime.map(x => sqls"eventTime >= $x"), + untilTime.map(x => sqls"eventTime < $x"), + entityType.map(x => sqls"entityType = $x"), + entityId.map(x => sqls"entityId = $x"), + eventNames.map(x => + sqls.toOrConditionOpt(x.map(y => + Some(sqls"event = $y") + ): _*) + ).getOrElse(None), + targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y") .getOrElse(sqls"targetEntityType IS NULL")), - targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y") + targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y") .getOrElse(sqls"targetEntityId IS NULL")) - ).map(sqls.where(_)).getOrElse(sqls"") - val orderByClause = reversed.map(x => - if (x) sqls"eventTime desc" else sqls"eventTime asc" - ).getOrElse(sqls"eventTime asc") - val limitClause = limit.map(x => - if (x < 0) sqls"" else sqls.limit(x) - ).getOrElse(sqls"") - val q = sql""" + ).map(sqls.where(_)).getOrElse(sqls"") + val orderByClause = reversed.map(x => + if (x) sqls"eventTime desc" else sqls"eventTime asc" + ).getOrElse(sqls"eventTime asc") + val limitClause = limit.map(x => + if (x < 0) sqls"" else sqls.limit(x) + ).getOrElse(sqls"") + val q = + sql""" select id, event, @@ -264,7 +275,8 @@ class JDBCLEvents( order by $orderByClause $limitClause """ - q.map(resultToEvent).list().apply().toIterator + q.map(resultToEvent).list().apply().toIterator + } } } diff --git a/tests/pio_tests/engines/recommendation-engine/build.sbt b/tests/pio_tests/engines/recommendation-engine/build.sbt index 4688165c1e..d958972560 100644 --- 
a/tests/pio_tests/engines/recommendation-engine/build.sbt +++ b/tests/pio_tests/engines/recommendation-engine/build.sbt @@ -26,6 +26,6 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.13.0" % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-core" % sys.env.getOrElse("PIO_SPARK_VERSION", "2.1.1") % "provided", "org.apache.spark" %% "spark-mllib" % sys.env.getOrElse("PIO_SPARK_VERSION", "2.1.1") % "provided") diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala index f22d2f7acc..b51b0f48bd 100644 --- a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala +++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala @@ -20,16 +20,16 @@ package org.template.recommendation import org.apache.predictionio.controller.PAlgorithm import org.apache.predictionio.controller.Params import org.apache.predictionio.data.storage.BiMap - import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} import org.apache.spark.mllib.recommendation.ALSModel - import grizzled.slf4j.Logger +import scala.concurrent.{ExecutionContext, Future} + case class ALSAlgorithmParams( rank: Int, numIterations: Int, @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) itemStringIntMap = itemStringIntMap) } - def predict(model: ALSModel, query: Query): PredictedResult = { + override def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { // Convert String ID to Int index for Mllib model.userStringIntMap.get(query.user).map { userInt => // create inverse view of itemStringIntMap @@ -101,10 +101,10 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams) // index. Convert it to String ID for returning PredictedResult val itemScores = model.recommendProducts(userInt, query.num) .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - new PredictedResult(itemScores) + Future.successful(new PredictedResult(itemScores)) }.getOrElse{ logger.info(s"No prediction for unknown user ${query.user}.") - new PredictedResult(Array.empty) + Future.successful(new PredictedResult(Array.empty)) } }
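With the integration-test engine converted as well, every algorithm in the tree now exposes predictAsync, so a query-time caller can compose the prediction without parking a thread; only the Spark batch paths shown earlier (batchPredict for evaluation) still Await the future. As a hypothetical caller-side illustration, not taken from the serving code, consuming such a future looks like:

import scala.concurrent.{ExecutionContext, Future}

object CallerSketch {
  // Stand-in for some algorithm's predictAsync result.
  def predictAsync(user: String)(implicit ec: ExecutionContext): Future[Seq[(String, Double)]] =
    Future.successful(Seq("item-1" -> 0.9, "item-2" -> 0.7))

  // A non-blocking caller maps over the future instead of awaiting it, leaving
  // the thread free while the prediction is in flight.
  def topItem(user: String)(implicit ec: ExecutionContext): Future[Option[String]] =
    predictAsync(user).map(_.sortBy { case (_, score) => -score }.headOption.map(_._1))
}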