diff --git a/make-distribution.sh b/make-distribution.sh index 7d94a797ac..a14cd0ce01 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -90,6 +90,9 @@ mkdir -p ${DISTDIR}/project mkdir -p ${DISTDIR}/sbt +mkdir -p ${DISTDIR}/examples +mkdir -p ${DISTDIR}/templates + cp ${FWDIR}/bin/* ${DISTDIR}/bin || : cp ${FWDIR}/conf/* ${DISTDIR}/conf cp -r ${FWDIR}/python/* ${DISTDIR}/python @@ -103,6 +106,9 @@ rm -f ${DISTDIR}/lib/*sources.jar rm -f ${DISTDIR}/conf/pio-env.sh mv ${DISTDIR}/conf/pio-env.sh.template ${DISTDIR}/conf/pio-env.sh +cp -r ${FWDIR}/examples/* ${DISTDIR}/examples +cp -r ${FWDIR}/templates/* ${DISTDIR}/templates + touch ${DISTDIR}/RELEASE TARNAME="PredictionIO-$VERSION.tar.gz" diff --git a/templates/template-scala-parallel-recommendation/.gitignore b/templates/template-scala-parallel-recommendation/.gitignore new file mode 100644 index 0000000000..9662372868 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/.gitignore @@ -0,0 +1,14 @@ +.DS_Store +manifest.json +target/ +pio.log +/pio.sbt + +# Eclipse +.project +.classpath +.settings/ + +# IntelliJ +*.iml +.idea/ \ No newline at end of file diff --git a/templates/template-scala-parallel-recommendation/LICENSE.txt b/templates/template-scala-parallel-recommendation/LICENSE.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/templates/template-scala-parallel-recommendation/README.md b/templates/template-scala-parallel-recommendation/README.md new file mode 100644 index 0000000000..4c3cd6d280 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/README.md @@ -0,0 +1,79 @@ + +# Recommendation Template + +## Documentation + +Please refer to +https://predictionio.apache.org/templates/recommendation/quickstart/. + +## Versions + +### v0.13.0 + +Update for Apache PredictionIO 0.13.0 + +### v0.12.0-incubating + +- Bump version number to track PredictionIO version +- Sets default build targets according to PredictionIO +- Add checkpoint parameters +- Fix warnings and use of case class + +### v0.11.0-incubating + +- Bump version number to track PredictionIO version +- Rename Scala package name +- Update SBT version +- Fix typo + +### v0.4.0 + +- Compatible with Apache PredictionIO 0.10.0-incubating + +### v0.3.2 + +- Fix incorrect top items in batchPredict() (issue #5) + +### v0.3.1 + +- Add Evaluation module and modify DataSource for it + +### v0.3.0 + +- update for PredictionIO 0.9.2, including: + + - use new PEventStore API + - use appName in DataSource parameter + +### v0.2.0 + +- update build.sbt and template.json for PredictionIO 0.9.2 + +### v0.1.2 + +- update for PredictionIO 0.9.0 + +### v0.1.1 + +- Persist RDD to memory (.cache()) in DataSource for better performance and quick fix for new user/item ID BiMap error issue. + +### v0.1.0 + +- initial version +- known issue: + * If importing new events of new users/itesm during training, the new user/item id can't be found in the BiMap. diff --git a/templates/template-scala-parallel-recommendation/build.sbt b/templates/template-scala-parallel-recommendation/build.sbt new file mode 100644 index 0000000000..339cb1cbd9 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/build.sbt @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "template-scala-parallel-recommendation" + +scalaVersion := "2.11.12" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0-SNAPSHOT" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.3" % "provided") diff --git a/templates/template-scala-parallel-recommendation/data/import_eventserver.py b/templates/template-scala-parallel-recommendation/data/import_eventserver.py new file mode 100644 index 0000000000..63694cff00 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/data/import_eventserver.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for recommendation engine +""" + +import predictionio +import argparse +import random + +RATE_ACTIONS_DELIMITER = "::" +SEED = 3 + +def import_events(client, file): + f = open(file, 'r') + random.seed(SEED) + count = 0 + print("Importing data...") + for line in f: + data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER) + # For demonstration purpose, randomly mix in some buy events + if (random.randint(0, 1) == 1): + client.create_event( + event="rate", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1], + properties= { "rating" : float(data[2]) } + ) + else: + client.create_event( + event="buy", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1] + ) + count += 1 + f.close() + print("%s events are imported." % count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for recommendation engine") + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + parser.add_argument('--file', default="./data/sample_movielens_data.txt") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client, args.file) diff --git a/templates/template-scala-parallel-recommendation/data/send_query.py b/templates/template-scala-parallel-recommendation/data/send_query.py new file mode 100644 index 0000000000..f6ec9abd3a --- /dev/null +++ b/templates/template-scala-parallel-recommendation/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +print(engine_client.send_query({"user": "1", "num": 4})) diff --git a/templates/template-scala-parallel-recommendation/engine.json b/templates/template-scala-parallel-recommendation/engine.json new file mode 100644 index 0000000000..acaab5b1d5 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/engine.json @@ -0,0 +1,21 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "package org.apache.predictionio.recommendation.RecommendationEngine", + "datasource": { + "params" : { + "appName": "INVALID_APP_NAME" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations": 20, + "lambda": 0.01, + "seed": 3 + } + } + ] +} diff --git a/templates/template-scala-parallel-recommendation/project/assembly.sbt b/templates/template-scala-parallel-recommendation/project/assembly.sbt new file mode 100644 index 0000000000..d95475f16f --- /dev/null +++ b/templates/template-scala-parallel-recommendation/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7") diff --git a/templates/template-scala-parallel-recommendation/project/build.properties b/templates/template-scala-parallel-recommendation/project/build.properties new file mode 100644 index 0000000000..c091b86ca4 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.16 diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/ALSAlgorithm.scala b/templates/template-scala-parallel-recommendation/src/main/scala/ALSAlgorithm.scala new file mode 100644 index 0000000000..4bdef36d88 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/ALSAlgorithm.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.PAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} +import org.apache.spark.mllib.recommendation.ALSModel + +import grizzled.slf4j.Logger + +case class ALSAlgorithmParams( + rank: Int, + numIterations: Int, + lambda: Double, + seed: Option[Long]) extends Params + +class ALSAlgorithm(val ap: ALSAlgorithmParams) + extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { + + @transient lazy val logger = Logger[this.type] + + if (ap.numIterations > 30) { + logger.warn( + s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " + + s"There is a chance of running to StackOverflowException." + + s"To remedy it, set lower numIterations or checkpoint parameters.") + } + + def train(sc: SparkContext, data: PreparedData): ALSModel = { + // MLLib ALS cannot handle empty training data. + require(!data.ratings.take(1).isEmpty, + s"RDD[Rating] in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + // Convert user and item String IDs to Int index for MLlib + + val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user)) + val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item)) + val mllibRatings = data.ratings.map( r => + // MLlibRating requires integer index for user and item + MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating) + ) + + // seed for MLlib ALS + val seed = ap.seed.getOrElse(System.nanoTime) + + // Set checkpoint directory + // sc.setCheckpointDir("checkpoint") + + // If you only have one type of implicit event (Eg. "view" event only), + // set implicitPrefs to true + val implicitPrefs = false + val als = new ALS() + als.setUserBlocks(-1) + als.setProductBlocks(-1) + als.setRank(ap.rank) + als.setIterations(ap.numIterations) + als.setLambda(ap.lambda) + als.setImplicitPrefs(implicitPrefs) + als.setAlpha(1.0) + als.setSeed(seed) + als.setCheckpointInterval(10) + val m = als.run(mllibRatings) + + new ALSModel( + rank = m.rank, + userFeatures = m.userFeatures, + productFeatures = m.productFeatures, + userStringIntMap = userStringIntMap, + itemStringIntMap = itemStringIntMap) + } + + def predict(model: ALSModel, query: Query): PredictedResult = { + // Convert String ID to Int index for Mllib + model.userStringIntMap.get(query.user).map { userInt => + // create inverse view of itemStringIntMap + val itemIntStringMap = model.itemStringIntMap.inverse + // recommendProducts() returns Array[MLlibRating], which uses item Int + // index. Convert it to String ID for returning PredictedResult + val itemScores = model.recommendProducts(userInt, query.num) + .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) + PredictedResult(itemScores) + }.getOrElse{ + logger.info(s"No prediction for unknown user ${query.user}.") + PredictedResult(Array.empty) + } + } + + // This function is used by the evaluation module, where a batch of queries is sent to this engine + // for evaluation purpose. + override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = { + val userIxQueries: RDD[(Int, (Long, Query))] = queries + .map { case (ix, query) => { + // If user not found, then the index is -1 + val userIx = model.userStringIntMap.get(query.user).getOrElse(-1) + (userIx, (ix, query)) + }} + + // Cross product of all valid users from the queries and products in the model. + val usersProducts: RDD[(Int, Int)] = userIxQueries + .keys + .filter(_ != -1) + .cartesian(model.productFeatures.map(_._1)) + + // Call mllib ALS's predict function. + val ratings: RDD[MLlibRating] = model.predict(usersProducts) + + // The following code construct predicted results from mllib's ratings. + // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue + val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user) + + userIxQueries.leftOuterJoin(userRatings) + .map { + // When there are ratings + case (userIx, ((ix, query), Some(ratings))) => { + val topItemScores: Array[ItemScore] = ratings + .toArray + .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering + .take(query.num) + .map { rating => ItemScore( + model.itemStringIntMap.inverse(rating.product), + rating.rating) } + + (ix, PredictedResult(itemScores = topItemScores)) + } + // When user doesn't exist in training data + case (userIx, ((ix, query), None)) => { + require(userIx == -1) + (ix, PredictedResult(itemScores = Array.empty)) + } + } + } +} diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/ALSModel.scala b/templates/template-scala-parallel-recommendation/src/main/scala/ALSModel.scala new file mode 100644 index 0000000000..aafe5e4d28 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/ALSModel.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation +// This must be the same package as Spark's MatrixFactorizationModel because +// MatrixFactorizationModel's constructor is private and we are using +// its constructor in order to save and load the model + +import org.apache.predictionio.recommendation.ALSAlgorithmParams + +import org.apache.predictionio.controller.PersistentModel +import org.apache.predictionio.controller.PersistentModelLoader +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +class ALSModel( + override val rank: Int, + override val userFeatures: RDD[(Int, Array[Double])], + override val productFeatures: RDD[(Int, Array[Double])], + val userStringIntMap: BiMap[String, Int], + val itemStringIntMap: BiMap[String, Int]) + extends MatrixFactorizationModel(rank, userFeatures, productFeatures) + with PersistentModel[ALSAlgorithmParams] { + + def save(id: String, params: ALSAlgorithmParams, + sc: SparkContext): Boolean = { + + sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank") + userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures") + productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") + sc.parallelize(Seq(userStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/userStringIntMap") + sc.parallelize(Seq(itemStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") + true + } + + override def toString = { + s"userFeatures: [${userFeatures.count()}]" + + s"(${userFeatures.take(2).toList}...)" + + s" productFeatures: [${productFeatures.count()}]" + + s"(${productFeatures.take(2).toList}...)" + + s" userStringIntMap: [${userStringIntMap.size}]" + + s"(${userStringIntMap.take(2)}...)" + + s" itemStringIntMap: [${itemStringIntMap.size}]" + + s"(${itemStringIntMap.take(2)}...)" + } +} + +object ALSModel + extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] { + def apply(id: String, params: ALSAlgorithmParams, + sc: Option[SparkContext]) = { + new ALSModel( + rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first, + userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"), + productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"), + userStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first, + itemStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first) + } +} diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/DataSource.scala b/templates/template-scala-parallel-recommendation/src/main/scala/DataSource.scala new file mode 100644 index 0000000000..2e9aa5d5d6 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/DataSource.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.PDataSource +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.store.PEventStore + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +import grizzled.slf4j.Logger + +case class DataSourceEvalParams(kFold: Int, queryNum: Int) + +case class DataSourceParams( + appName: String, + evalParams: Option[DataSourceEvalParams]) extends Params + +class DataSource(val dsp: DataSourceParams) + extends PDataSource[TrainingData, + EmptyEvaluationInfo, Query, ActualResult] { + + @transient lazy val logger = Logger[this.type] + + def getRatings(sc: SparkContext): RDD[Rating] = { + + val eventsRDD: RDD[Event] = PEventStore.find( + appName = dsp.appName, + entityType = Some("user"), + eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event + // targetEntityType is optional field of an event. + targetEntityType = Some(Some("item")))(sc) + + val ratingsRDD: RDD[Rating] = eventsRDD.map { event => + val rating = try { + val ratingValue: Double = event.event match { + case "rate" => event.properties.get[Double]("rating") + case "buy" => 4.0 // map buy event to rating value of 4 + case _ => throw new Exception(s"Unexpected event ${event} is read.") + } + // entityId and targetEntityId is String + Rating(event.entityId, + event.targetEntityId.get, + ratingValue) + } catch { + case e: Exception => { + logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.") + throw e + } + } + rating + }.cache() + + ratingsRDD + } + + override + def readTraining(sc: SparkContext): TrainingData = { + new TrainingData(getRatings(sc)) + } + + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + ratings.cache + + (0 until kFold).map { idx => { + val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) + val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) + + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } +} + +case class Rating( + user: String, + item: String, + rating: Double +) + +class TrainingData( + val ratings: RDD[Rating] +) extends Serializable { + override def toString = { + s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" + } +} diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/Engine.scala b/templates/template-scala-parallel-recommendation/src/main/scala/Engine.scala new file mode 100644 index 0000000000..727f2e3ebc --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/Engine.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.EngineFactory +import org.apache.predictionio.controller.Engine + +case class Query( + user: String, + num: Int +) + +case class PredictedResult( + itemScores: Array[ItemScore] +) + +case class ActualResult( + ratings: Array[Rating] +) + +case class ItemScore( + item: String, + score: Double +) + +object RecommendationEngine extends EngineFactory { + def apply() = { + new Engine( + classOf[DataSource], + classOf[Preparator], + Map("als" -> classOf[ALSAlgorithm]), + classOf[Serving]) + } +} diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/Evaluation.scala b/templates/template-scala-parallel-recommendation/src/main/scala/Evaluation.scala new file mode 100644 index 0000000000..7972d734f1 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/Evaluation.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.Evaluation +import org.apache.predictionio.controller.OptionAverageMetric +import org.apache.predictionio.controller.AverageMetric +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EngineParamsGenerator +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.MetricEvaluator + +// Usage: +// $ pio eval org.example.recommendation.RecommendationEvaluation \ +// org.example.recommendation.EngineParamsList + +case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + require(k > 0, "k must be greater than 0") + + override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { + val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + // If there is no positive results, Precision is undefined. We don't consider this case in the + // metrics, hence we return None. + if (positives.size == 0) { + None + } else { + val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size + Some(tpCount.toDouble / math.min(k, positives.size)) + } + } +} + +case class PositiveCount(ratingThreshold: Double = 2.0) + extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + override def header = s"PositiveCount (threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = { + a.ratings.filter(_.rating >= ratingThreshold).size + } +} + +object RecommendationEvaluation extends Evaluation { + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), + otherMetrics = Seq( + PositiveCount(ratingThreshold = 4.0), + PrecisionAtK(k = 10, ratingThreshold = 2.0), + PositiveCount(ratingThreshold = 2.0), + PrecisionAtK(k = 10, ratingThreshold = 1.0), + PositiveCount(ratingThreshold = 1.0) + ))) +} + + +object ComprehensiveRecommendationEvaluation extends Evaluation { + val ratingThresholds = Seq(0.0, 2.0, 4.0) + val ks = Seq(1, 3, 10) + + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 3, ratingThreshold = 2.0), + otherMetrics = ( + (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++ + (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r)) + ))) +} + + +trait BaseEngineParamsList extends EngineParamsGenerator { + protected val baseEP = EngineParams( + dataSourceParams = DataSourceParams( + appName = "INVALID_APP_NAME", + evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) +} + +object EngineParamsList extends BaseEngineParamsList { + engineParamsList = for( + rank <- Seq(5, 10, 20); + numIterations <- Seq(1, 5, 10)) + yield baseEP.copy( + algorithmParamsList = Seq( + ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3))))) +} diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/Preparator.scala b/templates/template-scala-parallel-recommendation/src/main/scala/Preparator.scala new file mode 100644 index 0000000000..fe1290b3ef --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/Preparator.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.PPreparator + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +class Preparator + extends PPreparator[TrainingData, PreparedData] { + + def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { + new PreparedData(ratings = trainingData.ratings) + } +} + +class PreparedData( + val ratings: RDD[Rating] +) extends Serializable diff --git a/templates/template-scala-parallel-recommendation/src/main/scala/Serving.scala b/templates/template-scala-parallel-recommendation/src/main/scala/Serving.scala new file mode 100644 index 0000000000..d1d7f6b2e0 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.recommendation + +import org.apache.predictionio.controller.LServing + +class Serving + extends LServing[Query, PredictedResult] { + + override + def serve(query: Query, + predictedResults: Seq[PredictedResult]): PredictedResult = { + predictedResults.head + } +} diff --git a/templates/template-scala-parallel-recommendation/template.json b/templates/template-scala-parallel-recommendation/template.json new file mode 100644 index 0000000000..cb5ace6ec5 --- /dev/null +++ b/templates/template-scala-parallel-recommendation/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.11.0-incubating" }}} diff --git a/tests/check_templates.sh b/tests/check_templates.sh new file mode 100755 index 0000000000..f7aba817f3 --- /dev/null +++ b/tests/check_templates.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Go to PredictionIO directory +FWDIR="$(cd "`dirname "$0"`"/..; pwd)" +cd ${FWDIR} + +# Extract information from PIO's build configuration +PIO_VERSION=$(grep ^version ${FWDIR}/build.sbt | grep ThisBuild | grep -o '".*"' | sed 's/"//g') +SCALA_VERSION=$(grep ^scalaVersion ${FWDIR}/build.sbt | grep ThisBuild | grep -o ', ".*"' | sed 's/[, "]//g') +SPARK_VERSION=$(grep ^sparkVersion ${FWDIR}/build.sbt | grep ThisBuild | grep -o ', ".*"' | sed 's/[, "]//g') + +echo "=======================================================================" +echo "PIO_VERSION: $PIO_VERSION" +echo "SCALA_VERSION: $SCALA_VERSION" +echo "SPARK_VERSION: $SPARK_VERSION" +echo "=======================================================================" + +"${FWDIR}/sbt/sbt" publishLocal + +function check_template(){ + TEMPLATE=$1 + + cd "${FWDIR}/templates/$TEMPLATE" + TEMPLATE_PIO_VERSION=$(grep apache-predictionio-core build.sbt | grep -o '".*"' | sed 's/"//g' | awk '{ print $5 }') + TEMPLATE_SCALA_VERSION=$(grep ^scalaVersion build.sbt | grep -o '".*"' | sed 's/"//g') + TEMPLATE_SPARK_VERSION=$(grep spark-mllib build.sbt | grep -o '".*"' | sed 's/"//g' | awk '{ print $5 }') + + echo "Checking $TEMPLATE..." + + if test "$PIO_VERSION" != "$TEMPLATE_PIO_VERSION" ; then + echo -e "\033[0;31m[error]\033[0;39m $TEMPLATE: PIO is $PIO_VERSION but template version is $TEMPLATE_PIO_VERSION." + exit 1 + fi + if test "$SCALA_VERSION" != "$TEMPLATE_SCALA_VERSION" ; then + echo -e "\033[0;31m[error]\033[0;39m $TEMPLATE: PIO's Scala version should be $SCALA_VERSION but template version is $TEMPLATE_SCALA_VERSION." + exit 1 + fi + if test "$SPARK_VERSION" != "$TEMPLATE_SPARK_VERSION" ; then + echo -e "\033[0;31m[error]\033[0;39m $TEMPLATE: PIO's Spark version should be $SPARK_VERSION but template version is $TEMPLATE_SPARK_VERSION." + exit 1 + fi + "${FWDIR}/sbt/sbt" clean test +} + +# Check templates +check_template template-scala-parallel-recommendation + diff --git a/tests/unit.sh b/tests/unit.sh index a9c84fcab6..31ee7d9ebb 100755 --- a/tests/unit.sh +++ b/tests/unit.sh @@ -43,4 +43,6 @@ sbt/sbt dataJdbc/compile test storage/test \ -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION \ -Dhbase.version=$PIO_HBASE_VERSION +./tests/check_templates.sh + popd