Skip to content

Commit

Permalink
Cross-compile to Scala 2.12
Browse files (browse the repository at this point in the history)
  • Loading branch information
nightscape committed Apr 21, 2020
1 parent: 537b611; commit: 612d811
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 30 deletions.
55 changes: 32 additions & 23 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import scala.sys.process.Process
val condaEnvName = "mmlspark"
name := "mmlspark"
organization := "com.microsoft.ml.spark"
scalaVersion := "2.11.12"
crossScalaVersions := Seq("2.12.11", "2.11.12")
scalaVersion := crossScalaVersions.value.head

val sparkVersion = "2.4.3"
val sparkVersion = "2.4.5"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "compile",
Expand All @@ -31,6 +32,7 @@ libraryDependencies ++= Seq(
)
resolvers += "Speech" at "https://mmlspark.blob.core.windows.net/maven/"

fork := true
//noinspection ScalaStyle
lazy val IntegrationTest2 = config("it").extend(Test)

Expand Down Expand Up @@ -79,22 +81,29 @@ def activateCondaEnv: Seq[String] = {
}

val packagePythonTask = TaskKey[Unit]("packagePython", "Package python sdk")
val genDir = join("target", "scala-2.11", "generated")
val unidocDir = join("target", "scala-2.11", "unidoc")
val pythonSrcDir = join(genDir.toString, "src", "python")
val unifiedDocDir = join(genDir.toString, "doc")
val pythonDocDir = join(unifiedDocDir.toString, "pyspark")
val pythonPackageDir = join(genDir.toString, "package", "python")
val pythonTestDir = join(genDir.toString, "test", "python")
val genDir = SettingKey[File]("genDir")
genDir := crossTarget.value / "generated"
val unidocDir = SettingKey[File]("unidocDir")
unidocDir := crossTarget.value / "unidoc"
val pythonSrcDir = SettingKey[File]("pythonSrcDir")
pythonSrcDir := genDir.value / "src" / "python"
val unifiedDocDir = SettingKey[File]("unifiedDocDir")
unifiedDocDir := genDir.value / "doc"
val pythonDocDir = SettingKey[File]("pythonDocDir")
pythonDocDir := unifiedDocDir.value / "pyspark"
val pythonPackageDir = SettingKey[File]("pythonPackageDir")
pythonPackageDir := genDir.value / "package" / "python"
val pythonTestDir = SettingKey[File]("pythonTestDir")
pythonTestDir := genDir.value / "test" / "python"

val generatePythonDoc = TaskKey[Unit]("generatePythonDoc", "Generate sphinx docs for python")
generatePythonDoc := {
val s = streams.value
installPipPackageTask.value
Process(activateCondaEnv ++ Seq("sphinx-apidoc", "-f", "-o", "doc", "."),
join(pythonSrcDir.toString, "mmlspark")) ! s.log
(pythonSrcDir.value / "mmlspark")) ! s.log
Process(activateCondaEnv ++ Seq("sphinx-build", "-b", "html", "doc", "../../../doc/pyspark"),
join(pythonSrcDir.toString, "mmlspark")) ! s.log
(pythonSrcDir.value / "mmlspark")) ! s.log

}

Expand Down Expand Up @@ -145,18 +154,18 @@ publishDocs := {
|<a href="scala/index.html">scala/</u>
|</pre></body></html>
""".stripMargin
val scalaDir = join(unifiedDocDir.toString, "scala")
val scalaDir = unifiedDocDir.value / "scala"
if (scalaDir.exists()) FileUtils.forceDelete(scalaDir)
FileUtils.copyDirectory(unidocDir, scalaDir)
FileUtils.writeStringToFile(join(unifiedDocDir.toString, "index.html"), html, "utf-8")
uploadToBlob(unifiedDocDir.toString, version.value, "docs", s.log)
FileUtils.copyDirectory(unidocDir.value, scalaDir)
FileUtils.writeStringToFile(unifiedDocDir.value / "index.html", html, "utf-8")
uploadToBlob(unifiedDocDir.value.toString, version.value, "docs", s.log)
}

val publishR = TaskKey[Unit]("publishR", "publish R package to blob")
publishR := {
val s = streams.value
(run in IntegrationTest2).toTask("").value
val rPackage = join("target", "scala-2.11", "generated", "package", "R")
val rPackage = join(crossTarget.value.toString, "generated", "package", "R")
.listFiles().head
singleUploadToBlob(rPackage.toString,rPackage.getName, "rrr", s.log)
}
Expand All @@ -173,14 +182,14 @@ packagePythonTask := {
val s = streams.value
(run in IntegrationTest2).toTask("").value
createCondaEnvTask.value
val destPyDir = join("target", "scala-2.11", "classes", "mmlspark")
val destPyDir = join(crossTarget.value.toString, "classes", "mmlspark")
if (destPyDir.exists()) FileUtils.forceDelete(destPyDir)
FileUtils.copyDirectory(join(pythonSrcDir.getAbsolutePath, "mmlspark"), destPyDir)
FileUtils.copyDirectory((pythonSrcDir.value / "mmlspark"), destPyDir)

Process(
activateCondaEnv ++
Seq(s"python", "setup.py", "bdist_wheel", "--universal", "-d", s"${pythonPackageDir.absolutePath}"),
pythonSrcDir,
Seq(s"python", "setup.py", "bdist_wheel", "--universal", "-d", s"${pythonPackageDir.value.absolutePath}"),
pythonSrcDir.value,
"MML_PY_VERSION" -> pythonizeVersion(version.value)) ! s.log
}

Expand All @@ -193,7 +202,7 @@ installPipPackageTask := {
Process(
activateCondaEnv ++ Seq("pip", "install",
s"mmlspark-${pythonizeVersion(version.value)}-py2.py3-none-any.whl"),
pythonPackageDir) ! s.log
pythonPackageDir.value) ! s.log
}

val testPythonTask = TaskKey[Unit]("testPython", "test python sdk")
Expand All @@ -220,7 +229,7 @@ val datasetName = "datasets-2020-01-20.tgz"
val datasetUrl = new URL(s"https://mmlspark.blob.core.windows.net/installers/$datasetName")
val datasetDir = settingKey[File]("The directory that holds the dataset")
datasetDir := {
join(target.value.toString, "scala-2.11", "datasets", datasetName.split(".".toCharArray.head).head)
join(crossTarget.value.toString, "datasets", datasetName.split(".".toCharArray.head).head)
}

getDatasetsTask := {
Expand All @@ -242,7 +251,7 @@ genBuildInfo := {
|---------------
|
|### Maven Coordinates
| `${organization.value}:${name.value}_2.11:${version.value}`
| `${organization.value}:${name.value}_${scalaBinaryVersion.value}:${version.value}`
|
|### Maven Resolver
| `https://mmlspark.azureedge.net/maven`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ object AzureSearchWriter extends IndexParser with SLogging {
}

def write(df: DataFrame, options: Map[String, String] = Map()): Unit = {
prepareDF(df, options).foreachPartition(it => it.foreach(_ => ()))
prepareDF(df, options).foreachPartition((it: Iterator[Row]) => it.foreach(_ => ()))
}

def stream(df: DataFrame, options: java.util.HashMap[String, String]): DataStreamWriter[Row] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class BinaryFileFormat extends TextBasedFileFormat with DataSourceRegister {
assert(subsample >= 0.0 & subsample <= 1.0)
(file: PartitionedFile) => {
val fileReader = new HadoopFileReader(file, broadcastedHadoopConf.value.value, subsample, inspectZip, seed)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => fileReader.close()))
Option(TaskContext.get()).foreach((f: TaskContext) =>
f.addTaskCompletionListener({_ => fileReader.close(); true}: (TaskContext => Boolean)))
fileReader.map { record =>
val recordPath = record._1
val bytes = record._2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object PowerBIWriter {
}

def write(df: DataFrame, url: String, options: Map[String, String] = Map()): Unit = {
prepareDF(df, url, options).foreachPartition(it => it.foreach(_ => ()))
prepareDF(df, url, options).foreachPartition((it: Iterator[Row]) => it.foreach(_ => ()))
}

def stream(df: DataFrame, url: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,14 @@ class DistributedHTTPSource(name: String,
if (newOffset.offset < lastOffsetCommitted.offset) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
serverInfoDF.foreachPartition(_ =>
serverInfoDF.foreachPartition((_: Iterator[Row]) =>
server.get.trimBatchesBefore(newOffset.offset))
lastOffsetCommitted = newOffset
}

/** Stop this source. */
override def stop(): Unit = synchronized {
serverInfoDF.foreachPartition(_ =>
serverInfoDF.foreachPartition((_: Iterator[Row]) =>
server.get.stop())
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ abstract class TestBase extends FunSuite with BeforeAndAfterEachTestData with Be
case t: Transformer => t
case _ => sys.error(s"Unknown PipelineStage value: $stage")
}
transformer.transform(data).foreachPartition { it => it.toList; () }
transformer.transform(data).foreachPartition { (it: Iterator[Row]) => it.toList; () }
}
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[Strati
}
})(inputEnc).cache()
// Some debug to understand what data is on which partition
trainData.foreachPartition { rows =>
trainData.foreachPartition { (rows: Iterator[Row]) =>
rows.foreach { row =>
val ctx = TaskContext.get
val partId = ctx.partitionId
Expand Down

0 comments on commit 612d811

Please sign in to comment.