Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to modern spark (3.4) #166

Merged
merged 16 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ jobs:
- uses: coursier/setup-action@v1
with:
jvm: adopt:1.8
- name: Build, test, and package project
run: bin/sbt clean compile test package makePom
- name: Build, test, and package project on Spark 3.3
colindean marked this conversation as resolved.
Show resolved Hide resolved
run: bin/sbt clean compile test package makePom -DsparkVersion=3.3.1
colindean marked this conversation as resolved.
Show resolved Hide resolved
- name: Build and package project on "legacy" Spark
run: bin/sbt clean compile package makePom
2 changes: 2 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ jobs:
# uses sbt-github-packages, see build.sbt
- name: Publish with SBT
run: bin/sbt publish
- name: Publish with SBT
run: bin/sbt publish -DsparkVersion=3.3.1
colindean marked this conversation as resolved.
Show resolved Hide resolved
64 changes: 54 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
name := "data-validator"
organization := "com.target"

scalaVersion := "2.11.12"
val sparkVersion = settingKey[String]("Spark version")

val sparkVersion = "2.3.4"
sparkVersion := System.getProperty("sparkVersion", "2.3.4")

val circeVersion = "0.11.2"
scalaVersion := {
if (sparkVersion.value > "3.0") {
"2.12.17"
colindean marked this conversation as resolved.
Show resolved Hide resolved
} else {
"2.11.12"
}
}

val sparkValidationVersion = settingKey[String]("Version of package")

sparkValidationVersion := "0.15.0"

version := sparkVersion.value + "_" + sparkValidationVersion.value

val circeVersion = settingKey[String]("Circe version")
val circeYamlVersion = settingKey[String]("Circe YAML version")

circeVersion := {
if (sparkVersion.value > "3.0") {
"0.14.2"
colindean marked this conversation as resolved.
Show resolved Hide resolved
} else {
"0.11.2"
}
}

circeYamlVersion := {
if (sparkVersion.value > "3.0") {
"0.14.2"
colindean marked this conversation as resolved.
Show resolved Hide resolved
} else {
"0.10.1"
}
}

//addDependencyTreePlugin
enablePlugins(GitVersioning)
Expand Down Expand Up @@ -35,18 +66,26 @@ libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "4.1.0",
"com.sun.mail" % "javax.mail" % "1.6.2",
"com.lihaoyi" %% "scalatags" % "0.12.0",
"io.circe" %% "circe-yaml" % "0.10.1",
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"io.circe" %% "circe-yaml" % circeYamlVersion.value,
"io.circe" %% "circe-core" % circeVersion.value,
"io.circe" %% "circe-generic" % circeVersion.value,
"io.circe" %% "circe-parser" % circeVersion.value,
"org.apache.spark" %% "spark-sql" % sparkVersion.value % Provided,
"junit" % "junit" % "4.13.2" % Test,
"org.scalatest" %% "scalatest" % "3.2.15" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test exclude ("junit", "junit-dep")
)

Test / fork := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:+CMSClassUnloadingEnabled")
javaOptions ++= (if (sparkVersion.value > "3.0") {
Seq("-Xms4048M", "-Xmx4048M",
"-Dio.netty.tryReflectionSetAccessible=true",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED")
colindean marked this conversation as resolved.
Show resolved Hide resolved
} else {
Seq("-Xms4048M", "-Xmx4048M")
})
Test / parallelExecution := false
// required for unit tests, but not set in some environments
Test / envVars ++= Map(
Expand All @@ -57,6 +96,11 @@ Test / envVars ++= Map(

assembly / mainClass := Some("com.target.data_validator.Main")

assembly / assemblyShadeRules := Seq(
ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
ShadeRule.rename("cats.kernel.**" -> s"new_cats.kernel.@1").inAll
)

// Enforces scalastyle checks
val compileScalastyle = TaskKey[Unit]("compileScalastyle")
scalastyleFailOnWarning := true
Expand All @@ -75,6 +119,6 @@ compileScalastyle := (Compile / scalastyle).toTask("").value

(Compile / runMain) := Defaults.runMainTask(Compile / fullClasspath, Compile / run / runner).evaluated
TaskKey[Unit]("generateTestData") := {
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value
(Compile / runMain).toTask(" com.target.data_validator.GenTestData").value
}
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("com.codecommit" % "sbt-github-packages" % "0.5.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4")
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class MinNumRows(minNumRows: Json) extends ColumnBased("", ValidatorBase.L0
}

case class ColumnMaxCheck(column: String, value: Json)
extends ColumnBased(column, Max(UnresolvedAttribute(column)).toAggregateExpression()) {
extends ColumnBased(column, Max(UnresolvedAttribute.quoted(column)).toAggregateExpression()) {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
val ret = copy(column = getVarSub(column, "column", dict), value = getVarSubJson(value, "value", dict))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ case class ColumnSumCheck(
minValue: Option[Json] = None,
maxValue: Option[Json] = None,
inclusive: Option[Json] = None
) extends ColumnBased(column, Sum(UnresolvedAttribute(column)).toAggregateExpression()) {
) extends ColumnBased(column, Sum(UnresolvedAttribute.quoted(column)).toAggregateExpression()) {

private val minOrMax: Either[String, Unit] = if (minValue.isEmpty && maxValue.isEmpty) {
Left("'minValue' or 'maxValue' or both must be defined")
Expand Down Expand Up @@ -78,13 +78,14 @@ case class ColumnSumCheck(
}

def getData(pctError: String): ListMap[String, String] = {
((minValue, maxValue) match {
val initial: ListMap[String, String] = ((minValue, maxValue) match {
case (Some(x), Some(y)) =>
ListMap("lower_bound" -> x.asNumber.get.toString, "upper_bound" -> y.asNumber.get.toString)
case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString)
case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString)
case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.")
}) + ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
})
initial ++ List("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
}

val actualSum: Double = dataType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ object ValidatorBase extends LazyLogging {
private val backtick = "`"
val I0: Literal = Literal.create(0, IntegerType)
val D0: Literal = Literal.create(0.0, DoubleType)
val L0: Literal = Literal.create(0, LongType)
val L1: Literal = Literal.create(1, LongType)
val L0: Literal = Literal.create(0L, LongType)
val L1: Literal = Literal.create(1L, LongType)

def isValueColumn(v: String): Boolean = v.startsWith(backtick)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
ValidatorQuickCheckError(
("item", "Eggs") :: Nil,
5.99,
"RangeCheck failed! max = 5.99 and (('max <= 6.0) || ('max >= 10.0))"
"RangeCheck failed! max = 5.99 and (('max <= 6.0) OR ('max >= 10.0))"
)
)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
val dict = new VarSubstitution
val df = mkDataFrame(spark, defData)
assert(!sut.configCheck(df))
assert(sut.colTest(df.schema, dict).sql == "((`avg` < `min`) OR (`avg` > `max`))")
assert(sut.colTest(df.schema, dict).sql == "((avg < min) OR (avg > max))")
}

it("bad minValue column") {
Expand Down Expand Up @@ -395,7 +395,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
ValidatorQuickCheckError(
("item", "Bread") :: Nil,
0.99,
"RangeCheck failed! avg = 0.99 and (('avg <= 'min) || ('avg >= 'max))"
"RangeCheck failed! avg = 0.99 and (('avg <= 'min) OR ('avg >= 'max))"
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,13 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 6))"
)) ^
(sut.getEvents contains
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 6))"
))
)
}
Expand Down Expand Up @@ -364,7 +364,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 6))"
)
)

Expand All @@ -373,7 +373,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 6))"
)
)
}
Expand Down Expand Up @@ -404,7 +404,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 5))"
)
)

Expand All @@ -413,7 +413,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 5))"
)
)

Expand All @@ -422,7 +422,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "Item23") :: Nil,
"Item23",
"StringLengthCheck failed! item = Item23 and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = Item23 and ((length('item) < 5) OR (length('item) > 5))"
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^It && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^It) AND isnotnull('item))"
)
)
}
Expand Down Expand Up @@ -214,7 +214,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)
)

Expand All @@ -223,7 +223,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "Item1") :: Nil,
"Item1",
"StringRegexCheck failed! item = Item1 and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = Item1 and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)
)
}
Expand Down Expand Up @@ -254,13 +254,13 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)) ^
(sut.getEvents contains
ValidatorQuickCheckError(
("item", "Item1") :: Nil,
"Item1",
"StringRegexCheck failed! item = Item1 and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = Item1 and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
))
)
}
Expand Down
Loading