Cross-compile to Scala 2.12
Martin Mauch authored and mhamilton723 committed Nov 5, 2019
1 parent 8bb7d86 commit 64c0f04
Showing 7 changed files with 41 additions and 31 deletions.
build.sbt: 57 changes (33 additions, 24 deletions)
@@ -9,9 +9,10 @@ import scala.sys.process.Process
val condaEnvName = "mmlspark"
name := "mmlspark"
organization := "com.microsoft.ml.spark"
scalaVersion := "2.11.12"
crossScalaVersions := Seq("2.12.10", "2.11.12")
scalaVersion := crossScalaVersions.value.head

val sparkVersion = "2.4.3"
val sparkVersion = "2.4.4"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "compile",
@@ -28,6 +29,7 @@ libraryDependencies ++= Seq(
"com.github.vowpalwabbit" % "vw-jni" % "8.7.0.3"
)

fork := true
//noinspection ScalaStyle
lazy val IntegrationTest2 = config("it").extend(Test)
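With crossScalaVersions declared, prefixing an sbt command with `+` runs it once per listed Scala version, and crossTarget keeps each version's output separate. A minimal sketch of how the new settings are exercised (the version numbers mirror the ones added above; the sbt shell commands in the comments are illustrative, not part of the diff):

// Cross-building sketch for a stock sbt 1.x build.
crossScalaVersions := Seq("2.12.10", "2.11.12")
scalaVersion := crossScalaVersions.value.head // 2.12.10 becomes the default build
// In the sbt shell:
//   +compile         runs compile once per entry in crossScalaVersions
//   ++2.11.12 test   pins a single Scala version for the current session
// Per-version artifacts land under crossTarget.value,
// e.g. target/scala-2.12 or target/scala-2.11.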

@@ -76,22 +78,29 @@ def activateCondaEnv: Seq[String] = {
}

val packagePythonTask = TaskKey[Unit]("packagePython", "Package python sdk")
val genDir = join("target", "scala-2.11", "generated")
val unidocDir = join("target", "scala-2.11", "unidoc")
val pythonSrcDir = join(genDir.toString, "src", "python")
val unifiedDocDir = join(genDir.toString, "doc")
val pythonDocDir = join(unifiedDocDir.toString, "pyspark")
val pythonPackageDir = join(genDir.toString, "package", "python")
val pythonTestDir = join(genDir.toString, "test", "python")
val genDir = SettingKey[File]("genDir")
genDir := crossTarget.value / "generated"
val unidocDir = SettingKey[File]("unidocDir")
unidocDir := crossTarget.value / "unidoc"
val pythonSrcDir = SettingKey[File]("pythonSrcDir")
pythonSrcDir := genDir.value / "src" / "python"
val unifiedDocDir = SettingKey[File]("unifiedDocDir")
unifiedDocDir := genDir.value / "doc"
val pythonDocDir = SettingKey[File]("pythonDocDir")
pythonDocDir := unifiedDocDir.value / "pyspark"
val pythonPackageDir = SettingKey[File]("pythonPackageDir")
pythonPackageDir := genDir.value / "package" / "python"
val pythonTestDir = SettingKey[File]("pythonTestDir")
pythonTestDir := genDir.value / "test" / "python"

val generatePythonDoc = TaskKey[Unit]("generatePythonDoc", "Generate sphinx docs for python")
generatePythonDoc := {
val s = streams.value
installPipPackageTask.value
Process(activateCondaEnv ++ Seq("sphinx-apidoc", "-f", "-o", "doc", "."),
join(pythonSrcDir.toString, "mmlspark")) ! s.log
(pythonSrcDir.value / "mmlspark")) ! s.log
Process(activateCondaEnv ++ Seq("sphinx-build", "-b", "html", "doc", "../../../doc/pyspark"),
join(pythonSrcDir.toString, "mmlspark")) ! s.log
(pythonSrcDir.value / "mmlspark")) ! s.log

}
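The SettingKey definitions above replace paths hard-coded to target/scala-2.11 with values derived from crossTarget, so the generated Python sources, docs, and packages follow whichever Scala version is being built. Roughly, assuming sbt's default crossTarget layout (illustrative values, not part of the diff):

// crossTarget.value      -> target/scala-2.12 (or target/scala-2.11)
// genDir.value           -> target/scala-2.12/generated
// pythonSrcDir.value     -> target/scala-2.12/generated/src/python
// pythonPackageDir.value -> target/scala-2.12/generated/package/python
// Inspect them from the sbt shell with `show genDir`, `show pythonTestDir`, and so on.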

@@ -142,18 +151,18 @@ publishDocs := {
|<a href="scala/index.html">scala/</u>
|</pre></body></html>
""".stripMargin
val scalaDir = join(unifiedDocDir.toString, "scala")
val scalaDir = unifiedDocDir.value / "scala"
if (scalaDir.exists()) FileUtils.forceDelete(scalaDir)
FileUtils.copyDirectory(unidocDir, scalaDir)
FileUtils.writeStringToFile(join(unifiedDocDir.toString, "index.html"), html, "utf-8")
uploadToBlob(unifiedDocDir.toString, version.value, "docs", s.log)
FileUtils.copyDirectory(unidocDir.value, scalaDir)
FileUtils.writeStringToFile(unifiedDocDir.value / "index.html", html, "utf-8")
uploadToBlob(unifiedDocDir.value.toString, version.value, "docs", s.log)
}

val publishR = TaskKey[Unit]("publishR", "publish R package to blob")
publishR := {
val s = streams.value
(run in IntegrationTest2).toTask("").value
val rPackage = join("target", "scala-2.11", "generated", "package", "R")
val rPackage = join(crossTarget.value.toString, "generated", "package", "R")
.listFiles().head
singleUploadToBlob(rPackage.toString,rPackage.getName, "rrr", s.log)
}
@@ -170,14 +179,14 @@ packagePythonTask := {
val s = streams.value
(run in IntegrationTest2).toTask("").value
createCondaEnvTask.value
val destPyDir = join("target", "scala-2.11", "classes", "mmlspark")
val destPyDir = join(crossTarget.value.toString, "classes", "mmlspark")
if (destPyDir.exists()) FileUtils.forceDelete(destPyDir)
FileUtils.copyDirectory(join(pythonSrcDir.getAbsolutePath, "mmlspark"), destPyDir)
FileUtils.copyDirectory((pythonSrcDir.value / "mmlspark"), destPyDir)

Process(
activateCondaEnv ++
Seq(s"python", "setup.py", "bdist_wheel", "--universal", "-d", s"${pythonPackageDir.absolutePath}"),
pythonSrcDir,
Seq(s"python", "setup.py", "bdist_wheel", "--universal", "-d", s"${pythonPackageDir.value.absolutePath}"),
pythonSrcDir.value,
"MML_PY_VERSION" -> pythonizeVersion(version.value)) ! s.log
}

@@ -190,7 +199,7 @@ installPipPackageTask := {
Process(
activateCondaEnv ++ Seq("pip", "install",
s"mmlspark-${pythonizeVersion(version.value)}-py2.py3-none-any.whl"),
pythonPackageDir) ! s.log
pythonPackageDir.value) ! s.log
}

val testPythonTask = TaskKey[Unit]("testPython", "test python sdk")
@@ -203,7 +212,7 @@ testPythonTask := {
"--cov=mmlspark",
"--junitxml=target/python-test-results.xml",
"--cov-report=xml",
"target/scala-2.11/generated/test/python/mmlspark"
(crossTarget.value / "generated/test/python/mmlspark").toString
),
new File("."),
"MML_VERSION" -> version.value
@@ -215,7 +224,7 @@ val datasetName = "datasets-2019-05-02.tgz"
val datasetUrl = new URL(s"https://mmlspark.blob.core.windows.net/installers/$datasetName")
val datasetDir = settingKey[File]("The directory that holds the dataset")
datasetDir := {
join(target.value.toString, "scala-2.11", "datasets", datasetName.split(".".toCharArray.head).head)
join(crossTarget.value.toString, "datasets", datasetName.split(".".toCharArray.head).head)
}

getDatasetsTask := {
@@ -237,7 +246,7 @@ genBuildInfo := {
|---------------
|
|### Maven Coordinates
| `${organization.value}:${name.value}_2.11:${version.value}`
| `${organization.value}:${name.value}_${scalaBinaryVersion.value}:${version.value}`
|
|### Maven Resolver
| `https://mmlspark.azureedge.net/maven`
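Interpolating scalaBinaryVersion into the coordinates makes the documented artifact id track each cross-build. For a hypothetical version 1.0.0, the generated build info would list:

// com.microsoft.ml.spark:mmlspark_2.12:1.0.0  (Scala 2.12 build)
// com.microsoft.ml.spark:mmlspark_2.11:1.0.0  (Scala 2.11 build)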
@@ -326,7 +326,7 @@ object AzureSearchWriter extends IndexParser with SLogging {
}

def write(df: DataFrame, options: Map[String, String] = Map()): Unit = {
prepareDF(df, options).foreachPartition(it => it.foreach(_ => ()))
prepareDF(df, options).foreachPartition((it: Iterator[Row]) => it.foreach(_ => ()))
}

def stream(df: DataFrame, options: java.util.HashMap[String, String]): DataStreamWriter[Row] = {
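The explicit `(it: Iterator[Row])` annotations that recur throughout this commit work around an overload ambiguity under Scala 2.12: Dataset.foreachPartition accepts either a Scala `Iterator[T] => Unit` or a Java `ForeachPartitionFunction[T]`, and 2.12's SAM conversion lets an untyped lambda match both overloads, so the call fails to compile as ambiguous. A minimal, self-contained sketch of the pattern (the local SparkSession and app name are hypothetical, not part of this commit):

import org.apache.spark.sql.{Row, SparkSession}

object ForeachPartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("foreachPartitionSketch").getOrCreate()
    val df = spark.range(10).toDF("n")
    // Ambiguous under Scala 2.12:
    //   df.foreachPartition(it => it.foreach(_ => ()))
    // Annotating the parameter selects the Iterator[Row] => Unit overload on 2.11 and 2.12 alike.
    df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => ()))
    spark.stop()
  }
}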
@@ -164,7 +164,8 @@ class BinaryFileFormat extends TextBasedFileFormat with DataSourceRegister {
assert(subsample >= 0.0 & subsample <= 1.0)
(file: PartitionedFile) => {
val fileReader = new HadoopFileReader(file, broadcastedHadoopConf.value.value, subsample, inspectZip, seed)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => fileReader.close()))
Option(TaskContext.get()).foreach((f: TaskContext) =>
f.addTaskCompletionListener({_ => fileReader.close(); true}: (TaskContext => Boolean)))
fileReader.map { record =>
val recordPath = record._1
val bytes = record._2
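The addTaskCompletionListener change is the same kind of Scala 2.12 fix: an untyped `_ => fileReader.close()` closure can match both the TaskCompletionListener SAM overload and the function overload. Ascribing an explicit function type (here TaskContext => Boolean, returning true) pins the call to a single overload. A rough sketch of the pattern, assuming it runs inside task code on an executor:

import org.apache.spark.TaskContext

// Register cleanup to run when the current task finishes, e.g. from a mapPartitions body.
Option(TaskContext.get()).foreach { (ctx: TaskContext) =>
  ctx.addTaskCompletionListener({ _ =>
    // Close per-task resources here.
    true
  }: (TaskContext => Boolean))
}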
@@ -98,7 +98,7 @@ object PowerBIWriter {
}

def write(df: DataFrame, url: String, options: Map[String, String] = Map()): Unit = {
prepareDF(df, url, options).foreachPartition(it => it.foreach(_ => ()))
prepareDF(df, url, options).foreachPartition((it: Iterator[Row]) => it.foreach(_ => ()))
}

def stream(df: DataFrame, url: String,
@@ -295,14 +295,14 @@ class DistributedHTTPSource(name: String,
if (newOffset.offset < lastOffsetCommitted.offset) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
serverInfoDF.foreachPartition(_ =>
serverInfoDF.foreachPartition((_: Iterator[Row]) =>
server.get.trimBatchesBefore(newOffset.offset))
lastOffsetCommitted = newOffset
}

/** Stop this source. */
override def stop(): Unit = synchronized {
serverInfoDF.foreachPartition(_ =>
serverInfoDF.foreachPartition((_: Iterator[Row]) =>
server.get.stop())
()
}
@@ -180,7 +180,7 @@ abstract class TestBase extends FunSuite with BeforeAndAfterEachTestData with Be
case t: Transformer => t
case _ => sys.error(s"Unknown PipelineStage value: $stage")
}
transformer.transform(data).foreachPartition { it => it.toList; () }
transformer.transform(data).foreachPartition { (it: Iterator[Row]) => it.toList; () }
}
()
}
@@ -58,7 +58,7 @@ class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[Strati
}
})(inputEnc).cache()
// Some debug to understand what data is on which partition
trainData.foreachPartition { rows =>
trainData.foreachPartition { (rows: Iterator[Row]) =>
rows.foreach { row =>
val ctx = TaskContext.get
val partId = ctx.partitionId