From f4cc66bba7e0ab616a8180d67eaea19e26e813d1 Mon Sep 17 00:00:00 2001 From: Tiberiu Popa Date: Mon, 23 Sep 2024 13:40:44 -0700 Subject: [PATCH] Build aerosolve using Scala 2.12 --- airlearner/airlearner-strategy/build.gradle | 8 +- airlearner/airlearner-utils/build.gradle | 6 +- .../airbnb/common/ml/util/PipelineUtil.scala | 48 +----------- .../airbnb/common/ml/util/ScalaLogging.scala | 2 +- airlearner/airlearner-xgboost/build.gradle | 8 +- airlearner/configs/scala210.gradle | 14 ---- airlearner/configs/scala211.gradle | 14 ---- airlearner/configs/scala212.gradle | 75 +++++++++++++++++++ airlearner/libraries.gradle | 5 +- build.gradle | 4 +- core/build.gradle | 1 + .../aerosolve/core/models/LinearModel.java | 4 +- .../com/airbnb/aerosolve/core/util/Debug.java | 2 +- .../com/airbnb/aerosolve/core/util/Util.java | 2 +- thrift-cli.gradle | 2 +- training/build.gradle | 14 ++-- .../training/AdditiveModelTrainer.scala | 23 +++--- .../training/LinearRankerUtils.scala | 10 ++- 18 files changed, 124 insertions(+), 118 deletions(-) create mode 100644 airlearner/configs/scala212.gradle diff --git a/airlearner/airlearner-strategy/build.gradle b/airlearner/airlearner-strategy/build.gradle index b0f5c90b..89edd49f 100644 --- a/airlearner/airlearner-strategy/build.gradle +++ b/airlearner/airlearner-strategy/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'com.jfrog.bintray' // Includes 'scala' plugin -apply from: "$rootDir/airlearner/configs/scala211.gradle" +apply from: "$rootDir/airlearner/configs/scala212.gradle" repositories { mavenLocal() @@ -10,9 +10,9 @@ repositories { dependencies { compile libraries.typesafe_config - provided "org.apache.spark:spark-core_2.11:${sparkVersion}" - provided "org.apache.spark:spark-hive_2.11:${sparkVersion}" - provided "org.apache.spark:spark-mllib_2.11:${sparkVersion}" + provided "org.apache.spark:spark-core_2.12:${sparkVersion}" + provided "org.apache.spark:spark-hive_2.12:${sparkVersion}" + provided "org.apache.spark:spark-mllib_2.12:${sparkVersion}" compile project(':airlearner:airlearner-utils') diff --git a/airlearner/airlearner-utils/build.gradle b/airlearner/airlearner-utils/build.gradle index 5cf34154..ee60811b 100644 --- a/airlearner/airlearner-utils/build.gradle +++ b/airlearner/airlearner-utils/build.gradle @@ -6,7 +6,7 @@ apply plugin: 'java' apply plugin: 'com.jfrog.bintray' // Includes 'scala' plugin -apply from: "$rootDir/airlearner/configs/scala211.gradle" +apply from: "$rootDir/airlearner/configs/scala212.gradle" repositories { mavenLocal() @@ -20,8 +20,8 @@ dependencies { compile libraries.slf4j_simple compile libraries.typesafe_config - provided "org.apache.spark:spark-core_2.11:${sparkVersion}" - provided "org.apache.spark:spark-hive_2.11:${sparkVersion}" + provided "org.apache.spark:spark-core_2.12:${sparkVersion}" + provided "org.apache.spark:spark-hive_2.12:${sparkVersion}" } bintray { diff --git a/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/PipelineUtil.scala b/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/PipelineUtil.scala index b8a831f2..a2e22d24 100644 --- a/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/PipelineUtil.scala +++ b/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/PipelineUtil.scala @@ -15,7 +15,7 @@ import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.{Accumulator, SparkContext} +import org.apache.spark.SparkContext import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} import org.joda.time.{DateTime, Days} @@ -299,52 +299,6 @@ object PipelineUtil extends ScalaLogging { true } - // Create a pair of spark accumulators that track (success, failure) counts. - def addStatusAccumulators(sc: SparkContext, - accName: String, - accumulators: mutable.Map[String, Accumulator[Int]]): Unit = { - accumulators.put(accName + ".success", sc.accumulator(0, accName + ".success")) - accumulators.put(accName + ".failure", sc.accumulator(0, accName + ".failure")) - } - - - def countAllFailureCounters(accumulators: mutable.Map[String, Accumulator[Int]]): Long = { - var failureCount = 0 - for ( (name, accumulator) <- accumulators) { - logger.info("- Accumulator {} : {}", name, accumulator.value.toString ) - if (name.endsWith(".failure")) { - failureCount += accumulator.value - } - } - failureCount - } - - def validateSuccessCounters(accumulators: mutable.Map[String, Accumulator[Int]], - minSuccess: Int): Boolean = { - for ( (name, accumulator) <- accumulators) { - if (name.endsWith(".success") && accumulator.value < minSuccess) { - logger.error("Failed counter: {} = {} < {}", name, accumulator.value.toString, minSuccess.toString) - return false - } - } - true - } - - // TODO(kim): cleanup, write to hdfs directly instead of via SparkContext. - def saveCountersAsTextFile(accumulators: mutable.Map[String, Accumulator[Int]], - sc: SparkContext, - hdfsFilePath: String): Unit = { - var summary = Array("Summarizing counters:") - - for ( (name, accumulator) <- accumulators) { - val logLine = "- %s = %d".format(name, accumulator.value ) - summary :+= logLine - logger.info(logLine) - } - - saveAndCommitAsTextFile(sc.parallelize(summary), hdfsFilePath, 1, true) - } - def getHDFSBufferedOutputStream(output: String): BufferedOutputStream = { new BufferedOutputStream(getHDFSOutputStream(output)) } diff --git a/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/ScalaLogging.scala b/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/ScalaLogging.scala index 417ef930..cedb9bbd 100644 --- a/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/ScalaLogging.scala +++ b/airlearner/airlearner-utils/src/main/scala/com/airbnb/common/ml/util/ScalaLogging.scala @@ -1,7 +1,7 @@ package com.airbnb.common.ml.util // scalastyle:off ban.logger.factory -import com.typesafe.scalalogging.slf4j.Logger +import com.typesafe.scalalogging.Logger import org.slf4j.LoggerFactory diff --git a/airlearner/airlearner-xgboost/build.gradle b/airlearner/airlearner-xgboost/build.gradle index 97956c37..0446c71a 100644 --- a/airlearner/airlearner-xgboost/build.gradle +++ b/airlearner/airlearner-xgboost/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'com.jfrog.bintray' // Includes 'scala' plugin -apply from: "$rootDir/airlearner/configs/scala211.gradle" +apply from: "$rootDir/airlearner/configs/scala212.gradle" repositories { mavenLocal() @@ -10,9 +10,9 @@ repositories { dependencies { compile libraries.typesafe_config - provided "org.apache.spark:spark-core_2.11:${sparkVersion}" - provided "org.apache.spark:spark-hive_2.11:${sparkVersion}" - provided "org.apache.spark:spark-mllib_2.11:${sparkVersion}" + provided "org.apache.spark:spark-core_2.12:${sparkVersion}" + provided "org.apache.spark:spark-hive_2.12:${sparkVersion}" + provided "org.apache.spark:spark-mllib_2.12:${sparkVersion}" compile project(':airlearner:airlearner-utils') compile files('local-lib/xgboost4j-0.7.jar') diff --git a/airlearner/configs/scala210.gradle b/airlearner/configs/scala210.gradle index 01fbba7f..46a3b32f 100644 --- a/airlearner/configs/scala210.gradle +++ b/airlearner/configs/scala210.gradle @@ -9,8 +9,6 @@ ext { // Language support apply plugin: 'java' apply plugin: 'scala' -// Tooling support -apply plugin: 'scalaStyle' /** @@ -61,18 +59,6 @@ tasks.withType(ScalaCompile) { } -/** - * Automated Scala style checking as part of the build check task - */ -check.dependsOn << ['scalaStyle'] -scalaStyle { - configLocation = "$rootDir/airlearner/configs/scalastyle_config.xml" - source = 'src/main/scala' - testSource = 'src/test/scala' - includeTestSourceDirectory = true -} - - /** * Task: `repl` * diff --git a/airlearner/configs/scala211.gradle b/airlearner/configs/scala211.gradle index 7de48b2f..4806db60 100644 --- a/airlearner/configs/scala211.gradle +++ b/airlearner/configs/scala211.gradle @@ -9,8 +9,6 @@ ext { // Language support apply plugin: 'java' apply plugin: 'scala' -// Tooling support -apply plugin: 'scalaStyle' /** @@ -58,18 +56,6 @@ tasks.withType(ScalaCompile) { } -/** - * Automated Scala style checking as part of the build check task - */ -check.dependsOn << ['scalaStyle'] -scalaStyle { - configLocation = "$rootDir/airlearner/configs/scalastyle_config.xml" - source = 'src/main/scala' - testSource = 'src/test/scala' - includeTestSourceDirectory = true -} - - /** * Task: `repl` * diff --git a/airlearner/configs/scala212.gradle b/airlearner/configs/scala212.gradle new file mode 100644 index 00000000..7345e1b7 --- /dev/null +++ b/airlearner/configs/scala212.gradle @@ -0,0 +1,75 @@ +/** + * This Gradle file sets up common build settings for Scala 2.10.x projects + */ + +ext { + scala212Version = '2.12.7' +} + +// Language support +apply plugin: 'java' +apply plugin: 'scala' + + +/** + * Include basic package dependencies that ALL Airbnb Scala projects will need. + * This should include no team or project specific code. That will only bloat other + * Scala projects. + * + * Additionally, don't include domain specific libraries such as Spark. + */ +dependencies { + compile libraries.scala_library_212 + compile libraries.scala_logging_slf4j_212 + compile libraries.org_scala_lang_modules_scala_java8_compat_2_12 +} + +/** + * Set standard Scala compilation options + */ +def getScalaCompileAdditionalParameters = { + def compileOptions = [ + // Emit warning and location for usages of features that should be imported explicitly. + '-feature', + // Output messages about what the compiler is doing. + '-verbose', + // Enable recommended additional warnings. + '-Xlint', + ] + if (project.hasProperty('SkipWarning')) { + compileOptions << '-nowarn' + } + compileOptions +} +tasks.withType(ScalaCompile) { + // Target Java 1.8 level compatibility + sourceCompatibility = '1.8' + targetCompatibility = '1.8' + + scalaCompileOptions.with { + deprecation = true + unchecked = true + optimize = true + debugLevel = 'vars' + additionalParameters = getScalaCompileAdditionalParameters() + } +} + + +/** + * Task: `repl` + * + * Will initiate a command-line Scala console that you can use + * to interact live with your project's code. + * + * Run with `gradlew repl --console plain --no-daemon` + */ +task repl(type: JavaExec) { + dependencies { + compile group: 'org.scala-lang', name: 'scala-compiler', version: scala212Version + } + main = 'scala.tools.nsc.MainGenericRunner' + classpath = sourceSets.main.runtimeClasspath + standardInput System.in + args '-usejavacp' +} diff --git a/airlearner/libraries.gradle b/airlearner/libraries.gradle index 577e27e9..da3a0f56 100644 --- a/airlearner/libraries.gradle +++ b/airlearner/libraries.gradle @@ -1,18 +1,21 @@ allprojects { ext { - sparkVersion = '2.3.0' + sparkVersion = '3.1.1' libraries = [ lombok: 'org.projectlombok:lombok:1.16.8', joda_time: 'joda-time:joda-time:2.8.2', joda_convert: 'org.joda:joda-convert:1.8', scala_library_211: 'org.scala-lang:scala-library:2.11.8', + scala_library_212: 'org.scala-lang:scala-library:2.12.7', scala_logging_slf4j_211: 'com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2', + scala_logging_slf4j_212: 'com.typesafe.scala-logging:scala-logging_2.12:3.9.2', slf4j_api: 'org.slf4j:slf4j-api:1.7.9', slf4j_simple: 'org.slf4j:slf4j-simple:1.7.9', typesafe_config: 'com.typesafe:config:1.3.0', org_typelevel_cats_core_2_11_1_0_1: 'org.typelevel:cats-core_2.11:1.0.1', org_scala_lang_modules_scala_java8_compat_2_11: 'org.scala-lang.modules:scala-java8-compat_2.11:0.8.0', + org_scala_lang_modules_scala_java8_compat_2_12: 'org.scala-lang.modules:scala-java8-compat_2.12:0.8.0', ] } } diff --git a/build.gradle b/build.gradle index 41a5cb23..e921bef7 100755 --- a/build.gradle +++ b/build.gradle @@ -5,6 +5,7 @@ buildscript { repositories { jcenter() maven { url "https://plugins.gradle.org/m2/" } + maven { url "https://repo.grails.org/grails/core/" } } dependencies { @@ -17,9 +18,6 @@ buildscript { classpath group: 'gradle.plugin.com.palantir.gradle.gitversion', name: 'gradle-git-version', version: '0.7.3' - classpath group: 'org.github.ngbinh.scalastyle', - name: 'gradle-scalastyle-plugin_2.10', - version: '0.8.2' } } diff --git a/core/build.gradle b/core/build.gradle index 1aac4f5c..28544d82 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -39,5 +39,6 @@ dependencies { compile 'org.projectlombok:lombok:1.14.8' compile 'org.apache.commons:commons-lang3:3.4' compile 'org.apache.commons:commons-math3:3.6.1' + compile 'org.apache.httpcomponents:httpcore:4.4.16' testCompile 'org.slf4j:slf4j-log4j12:1.7.21' } diff --git a/core/src/main/java/com/airbnb/aerosolve/core/models/LinearModel.java b/core/src/main/java/com/airbnb/aerosolve/core/models/LinearModel.java index fdb91b78..8df89aaa 100644 --- a/core/src/main/java/com/airbnb/aerosolve/core/models/LinearModel.java +++ b/core/src/main/java/com/airbnb/aerosolve/core/models/LinearModel.java @@ -14,12 +14,12 @@ import com.google.common.hash.HashCode; import lombok.Getter; import lombok.Setter; -import org.apache.http.annotation.NotThreadSafe; +import org.apache.http.annotation.Contract; /** * A linear model backed by a hash map. */ -@NotThreadSafe +@Contract (threading = org.apache.http.annotation.ThreadingBehavior.UNSAFE) public class LinearModel extends AbstractModel { @Getter @Setter diff --git a/core/src/main/java/com/airbnb/aerosolve/core/util/Debug.java b/core/src/main/java/com/airbnb/aerosolve/core/util/Debug.java index e9789af9..ba7c85ca 100644 --- a/core/src/main/java/com/airbnb/aerosolve/core/util/Debug.java +++ b/core/src/main/java/com/airbnb/aerosolve/core/util/Debug.java @@ -144,8 +144,8 @@ public static Example loadExampleFromResource(String name) { // Save example to path // If you hit permission error, touch and chmod the file public static void saveExample(Example example, String path) { - TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); try { + TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); byte[] buf = serializer.serialize(example); FileOutputStream fos = new FileOutputStream(path); fos.write(buf); diff --git a/core/src/main/java/com/airbnb/aerosolve/core/util/Util.java b/core/src/main/java/com/airbnb/aerosolve/core/util/Util.java index 75873b24..08290c44 100644 --- a/core/src/main/java/com/airbnb/aerosolve/core/util/Util.java +++ b/core/src/main/java/com/airbnb/aerosolve/core/util/Util.java @@ -49,8 +49,8 @@ public class Util implements Serializable { // manipulate in spark. e.g. if we wanted to see the 50 weights in a model // val top50 = sc.textFile("model.bz2").map(Util.decodeModel).sortBy(x => -x.weight).take(50); public static String encode(TBase obj) { - TSerializer serializer = new TSerializer(); try { + TSerializer serializer = new TSerializer(); byte[] bytes = serializer.serialize(obj); return new String(Base64.encodeBase64(bytes)); } catch (Exception e) { diff --git a/thrift-cli.gradle b/thrift-cli.gradle index 2d6edb9e..45b1e127 100644 --- a/thrift-cli.gradle +++ b/thrift-cli.gradle @@ -1,7 +1,7 @@ apply plugin: "java" dependencies { - compile 'org.apache.thrift:libthrift:0.9.1' + compile 'org.apache.thrift:libthrift:0.20.0' } def genJavaDir = new File("${->buildDir}/gen-java") diff --git a/training/build.gradle b/training/build.gradle index 3f323904..68107504 100644 --- a/training/build.gradle +++ b/training/build.gradle @@ -31,7 +31,7 @@ bintray { } dependencies { - compile 'org.scala-lang:scala-library:2.11.8' + compile 'org.scala-lang:scala-library:2.12.7' compile project(':core') compile ('org.apache.avro:avro:1.7.7') { @@ -41,18 +41,18 @@ dependencies { transitive = false } - compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.7.8' + compile 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.11.4' compile 'joda-time:joda-time:2.5' compile 'org.apache.hadoop:hadoop-client:2.2.0' compile 'com.databricks:spark-csv_2.10:1.4.0' - provided 'org.apache.spark:spark-core_2.11:2.3.0' - provided 'org.apache.spark:spark-hive_2.11:2.3.0' + provided 'org.apache.spark:spark-core_2.12:3.1.1' + provided 'org.apache.spark:spark-hive_2.12:3.1.1' - testCompile 'org.apache.spark:spark-core_2.11:2.3.0' - testCompile 'org.apache.spark:spark-core_2.11:2.3.0:tests' + testCompile 'org.apache.spark:spark-core_2.12:3.1.1' + testCompile 'org.apache.spark:spark-core_2.12:3.1.1:tests' testCompile 'org.scalatest:scalatest_2.11:3.0.5' - testCompile 'org.apache.spark:spark-hive_2.11:2.3.0' + testCompile 'org.apache.spark:spark-hive_2.12:3.1.1' testCompile 'org.mockito:mockito-all:1.9.5' testCompile 'org.slf4j:slf4j-log4j12:1.7.21' } diff --git a/training/src/main/scala/com/airbnb/aerosolve/training/AdditiveModelTrainer.scala b/training/src/main/scala/com/airbnb/aerosolve/training/AdditiveModelTrainer.scala index c590380c..760bcae9 100644 --- a/training/src/main/scala/com/airbnb/aerosolve/training/AdditiveModelTrainer.scala +++ b/training/src/main/scala/com/airbnb/aerosolve/training/AdditiveModelTrainer.scala @@ -11,7 +11,8 @@ import com.typesafe.config.Config import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.{Accumulator, SparkContext} +import org.apache.spark.SparkContext +import org.apache.spark.util.{DoubleAccumulator, LongAccumulator} import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConversions._ @@ -138,8 +139,8 @@ object AdditiveModelTrainer { import LossFunctions._ case class SgdParams(params: AdditiveTrainerParams, - exampleCount: Accumulator[Long], - loss: Accumulator[Double]) + exampleCount: LongAccumulator, + loss: DoubleAccumulator) case class LossParams(function: LossFunction, lossMod: Int, @@ -250,7 +251,7 @@ object AdditiveModelTrainer { var validationLosses = List(Double.MaxValue) var bestValidationLoss = Double.MaxValue var bestIteration = 0 - val sgdParams = SgdParams(params, sc.accumulator(0), sc.accumulator(0)) + val sgdParams = SgdParams(params, sc.longAccumulator("exampleCount"), sc.doubleAccumulator("loss")) var i = 0 while (i < iterations && (params.loss.earlyStopping == 0 || (i - bestIteration) < params.loss.earlyStopping) && @@ -273,8 +274,8 @@ object AdditiveModelTrainer { log.info(s"iterations $i Loss = $loss count = ${sgdParams.exampleCount.value} lr = $learningRate") // reset loss accumulator - sgdParams.loss.setValue(0.0) - sgdParams.exampleCount.setValue(0L) + sgdParams.loss.reset() + sgdParams.exampleCount.reset() // validate loss on validation dataset modelBC = sc.broadcast(model) @@ -284,8 +285,8 @@ object AdditiveModelTrainer { log.info(s"iterations $i Loss = $validationLoss count = ${sgdParams.exampleCount.value} (validation)") // reset loss accumulator - sgdParams.loss.setValue(0.0) - sgdParams.exampleCount.setValue(0L) + sgdParams.loss.reset() + sgdParams.exampleCount.reset() // update validation loss and save model if we get a better result if (validationLoss < bestValidationLoss) { @@ -515,12 +516,12 @@ object AdditiveModelTrainer { lossCount += 1 if (lossCount % params.loss.lossMod == 0) { log.info(s"Loss = ${lossSum / params.loss.lossMod}, samples = $lossCount") - sgdParams.loss += lossSum + sgdParams.loss.add(lossSum) lossSum = 0.0 } }) - sgdParams.loss += lossSum - sgdParams.exampleCount += lossCount + sgdParams.loss.add(lossSum) + sgdParams.exampleCount.add(lossCount) if (lossOnly) Iterator.empty else diff --git a/training/src/main/scala/com/airbnb/aerosolve/training/LinearRankerUtils.scala b/training/src/main/scala/com/airbnb/aerosolve/training/LinearRankerUtils.scala index 5127b919..d676f119 100644 --- a/training/src/main/scala/com/airbnb/aerosolve/training/LinearRankerUtils.scala +++ b/training/src/main/scala/com/airbnb/aerosolve/training/LinearRankerUtils.scala @@ -128,7 +128,8 @@ object LinearRankerUtils { modelBC: Broadcast[AdditiveModel], isTraining: Example => Boolean = _ => true, groupSize: Int = 100): RDD[SparseLabeledPoint] = { - val assemblerTimer = examples.sparkContext.accumulator(0L, "pointAssembler") + val assemblerTimer = examples.sparkContext.longAccumulator("pointAssembler") + assemblerTimer.reset() makePointwiseFloat(examples, transformer, groupSize) .mapPartitions { @@ -200,7 +201,7 @@ object LinearRankerUtils { TrainingUtils.getLabel(featureVector, params.rankKey, params.threshold) } - assemblerTimer += (System.nanoTime() - t0) + assemblerTimer.add((System.nanoTime() - t0)) new SparseLabeledPoint( isTraining(example), @@ -221,7 +222,8 @@ object LinearRankerUtils { examples: RDD[Example], transformer: Transformer, groupSize: Int = 100): RDD[Example] = { - val transformerTimer = examples.sparkContext.accumulator(0L, "transformer") + val transformerTimer = examples.sparkContext.longAccumulator("transformer") + transformerTimer.reset() examples.flatMap(example => { transformer.transformContext(example.context) example.example.iterator().map(x => { @@ -242,7 +244,7 @@ object LinearRankerUtils { val features = examples.iterator.flatMap(_.example).toIterable.asJava transformer.transformCombined(features) - transformerTimer += (System.nanoTime() - t0) + transformerTimer.add((System.nanoTime() - t0)) examples }