Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Making async support backward compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Wewerka committed Nov 9, 2018
1 parent 183c9a7 commit 2ef1e2c
Show file tree
Hide file tree
Showing 24 changed files with 159 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,38 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P]
cartesian.flatMap { case (m, qArray) =>
qArray.map {
case (qx, q) =>
(qx, Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) )
(qx,
Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes) )
}
}
}

def predictBase(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = {
predict(localBaseModel.asInstanceOf[M], q)(ec)
}
/** Routes a single non-blocking prediction to [[predictAsync]], casting the
  * untyped model handed in by the serving layer back to this algorithm's
  * model type M.
  */
override def predictBaseAsync(localBaseModel: Any, q: Q)(implicit ec: ExecutionContext)
  : Future[P] = {
  val model = localBaseModel.asInstanceOf[M]
  predictAsync(model, q)
}

// Backward-compatibility shim: engines compiled against the pre-0.14.0
// blocking API still override predict(); the serving layer itself now
// calls predictBaseAsync() instead of this method.
@deprecated(message =
  "this method is just here for backward compatibility, predictBaseAsync() is called now",
  since = "0.14.0")
override def predictBase(localBaseModel: Any, q: Q): P =
  predict(localBaseModel.asInstanceOf[M], q)

/** Implement this method to produce a Future of a prediction in a
  * non-blocking way from a query and trained model.
  *
  * The default implementation delegates to the deprecated blocking
  * predict() for backward compatibility. It schedules the call on the
  * given ExecutionContext and wraps it in `blocking` so that managed
  * pools (e.g. the global ExecutionContext) can spawn compensating
  * threads, instead of evaluating the blocking call eagerly on the
  * caller's thread as `Future.successful` would.
  * Override this method in new engines and leave the old blocking
  * predict() as it is (throwing an exception); it won't be called from
  * now on.
  *
  * @param model Trained model produced by [[train]].
  * @param query An input query.
  * @param ec ExecutionContext to use for async operations
  * @return A Future of a prediction.
  */
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
  Future(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -93,7 +117,9 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P]
* @param q An input query.
* @return A prediction.
*/
def predict(m: M, q: Q)(implicit ec: ExecutionContext): Future[P]
// Pre-0.14.0 engines override this blocking method and are still served via
// the predictAsync() default; new engines override predictAsync() directly,
// in which case this stub is never invoked.
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(m: M, q: Q): P =
  throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,34 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
*/
def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
qs.mapValues { q =>
Await.result(predict(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes)
Await.result(predictAsync(m, q)(scala.concurrent.ExecutionContext.global), 60 minutes)
}
}

def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
predict(bm.asInstanceOf[M], q)(ec)
/** Forwards a single non-blocking prediction to [[predictAsync]] after
  * restoring the model's concrete type M from the untyped handle.
  */
override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = {
  val typedModel = bm.asInstanceOf[M]
  predictAsync(typedModel, q)
}

// Backward-compatibility shim: kept so engines built against the pre-0.14.0
// blocking API keep working; the serving layer now calls predictBaseAsync().
@deprecated(message =
  "this method is just here for backward compatibility, predictBaseAsync() is called now",
  since = "0.14.0")
override def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q)

/** Implement this method to produce a Future of a prediction in a
  * non-blocking way from a query and trained model.
  *
  * The default implementation delegates to the deprecated blocking
  * predict() for backward compatibility. It runs the call on the given
  * ExecutionContext inside `blocking`, so managed pools can compensate
  * with extra threads, rather than executing it eagerly on the caller's
  * thread as `Future.successful` would (which also left `ec` unused).
  * Override this method in new engines and leave the old blocking
  * predict() as it is (throwing an exception); it won't be called from
  * now on.
  *
  * @param model Trained model produced by [[train]].
  * @param query An input query.
  * @param ec ExecutionContext to use for async operations
  * @return A Future of a prediction.
  */
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
  Future(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -85,7 +107,9 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
* @param query An input query.
* @return A prediction.
*/
def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P]
// Pre-0.14.0 engines override this blocking method; new engines override
// predictAsync() directly, leaving this stub unreached.
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(model: M, query: Q): P =
  throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.predictionio.workflow.PersistentModelManifest
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

/** Base class of a parallel algorithm.
*
Expand Down Expand Up @@ -74,9 +74,31 @@ abstract class PAlgorithm[PD, M, Q, P]
// Intentionally unimplemented default: a parallel algorithm must supply its
// own batchPredict if batch serving/evaluation is used.
def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
  throw new NotImplementedError("batchPredict not implemented")

def predictBase(baseModel: Any, query: Q)(implicit ec: ExecutionContext): Future[P] = {
predict(baseModel.asInstanceOf[M], query)(ec)
}
/** Dispatches a single non-blocking prediction to [[predictAsync]], first
  * casting the untyped model back to this algorithm's model type M.
  */
override def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] = {
  val concreteModel = bm.asInstanceOf[M]
  predictAsync(concreteModel, q)
}

// Backward-compatibility shim: retained so pre-0.14.0 engines that only
// implement the blocking predict() still work; serving now goes through
// predictBaseAsync().
@deprecated(message =
  "this method is just here for backward compatibility, predictBaseAsync() is called now",
  since = "0.14.0")
override def predictBase(baseModel: Any, query: Q): P =
  predict(baseModel.asInstanceOf[M], query)

/** Implement this method to produce a Future of a prediction in a
  * non-blocking way from a query and trained model.
  *
  * The default implementation delegates to the deprecated blocking
  * predict() for backward compatibility. The call is scheduled on the
  * given ExecutionContext and marked with `blocking` so managed pools
  * can spawn compensating threads; `Future.successful` would instead
  * evaluate the blocking call eagerly on the caller's thread and ignore
  * `ec` entirely.
  * Override this method in new engines and leave the old blocking
  * predict() as it is (throwing an exception); it won't be called from
  * now on.
  *
  * @param model Trained model produced by [[train]].
  * @param query An input query.
  * @param ec ExecutionContext to use for async operations
  * @return A Future of a prediction.
  */
def predictAsync(model: M, query: Q)(implicit ec: ExecutionContext): Future[P] =
  Future(blocking(predict(model, query)))

/** Implement this method to produce a prediction from a query and trained
* model.
Expand All @@ -85,7 +107,9 @@ abstract class PAlgorithm[PD, M, Q, P]
* @param query An input query.
* @return A prediction.
*/
def predict(model: M, query: Q)(implicit ec: ExecutionContext): Future[P]
// Blocking entry point kept only for pre-0.14.0 engines; new engines
// override predictAsync(), leaving this stub unreached.
@deprecated(message = "override non blocking predictAsync() instead", since = "0.14.0")
def predict(model: M, query: Q): P =
  throw new NotImplementedError("predict() is deprecated, override predictAsync() instead")

/** :: DeveloperApi ::
* Engine developers should not use this directly (read on to see how parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import net.jodah.typetools.TypeResolver
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

/** :: DeveloperApi ::
* Base trait with default custom query serializer, exposed to engine developer
Expand Down Expand Up @@ -83,6 +83,19 @@ abstract class BaseAlgorithm[PD, M, Q, P]
def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
: RDD[(Long, P)]

/** :: DeveloperApi ::
  * Engine developers should not use this directly. Called by serving to
  * perform a single prediction.
  *
  * The default implementation delegates to the deprecated blocking
  * predictBase() for backward compatibility, scheduling it on the given
  * ExecutionContext inside `blocking` so managed pools can compensate
  * with extra threads; `Future.successful` would instead run the blocking
  * call eagerly on the caller's thread and leave `ec` unused.
  *
  * @param bm Model
  * @param q Query
  * @param ec ExecutionContext to use for async operations
  * @return Future of a Predicted result
  */
@DeveloperApi
def predictBaseAsync(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P] =
  Future(blocking(predictBase(bm, q)))

/** :: DeveloperApi ::
* Engine developers should not use this directly. Called by serving to
* perform a single prediction.
Expand All @@ -92,7 +105,11 @@ abstract class BaseAlgorithm[PD, M, Q, P]
* @return Predicted result
*/
@DeveloperApi
def predictBase(bm: Any, q: Q)(implicit ec: ExecutionContext): Future[P]
// Backward-compatibility stub for engines compiled before 0.14.0: the
// workflow now calls predictBaseAsync(), whose default in this file
// delegates here, so subclasses implementing only the old blocking API
// still function.
@deprecated(message = "override non blocking predictBaseAsync() instead", since = "0.14.0")
def predictBase(bm: Any, q: Q): P =
  throw new NotImplementedError(
    "predictBase() is deprecated, override predictBaseAsync() instead"
  )

/** :: DeveloperApi ::
* Engine developers should not use this directly. Prepare a model for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ object BatchPredict extends Logging {
// finally Serving.serve.
val supplementedQuery = serving.supplementBase(query)
val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) =>
a.predictBase(m, supplementedQuery)
a.predictBaseAsync(m, supplementedQuery)
})
// Notice that it is by design to call Serving.serve with the
// *original* query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ class PredictionServer[Q, P](
val supplementedQuery = serving.supplementBase(query)

val predictionsFuture = Future.sequence(algorithms.zip(models).map { case (a, m) =>
a.predictBase(m, supplementedQuery)
a.predictBaseAsync(m, supplementedQuery)
})
// Notice that it is by design to call Serving.serve with the
// *original* query.
Expand Down
Loading

0 comments on commit 2ef1e2c

Please sign in to comment.