Backfill validation: Add data availability check (#526)
* Backfill validation: Add data availability check

* update

* update

---------

Co-authored-by: Sophie Wang <[email protected]>
SophieYu41 and Sophie Wang authored Jul 24, 2023
1 parent 4f4a6d8 commit 45b9cae
Showing 5 changed files with 99 additions and 8 deletions.
4 changes: 4 additions & 0 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -312,6 +312,10 @@ object Extensions {
}
}

def isCumulative: Boolean = {
if (source.isSetEntities) false else source.getEvents.isCumulative
}

def topic: String = {
if (source.isSetEntities) source.getEntities.getMutationTopic else source.getEvents.getTopic
}
43 changes: 39 additions & 4 deletions spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -226,6 +226,8 @@ class Analyzer(tableUtils: TableUtils,
val aggregationsMetadata = ListBuffer[AggregationMetadata]()
val keysWithError: ListBuffer[(String, String)] = ListBuffer.empty[(String, String)]
val gbTables = ListBuffer[String]()
// Tuples of (table name, group_by name, expected_start) indicating that the table does not have data available for the required group_by
val dataAvailabilityErrors: ListBuffer[(String, String, String)] = ListBuffer.empty[(String, String, String)]

joinConf.joinParts.toScala.foreach { part =>
val (aggMetadata, gbKeySchema) =
@@ -239,9 +241,9 @@
part.getGroupBy.getMetaData.getName)
}
// Run validation checks.
// TODO: more validations on the way
keysWithError ++= runSchemaValidation(leftSchema, gbKeySchema, part.rightToLeft)
gbTables ++= part.groupBy.sources.toScala.map(_.table)
dataAvailabilityErrors ++= runDataAvailabilityCheck(part.groupBy, joinConf.left)
}
val noAccessTables = runTablePermissionValidation((gbTables.toList ++ List(joinConf.left.table)).toSet)

@@ -272,12 +274,18 @@
} else {
println(s"----- Schema validation completed. Found ${keysWithError.size} errors")
println(keysWithError.map { case (key, errorMsg) => s"$key => $errorMsg" }.mkString("\n"))
println(s"---- No permissions to access following ${noAccessTables.size} tables ----")
println(s"---- Table permission check completed. Found permission errors in ${noAccessTables.size} tables ----")
println(noAccessTables.mkString("\n"))
println(s"---- Data availability check completed. Found issue in ${dataAvailabilityErrors.size} tables ----")
dataAvailabilityErrors.foreach(error =>
println(s" Table ${error._1} : Group_By ${error._2}. Expected start ${error._3}"))
}

if (validationAssert) {
assert(keysWithError.isEmpty, "ERROR: Join validation failed. Please check error message for details.")
assert(
keysWithError.isEmpty && noAccessTables.isEmpty,
"ERROR: Join validation failed. Please check error message for details."
)
}
// (schema map showing the names and datatypes, right side feature aggregations metadata for metadata upload)
(leftSchema ++ rightSchema, aggregationsMetadata, statsSchema.unpack.toMap)
@@ -306,7 +314,7 @@ class Analyzer(tableUtils: TableUtils,
}
}

// validate the table permissions for the sources of the group by
// validate the table permissions for a given list of tables
// return a list of tables that the user doesn't have access to
def runTablePermissionValidation(sources: Set[String]): Set[String] = {
println(s"Validating ${sources.size} tables permissions ...")
@@ -317,6 +325,33 @@
}
}

// validate that data is available for the group by
// - For the aggregation case, the earliest partition of each group_by source table should go back at least to (left_start_partition - max_window)
// - For the non-aggregation case, or an unbounded window, no earliest partition is required
// return a list of (table, gb_name, expected_start) tuples for tables that don't have data available
def runDataAvailabilityCheck(groupBy: api.GroupBy, leftSource: api.Source): List[(String, String, String)] = {
val leftStart = Option(leftSource.query.startPartition)
.getOrElse(tableUtils.firstAvailablePartition(leftSource.table, leftSource.subPartitionFilters).get)
lazy val groupByOps = new GroupByOps(groupBy)
val maxWindow = groupByOps.maxWindow
maxWindow match {
case Some(window) =>
val expectedStart = tableUtils.partitionSpec.minus(leftStart, window)
groupBy.sources.toScala.flatMap { source =>
val table = source.table
println(s"Checking table $table for data availability ... Expected start partition: $expectedStart")
// check whether the expected partition is available or the table is cumulative
if (!tableUtils.ifPartitionExistsInTable(table, expectedStart) && !source.isCumulative) {
Some((table, groupBy.getMetaData.getName, expectedStart))
} else {
None
}
}
case None =>
List.empty
}
}

def run(): Unit =
conf match {
case confPath: String =>
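
A quick worked example of the expectedStart arithmetic used by runDataAvailabilityCheck above. This is a minimal, REPL-style sketch: the dates and window size are made up, and java.time stands in for Chronon's partitionSpec.minus, which handles the partition string format and any boundary conventions.

import java.time.LocalDate

// hypothetical left-side start partition and largest aggregation window
val leftStart     = LocalDate.parse("2023-07-01")
val maxWindowDays = 365L
// earliest partition every non-cumulative group_by source is expected to cover
val expectedStart = leftStart.minusDays(maxWindowDays)
println(s"expected start partition: $expectedStart") // 2022-07-01
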
5 changes: 4 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -153,7 +153,7 @@ case class TableUtils(sparkSession: SparkSession) {
def checkTablePermission(tableName: String,
fallbackPartition: String =
partitionSpec.before(partitionSpec.at(System.currentTimeMillis()))): Boolean = {
println(s"checking permission for table $tableName...")
println(s"Checking permission for table $tableName...")
try {
// retrieve one row from the table
val partitionFilter = lastAvailablePartition(tableName).getOrElse(fallbackPartition)
@@ -181,6 +181,9 @@
def firstAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].min(x, y))

def ifPartitionExistsInTable(tableName: String, partition: String): Boolean =
partitions(tableName).contains(partition)

def insertPartitions(df: DataFrame,
tableName: String,
tableProperties: Map[String, String] = null,
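
A hedged sketch of how the TableUtils helpers touched here can be combined by a caller such as the Analyzer. The table name, partition value, and the pre-existing SparkSession are illustrative assumptions, not values from this commit.

val tableUtils = TableUtils(spark) // assumes an existing SparkSession named `spark`

// earliest partition actually present in a group_by source table, if any
val earliestPartition: Option[String] = tableUtils.firstAvailablePartition("db.view_events")

// whether the table has the exact partition the backfill needs
val hasExpectedStart: Boolean = tableUtils.ifPartitionExistsInTable("db.view_events", "2022-07-01")
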
44 changes: 42 additions & 2 deletions spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala
@@ -87,6 +87,46 @@ class AnalyzerTest {
analyzer.analyzeJoin(joinConf, validationAssert = true)
}

@Test
def testJoinAnalyzerValidationDataAvailability(): Unit = {
// left side
val itemQueries = List(Column("item", api.StringType, 100), Column("guest", api.StringType, 100))
val itemQueriesTable = s"$namespace.item_queries_with_user_table"
DataFrameGen
.events(spark, itemQueries, 500, partitions = 100)
.save(itemQueriesTable)

val start = tableUtils.partitionSpec.minus(today, new Window(90, TimeUnit.DAYS))

val viewsGroupBy = Builders.GroupBy(
sources = Seq(viewsSource),
keyColumns = Seq("item_id"),
aggregations = Seq(
Builders.Aggregation(windows = Seq(new Window(365, TimeUnit.DAYS)), // greater than one year
operation = Operation.AVERAGE,
inputColumn = "time_spent_ms")
),
metaData = Builders.MetaData(name = "join_analyzer_test.item_data_avail_gb", namespace = namespace),
accuracy = Accuracy.SNAPSHOT
)

val joinConf = Builders.Join(
left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable),
joinParts = Seq(
Builders.JoinPart(groupBy = viewsGroupBy, prefix = "validation", keyMapping = Map("item" -> "item_id"))
),
metaData = Builders.MetaData(name = "test_join_analyzer.item_validation", namespace = namespace, team = "chronon")
)

// run analyzer and validate data availability
val analyzer = new Analyzer(tableUtils, joinConf, oneMonthAgo, today, enableHitter = true)
analyzer.analyzeJoin(joinConf, validationAssert = true)

val result = analyzer.runDataAvailabilityCheck(viewsGroupBy, joinConf.left)
println(result)
assertTrue(result.size == 1)
}

def getTestGBSource(): api.Source = {
val viewsSchema = List(
Column("user", api.StringType, 10000),
@@ -95,7 +135,7 @@
Column("user_review", api.LongType, 5000)
)

val viewsTable = s"$namespace.view_events_gb_table"
val viewsTable = s"$namespace.view_events_gb_table_2"
DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)

Builders.Source.events(
@@ -111,7 +151,7 @@
Column("time_spent_ms", api.LongType, 5000)
)

val viewsTable = s"$namespace.view_events_source_table"
val viewsTable = s"$namespace.view_events_gb_table"
DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)

Builders.Source.events(
11 changes: 10 additions & 1 deletion spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
@@ -8,7 +8,7 @@ import ai.chronon.online.SparkConversions
import ai.chronon.spark.{IncompatibleSchemaException, PartitionRange, SparkSessionBuilder, TableUtils}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.Test

import scala.util.Try
@@ -375,4 +375,13 @@ class TableUtilsTest {
prepareTestDataWithSubPartitions(tableName)
assertTrue(tableUtils.checkTablePermission(tableName))
}

@Test
def testIfPartitionExistsInTable(): Unit = {
val tableName = "db.test_if_partition_exists"
prepareTestDataWithSubPartitions(tableName)
assertTrue(tableUtils.ifPartitionExistsInTable(tableName, "2022-11-03"))
assertFalse(tableUtils.ifPartitionExistsInTable(tableName, "2023-01-01"))
}

}
