Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making async support backward compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Wewerka committed Nov 9, 2018
1 parent 183c9a7 commit c7aba5a
Show file tree
Hide file tree
Showing 24 changed files with 136 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,38 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P]
cartesian.flatMap { case (m, qArray) =>
qArray.map {
case (qx, q) =>
(qx, Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) )
(qx,
Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) )
}
}
}

def predictBase(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = {
predict(localBaseModel.asInstanceOf[M], q)(ec)
}
override def predictBaseAsync(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext)
: Future[P] =
predictAsync(localBaseModel.asInstanceOf[M], q)(ec)

@deprecated(message =
"this method is just here for backward compatibility, predictBaseAsync() is called now",
since = "0.14.0")
override def predictBase(localBaseModel: Any, q: Q): P =
predict(localBaseModel.asInstanceOf[M], q)

/** Implement this method to produce a Future of a prediction in a non blocking way
* from a query and trained model.
*
* This method is implemented to just delegate to blocking predict() for
* backward compatibility reasons.
* Definitely overwrite it to implement your blocking prediction method, and leave
* the old blocking predict() as it is (throwing an exception), it won't be called from
* now on.
*
* @param model Trained model produced by [[train]].
* @param query An input query.
* @param ec ExecutionContext to use for async operations
* @return A Future of a prediction.
*/
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
Future.successful(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -93,7 +117,9 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P]
* @param q An input query.
* @return A prediction.
*/
def predict(m: M, q: Q)(implicit ec: ExecutionContext): Future[P]
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(m: M, q: Q): P =
throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,34 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
*/
def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
qs.mapValues { q =>
Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes)
Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes)
}
}

def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
predict(bm.asInstanceOf[M], q)(ec)
override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
predictAsync(bm.asInstanceOf[M], q)(ec)

@deprecated(message =
"this method is just here for backward compatibility, predictBaseAsync() is called now",
since = "0.14.0")
override def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q)

/** Implement this method to produce a Future of a prediction in a non blocking way
* from a query and trained model.
*
* This method is implemented to just delegate to blocking predict() for
* backward compatibility reasons.
* Definitely overwrite it to implement your blocking prediction method, and leave
* the old blocking predict() as it is (throwing an exception), it won't be called from
* now on.
*
* @param model Trained model produced by [[train]].
* @param query An input query.
* @param ec ExecutionContext to use for async operations
* @return A Future of a prediction.
*/
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
Future.successful(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -85,7 +107,9 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
* @param query An input query.
* @return A prediction.
*/
def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P]
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(model: M, query: Q): P =
throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.predictionio.workflow.PersistentModelManifest
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

/** Base class of a parallel algorithm.
*
Expand Down Expand Up @@ -74,9 +74,31 @@ abstract class PAlgorithm[PD, M, Q, P]
def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
throw new NotImplementedError("batchPredict not implemented")

def predictBase(baseModel: Any, query: Q)(implicit ec: ExecutionContext): Future[P] = {
predict(baseModel.asInstanceOf[M], query)(ec)
}
override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
predictAsync(bm.asInstanceOf[M], q)(ec)

@deprecated(message =
"this method is just here for backward compatibility, predictBaseAsync() is called now",
since = "0.14.0")
override def predictBase(baseModel: Any, query: Q): P =
predict(baseModel.asInstanceOf[M], query)

/** Implement this method to produce a Future of a prediction in a non blocking way
* from a query and trained model.
*
* This method is implemented to just delegate to blocking predict() for
* backward compatibility reasons.
* Definitely overwrite it to implement your blocking prediction method, and leave
* the old blocking predict() as it is (throwing an exception), it won't be called from
* now on.
*
* @param model Trained model produced by [[train]].
* @param query An input query.
* @param ec ExecutionContext to use for async operations
* @return A Future of a prediction.
*/
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
Future.successful(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -85,7 +107,9 @@ abstract class PAlgorithm[PD, M, Q, P]
* @param query An input query.
* @return A prediction.
*/
def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P]
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(model: M, query: Q): P =
throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import net.jodah.typetools.TypeResolver
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

/** :: DeveloperApi ::
* Base trait with default custom query serializer, exposed to engine developer
Expand Down Expand Up @@ -83,6 +83,19 @@ abstract class BaseAlgorithm[PD, M, Q, P]
def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
: RDD[(Long, P)]

/** :: DeveloperApi ::
* Engine developers should not use this directly. Called by serving to
* perform a single prediction.
*
* @param bm Model
* @param q Query
* @param ec ExecutionContext to use for async operations
* @return Future of a Predicted result
*/
@DeveloperApi
def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
Future.successful(blocking {predictBase(bm, q)})

/** :: DeveloperApi ::
* Engine developers should not use this directly. Called by serving to
* perform a single prediction.
Expand All @@ -92,7 +105,11 @@ abstract class BaseAlgorithm[PD, M, Q, P]
* @return Predicted result
*/
@DeveloperApi
def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P]
@deprecated(message = "override non blocking predictBaseAsync() instead", since = "0.14.0")
def predictBase(bm: Any, q: Q): P =
throw new NotImplementedError(
"predictBase() is deprecated, override predictBaseAsync() instead"
)

/** :: DeveloperApi ::
* Engine developers should not use this directly. Prepare a model for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ object BatchPredict extends Logging {
// finally Serving.serve.
val supplementedQuery = serving.supplementBase(query)
val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) =>
a.predictBase(m, supplementedQuery)
a.predictBaseAsync(m, supplementedQuery)
})
// Notice that it is by design to call Serving.serve with the
// *original* query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ class PredictionServer[Q, P](
val supplementedQuery = serving.supplementBase(query)

val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) =>
a.predictBase(m, supplementedQuery)
a.predictBaseAsync(m, supplementedQuery)
})
// Notice that it is by design to call Serving.serve with the
// *original* query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ object Engine0 {
def train(sc: SparkContext, pd: ProcessedData)
: NAlgo0.Model = NAlgo0.Model(id, pd)

def predict(m: NAlgo0.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = {
override def predictAsync(m: NAlgo0.Model, q: Query)
(implicit ec: ExecutionContext): Future[Prediction] = {
Future.successful(Prediction(id, q, Some(m)))
}
}
Expand All @@ -363,7 +364,8 @@ object Engine0 {
def train(sc: SparkContext, pd: ProcessedData)
: NAlgo1.Model = NAlgo1.Model(id, pd)

def predict(m: NAlgo1.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = {
override def predictAsync(m: NAlgo1.Model, q: Query)
(implicit ec: ExecutionContext): Future[Prediction] = {
Future.successful(Prediction(id, q, Some(m)))
}
}
Expand All @@ -377,12 +379,12 @@ object Engine0 {
object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model]
}

class NAlgo2(params: NAlgo2.Params)
class NAlgo2(params: NAlgo2.Params)
extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] {
def train(sc: SparkContext, pd: ProcessedData)
: NAlgo2.Model = NAlgo2.Model(params.id, pd)

def predict(m: NAlgo2.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = {
override def predictAsync(m: NAlgo2.Model, q: Query)
(implicit ec: ExecutionContext): Future[Prediction] = {
Future.successful(Prediction(params.id, q, Some(m)))
}
}
Expand All @@ -393,12 +395,13 @@ object Engine0 {
case class Model(id: Int, pd: ProcessedData)
}

class NAlgo3(params: NAlgo3.Params)
class NAlgo3(params: NAlgo3.Params)
extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] {
def train(sc: SparkContext, pd: ProcessedData)
: NAlgo3.Model = NAlgo3.Model(params.id, pd)

def predict(m: NAlgo3.Model, q: Query)(implicit ec: ExecutionContext): Future[Prediction] = {
override def predictAsync(m: NAlgo3.Model, q: Query)
(implicit ec: ExecutionContext): Future[Prediction] = {
Future.successful(Prediction(params.id, q, Some(m)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams)
NaiveBayes.train(data.labeledPoints, ap.lambda)
}

def predict(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
val label = model.predict(Vectors.dense(
Array(query.attr0, query.attr1, query.attr2)
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class NaiveBayesAlgorithm(val ap: AlgorithmParams)
NaiveBayes.train(data.labeledPoints, ap.lambda)
}

def predict(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: NaiveBayesModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
val label = model.predict(Vectors.dense(
// MODIFIED
Array(query.featureA, query.featureB, query.featureC, query.featureD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams)
buyCountsRDD.collectAsMap.toMap
}

def predict(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {

val userFeatures = model.userFeatures
val productModels = model.productModels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class ECommAlgorithm(val ap: ECommAlgorithmParams)
buyCountsRDD.collectAsMap.toMap
}

def predict(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ECommModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {

val userFeatures = model.userFeatures
val productModels = model.productModels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
itemStringIntMap = itemStringIntMap)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
// Convert String ID to Int index for Mllib
model.userStringIntMap.get(query.user).map { userInt =>
// create inverse view of itemStringIntMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
itemStringIntMap = itemStringIntMap)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
// Convert String ID to Int index for Mllib
model.userStringIntMap.get(query.user).map { userInt =>
// create inverse view of itemStringIntMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
itemStringIntMap = itemStringIntMap)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
// Convert String ID to Int index for Mllib
model.userStringIntMap.get(query.user).map { userInt =>
// create inverse view of itemStringIntMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
itemStringIntMap = itemStringIntMap)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
// Convert String ID to Int index for Mllib
model.userStringIntMap.get(query.user).map { userInt =>
// create inverse view of itemStringIntMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
itemStringIntMap = itemStringIntMap)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
Future.successful(
// Convert String ID to Int index for Mllib
model.userStringIntMap.get(query.user).map { userInt =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
val productFeatures = model.productFeatures

// convert items to Int index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams)
topCooccurrences
}

def predict(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {

// convert items to Int index
val queryList: Set[Int] = query.items
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {

val similarUserFeatures = model.similarUserFeatures

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
)
}

def predict(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: ALSModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
val productFeatures = model.productFeatures

// convert items to Int index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams)
topCooccurrences
}

def predict(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {
def predictAsync(model: CooccurrenceModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = {

// convert items to Int index
val queryList: Set[Int] = query.items
Expand Down
Loading

0 comments on commit c7aba5a

Please sign in to comment.