Column Stats #42
Conversation
see #44
…ent ColStats as TwoPassCheapCheck

@c-horn and @phpisciuneri What's the status of this?
Waiting on reviews @JayGaynor
@JayGaynor Yep, waiting on me. Sorry @c-horn. I've been messing around with hive3.
I touch a lot on bins in these comments, but it looks like they're likely already in order. The question becomes: do we want to guarantee order contractually? ¯\_(ツ)_/¯
I'm gonna run it when I get a chance. I'll ✅ after I've done that.
@@ -0,0 +1,3 @@
package com.target.data_validator.stats

case class Histogram(bins: Seq[Bin])
Thought: Should the collection passed into this be sorted going in or upon use?
If bins are non-overlapping, should this be a sorted collection? SortedSet?
Considering we have no fine-grained control over Spark's serialization and deserialization of these types (to and from internal Row, and network formats like Kryo), I would say no.
I can get it to compile with this:
diff --git a/src/main/scala/com/target/data_validator/stats/Bin.scala b/src/main/scala/com/target/data_validator/stats/Bin.scala
index e392153..fec308c 100644
--- a/src/main/scala/com/target/data_validator/stats/Bin.scala
+++ b/src/main/scala/com/target/data_validator/stats/Bin.scala
@@ -1,3 +1,7 @@
package com.target.data_validator.stats
case class Bin(lowerBound: Double, upperBound: Double, count: Long)
+
+object Bin {
+ implicit val ordering = Ordering by { (_: Bin).lowerBound }
+}
diff --git a/src/main/scala/com/target/data_validator/stats/Histogram.scala b/src/main/scala/com/target/data_validator/stats/Histogram.scala
index 52a194b..4b09898 100644
--- a/src/main/scala/com/target/data_validator/stats/Histogram.scala
+++ b/src/main/scala/com/target/data_validator/stats/Histogram.scala
@@ -1,3 +1,5 @@
package com.target.data_validator.stats
-case class Histogram(bins: Seq[Bin])
+import scala.collection.SortedSet
+
+case class Histogram(bins: SortedSet[Bin])
diff --git a/src/main/scala/com/target/data_validator/stats/SecondPassStats.scala b/src/main/scala/com/target/data_validator/stats/SecondPassStats.scala
index 33ab191..b736919 100644
--- a/src/main/scala/com/target/data_validator/stats/SecondPassStats.scala
+++ b/src/main/scala/com/target/data_validator/stats/SecondPassStats.scala
@@ -4,6 +4,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.DataType
+import scala.collection.SortedSet
+
case class SecondPassStats(stdDev: Double, histogram: Histogram)
object SecondPassStats {
@@ -21,13 +23,13 @@ object SecondPassStats {
SecondPassStats(
stdDev = row.getDouble(0),
histogram = Histogram(
- row.getStruct(1).getSeq[Row](0) map {
+ row.getStruct(1).getSeq[Row](0).map {
bin => Bin(
lowerBound = bin.getDouble(0),
upperBound = bin.getDouble(1),
count = bin.getLong(2)
)
- }
+ } (collection.breakOut)
)
)
}
diff --git a/src/main/scala/com/target/data_validator/stats/SecondPassStatsAggregator.scala b/src/main/scala/com/target/data_validator/stats/SecondPassStatsAggregator.scala
index 9c6e474..9567304 100644
--- a/src/main/scala/com/target/data_validator/stats/SecondPassStatsAggregator.scala
+++ b/src/main/scala/com/target/data_validator/stats/SecondPassStatsAggregator.scala
@@ -4,6 +4,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
+import scala.collection.SortedSet
+
/**
* Calculate the standard deviation and histogram of a numeric column
*/
@@ -100,7 +102,7 @@ class SecondPassStatsAggregator(firstPassStats: FirstPassStats) extends UserDefi
}
SecondPassStats(
math.sqrt(buffer.getDouble(sumOfSquares) / (buffer.getLong(count) - 1)),
- Histogram(bins)
+ Histogram(bins.map(x => x)(collection.breakOut))
)
}
diff --git a/src/test/scala/com/target/data_validator/stats/NumericData.scala b/src/test/scala/com/target/data_validator/stats/NumericData.scala
index 5209952..31559c7 100644
--- a/src/test/scala/com/target/data_validator/stats/NumericData.scala
+++ b/src/test/scala/com/target/data_validator/stats/NumericData.scala
@@ -1,5 +1,7 @@
package com.target.data_validator.stats
+import scala.collection.SortedSet
+
case class NumericData(value1: Double)
object NumericData {
@@ -22,7 +24,7 @@ object NumericData {
val secondPassStats = SecondPassStats(
3.0276503540974917,
Histogram(
- Seq(
+ SortedSet(
Bin(0.0, 0.9, 1),
Bin(0.9, 1.8, 1),
Bin(1.8, 2.7, 1),
diff --git a/src/test/scala/com/target/data_validator/validator/ColStatsSpec.scala b/src/test/scala/com/target/data_validator/validator/ColStatsSpec.scala
index fe58a8e..da48836 100644
--- a/src/test/scala/com/target/data_validator/validator/ColStatsSpec.scala
+++ b/src/test/scala/com/target/data_validator/validator/ColStatsSpec.scala
@@ -6,6 +6,8 @@ import com.target.data_validator.stats._
import io.circe.Json
import org.scalatest._
+import scala.collection.SortedSet
+
// scalastyle:off magic.number
class ColStatsSpec extends FunSpec with Matchers with TestingSparkSession {
import spark.implicits._
@@ -86,7 +88,7 @@ object ColStatsSpec {
21.0,
5.916079783099616,
Histogram(
- Seq(
+ SortedSet(
Bin(2.0, 3.9, 2),
Bin(3.9, 5.8, 2),
Bin(5.8, 7.699999999999999, 2),
@@ -109,7 +111,7 @@ object ColStatsSpec {
0.958,
0.26725316654123255,
Histogram(
- Seq(
+ SortedSet(
Bin(0.0536, 0.14404, 3),
Bin(0.14404, 0.23448, 3),
Bin(0.23448, 0.32492, 2),
But Spark doesn't like non-"Seq" collections, and I assume circe might have similar issues:
[info] - should correctly calculate the standard deviation and histogram *** FAILED ***
[info] java.lang.UnsupportedOperationException: Schema for type scala.collection.SortedSet[com.target.data_validator.stats.Bin] is not supported
[info] at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)
[info] at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
[info] at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
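The failure above comes from Spark's `ScalaReflection`, which only derives schemas for the collection types it knows about (`Seq`, `Array`, `Map` and the like); `SortedSet` is not among them. A minimal sketch of the alternative this implies: keep `Seq[Bin]` in the case class so Spark can encode it, and sort at the point of use. The `sortedBins` helper is hypothetical, not part of this PR:

```scala
// Mirrors this PR's case classes; Spark can derive a schema for Seq[Bin].
case class Bin(lowerBound: Double, upperBound: Double, count: Long)

case class Histogram(bins: Seq[Bin]) {
  // Hypothetical helper (not in this PR): sort on demand instead of
  // guaranteeing order in the stored collection.
  def sortedBins: Seq[Bin] = bins.sortBy(_.lowerBound)
}

val unordered = Histogram(Seq(Bin(0.9, 1.8, 1), Bin(0.0, 0.9, 2)))
println(unordered.sortedBins.map(_.lowerBound)) // List(0.0, 0.9)
```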
Ah, OK, that settles it then. Ordering is an exercise left to the consumer! It might be wise to say that in the docs (cc @phpisciuneri)
I guess I wasn't too concerned about this. We can revisit if it is a problem. The JSON output really isn't intended to be read by eyeballs, instead it should be used to plot the histogram. In which case the bin boundaries and counts should be sufficient, no matter the order.
 */
class SecondPassStatsAggregator(firstPassStats: FirstPassStats) extends UserDefinedAggregateFunction {

  val NUMBER_OF_BINS = 10
Does it make sense to make this configurable?
- val NUMBER_OF_BINS = 10
+ final val NUMBER_OF_BINS = 10
 * evaluate the standard deviation and define bins of histogram
 */
override def evaluate(buffer: Row): Any = {
  val bins: Seq[Bin] = for (i <- binStart to binEnd) yield {
Hmm. Bins will already be in order de facto. Maybe we don't need to explicitly sort or guarantee sort order?
I don't think there is any guarantee of a specific order when Spark serializes/deserializes from row format (DataFrame -> Dataset).
Actually there probably is consistency with columns containing arrays, but we definitely can't guarantee it with a type like SortedSet. The aggregator will always output them low to high.
I think the fact that Histogram makes no guarantees that the bins are contiguous and do not overlap is a larger "logical hole" than the bin sorting.
@c-horn yeah it might make sense to guard against overlap. Gaps are less concerning as they can be interpreted as empty bins, but maybe we should check that the bins have equal width? It doesn't have to be a part of this PR, we can create an issue and do separately.
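The guard discussed here could look something like this sketch; the function names are hypothetical, and as noted it would belong in a follow-up issue rather than this PR:

```scala
case class Bin(lowerBound: Double, upperBound: Double, count: Long)

// Bins are contiguous if each bin's upper bound meets the next bin's
// lower bound (within a tolerance, since bounds are Doubles).
def binsContiguous(bins: Seq[Bin], tol: Double = 1e-9): Boolean =
  bins.sliding(2).forall {
    case Seq(a, b) => math.abs(a.upperBound - b.lowerBound) <= tol
    case _         => true // zero or one bin: trivially contiguous
  }

// Bins have equal width if every width matches the first, within tolerance.
def binsEqualWidth(bins: Seq[Bin], tol: Double = 1e-9): Boolean = {
  val widths = bins.map(b => b.upperBound - b.lowerBound)
  widths.isEmpty || widths.forall(w => math.abs(w - widths.head) <= tol)
}
```

Gaps would fail `binsContiguous`, but per the comment above they could also be tolerated and read as empty bins; the equal-width check is the stricter of the two.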
This is a nice implementation @c-horn. The only thing I would add is some doc on usage in the README. I can do that if you'd like.
src/main/scala/com/target/data_validator/stats/CompleteStats.scala (outdated; conversation resolved)
…cala Co-authored-by: phpisciuneri <[email protected]>
Good to go for now; needs docs in this PR or in a quick follow-up before a release.