diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala
index 08704184f..1dc474211 100644
--- a/api/src/main/scala/ai/chronon/api/Extensions.scala
+++ b/api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -312,6 +312,10 @@ object Extensions {
       }
     }
 
+    def isCumulative: Boolean = {
+      if (source.isSetEntities) false else source.getEvents.isCumulative
+    }
+
     def topic: String = {
       if (source.isSetEntities) source.getEntities.getMutationTopic else source.getEvents.getTopic
     }
diff --git a/spark/src/main/scala/ai/chronon/spark/Analyzer.scala b/spark/src/main/scala/ai/chronon/spark/Analyzer.scala
index 674a179c2..5893a7ec0 100644
--- a/spark/src/main/scala/ai/chronon/spark/Analyzer.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -226,6 +226,8 @@ class Analyzer(tableUtils: TableUtils,
     val aggregationsMetadata = ListBuffer[AggregationMetadata]()
     val keysWithError: ListBuffer[(String, String)] = ListBuffer.empty[(String, String)]
     val gbTables = ListBuffer[String]()
+    // Triples of (table name, group_by name, expected_start) indicating that a table does not have data available for the required group_by
+    val dataAvailabilityErrors: ListBuffer[(String, String, String)] = ListBuffer.empty[(String, String, String)]
     joinConf.joinParts.toScala.foreach { part =>
       val (aggMetadata, gbKeySchema) =
@@ -239,9 +241,9 @@ class Analyzer(tableUtils: TableUtils,
             part.getGroupBy.getMetaData.getName)
       }
       // Run validation checks.
-      // TODO: more validations on the way
       keysWithError ++= runSchemaValidation(leftSchema, gbKeySchema, part.rightToLeft)
       gbTables ++= part.groupBy.sources.toScala.map(_.table)
+      dataAvailabilityErrors ++= runDataAvailabilityCheck(part.groupBy, joinConf.left)
     }
 
     val noAccessTables = runTablePermissionValidation((gbTables.toList ++ List(joinConf.left.table)).toSet)
@@ -272,12 +274,18 @@ class Analyzer(tableUtils: TableUtils,
     } else {
       println(s"----- Schema validation completed. Found ${keysWithError.size} errors")
       println(keysWithError.map { case (key, errorMsg) => s"$key => $errorMsg" }.mkString("\n"))
-      println(s"---- No permissions to access following ${noAccessTables.size} tables ----")
+      println(s"---- Table permission check completed. Found permission errors in ${noAccessTables.size} tables ----")
       println(noAccessTables.mkString("\n"))
+      println(s"---- Data availability check completed. Found issues in ${dataAvailabilityErrors.size} tables ----")
+      dataAvailabilityErrors.foreach(error =>
+        println(s"  Table ${error._1} : Group_By ${error._2}. Expected start ${error._3}"))
     }
 
     if (validationAssert) {
-      assert(keysWithError.isEmpty, "ERROR: Join validation failed. Please check error message for details.")
+      assert(
+        keysWithError.isEmpty && noAccessTables.isEmpty,
+        "ERROR: Join validation failed. Please check error message for details."
+      )
     }
 
     // (schema map showing the names and datatypes, right side feature aggregations metadata for metadata upload)
     (leftSchema ++ rightSchema, aggregationsMetadata, statsSchema.unpack.toMap)
@@ -306,7 +314,7 @@ class Analyzer(tableUtils: TableUtils,
       }
   }
 
-  // validate the table permissions for the sources of the group by
+  // validate the table permissions for the given list of tables
   // return a list of tables that the user doesn't have access to
   def runTablePermissionValidation(sources: Set[String]): Set[String] = {
     println(s"Validating ${sources.size} tables permissions ...")
@@ -317,6 +325,33 @@ class Analyzer(tableUtils: TableUtils,
     }
   }
 
+  // validate that data is available for the group by
+  // - For the aggregation case, the group_by table's earliest partition should go back to the (left_start_partition - max_window) date
+  // - For the non-aggregation case or an unbounded window, no earliest partition is required
+  // return a list of (table, gb_name, expected_start) that don't have data available
+  def runDataAvailabilityCheck(groupBy: api.GroupBy, leftSource: api.Source): List[(String, String, String)] = {
+    val leftStart = Option(leftSource.query.startPartition)
+      .getOrElse(tableUtils.firstAvailablePartition(leftSource.table, leftSource.subPartitionFilters).get)
+    lazy val groupByOps = new GroupByOps(groupBy)
+    val maxWindow = groupByOps.maxWindow
+    maxWindow match {
+      case Some(window) =>
+        val expectedStart = tableUtils.partitionSpec.minus(leftStart, window)
+        groupBy.sources.toScala.flatMap { source =>
+          val table = source.table
+          println(s"Checking table $table for data availability ... Expected start partition: $expectedStart")
+          // check if the partition is available or the table is cumulative
+          if (!tableUtils.ifPartitionExistsInTable(table, expectedStart) && !source.isCumulative) {
+            Some((table, groupBy.getMetaData.getName, expectedStart))
+          } else {
+            None
+          }
+        }
+      case None =>
+        List.empty
+    }
+  }
+
   def run(): Unit =
     conf match {
       case confPath: String =>
diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
index 01dc74b7f..f08f05ab0 100644
--- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
+++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -153,7 +153,7 @@ case class TableUtils(sparkSession: SparkSession) {
   def checkTablePermission(tableName: String,
                            fallbackPartition: String =
                              partitionSpec.before(partitionSpec.at(System.currentTimeMillis()))): Boolean = {
-    println(s"checking permission for table $tableName...")
+    println(s"Checking permission for table $tableName...")
     try {
       // retrieve one row from the table
       val partitionFilter = lastAvailablePartition(tableName).getOrElse(fallbackPartition)
@@ -181,6 +181,9 @@ case class TableUtils(sparkSession: SparkSession) {
   def firstAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
     partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].min(x, y))
 
+  def ifPartitionExistsInTable(tableName: String, partition: String): Boolean =
+    partitions(tableName).contains(partition)
+
   def insertPartitions(df: DataFrame,
                        tableName: String,
                        tableProperties: Map[String, String] = null,
diff --git a/spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala
index e98196931..66f49dfd0 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala
@@ -87,6 +87,46 @@ class AnalyzerTest {
     analyzer.analyzeJoin(joinConf, validationAssert = true)
   }
 
+  @Test
+  def testJoinAnalyzerValidationDataAvailability(): Unit = {
+    // left side
+    val itemQueries = List(Column("item", api.StringType, 100), Column("guest", api.StringType, 100))
+    val itemQueriesTable = s"$namespace.item_queries_with_user_table"
+    DataFrameGen
+      .events(spark, itemQueries, 500, partitions = 100)
+      .save(itemQueriesTable)
+
+    val start = tableUtils.partitionSpec.minus(today, new Window(90, TimeUnit.DAYS))
+
+    val viewsGroupBy = Builders.GroupBy(
+      sources = Seq(viewsSource),
+      keyColumns = Seq("item_id"),
+      aggregations = Seq(
+        Builders.Aggregation(windows = Seq(new Window(365, TimeUnit.DAYS)), // greater than one year
+                             operation = Operation.AVERAGE,
+                             inputColumn = "time_spent_ms")
+      ),
+      metaData = Builders.MetaData(name = "join_analyzer_test.item_data_avail_gb", namespace = namespace),
+      accuracy = Accuracy.SNAPSHOT
+    )
+
+    val joinConf = Builders.Join(
+      left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable),
+      joinParts = Seq(
+        Builders.JoinPart(groupBy = viewsGroupBy, prefix = "validation", keyMapping = Map("item" -> "item_id"))
+      ),
+      metaData = Builders.MetaData(name = "test_join_analyzer.item_validation", namespace = namespace, team = "chronon")
+    )
+
+    // run analyzer and validate data availability
+    val analyzer = new Analyzer(tableUtils, joinConf, oneMonthAgo, today, enableHitter = true)
+    analyzer.analyzeJoin(joinConf, validationAssert = true)
+
+    val result = analyzer.runDataAvailabilityCheck(viewsGroupBy, joinConf.left)
+    println(result)
+    assertTrue(result.size == 1)
+  }
+
   def getTestGBSource(): api.Source = {
     val viewsSchema = List(
       Column("user", api.StringType, 10000),
@@ -95,7 +135,7 @@ class AnalyzerTest {
       Column("user_review", api.LongType, 5000)
     )
 
-    val viewsTable = s"$namespace.view_events_gb_table"
+    val viewsTable = s"$namespace.view_events_gb_table_2"
     DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
 
     Builders.Source.events(
@@ -111,7 +151,7 @@ class AnalyzerTest {
       Column("time_spent_ms", api.LongType, 5000)
     )
 
-    val viewsTable = s"$namespace.view_events_source_table"
+    val viewsTable = s"$namespace.view_events_gb_table"
    DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
 
     Builders.Source.events(
diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
index 21d2c4554..0534e6e38 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
@@ -8,7 +8,7 @@ import ai.chronon.online.SparkConversions
 import ai.chronon.spark.{IncompatibleSchemaException, PartitionRange, SparkSessionBuilder, TableUtils}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
-import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.Test
 
 import scala.util.Try
@@ -375,4 +375,13 @@ class TableUtilsTest {
     prepareTestDataWithSubPartitions(tableName)
     assertTrue(tableUtils.checkTablePermission(tableName))
   }
+
+  @Test
+  def testIfPartitionExistsInTable(): Unit = {
+    val tableName = "db.test_if_partition_exists"
+    prepareTestDataWithSubPartitions(tableName)
+    assertTrue(tableUtils.ifPartitionExistsInTable(tableName, "2022-11-03"))
+    assertFalse(tableUtils.ifPartitionExistsInTable(tableName, "2023-01-01"))
+  }
+
 }
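
Note: below is a minimal, self-contained sketch of the partition arithmetic the new data availability check relies on, useful for sanity-checking the expected-start computation outside Spark. It assumes a daily yyyy-MM-dd partition spec; the object name, dates, and partition sets are hypothetical and are not Chronon code. The real logic lives in Analyzer.runDataAvailabilityCheck and TableUtils.ifPartitionExistsInTable above.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Illustrative sketch only: mirrors the idea of
// expectedStart = partitionSpec.minus(leftStart, maxWindow) followed by a
// membership check against the source table's available partitions.
object DataAvailabilitySketch {
  // assumed daily partition format
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // earliest partition the group_by source is expected to have
  def expectedStart(leftStartPartition: String, maxWindowDays: Int): String =
    LocalDate.parse(leftStartPartition, fmt).minusDays(maxWindowDays.toLong).format(fmt)

  def main(args: Array[String]): Unit = {
    // hypothetical inputs: left side starts at 2023-06-01, largest window is 365 days
    val expected = expectedStart("2023-06-01", 365) // -> 2022-06-01
    // hypothetical partitions of the group_by source table
    val availablePartitions = Set("2022-09-01", "2023-06-01")
    val isCumulative = false
    // the check would flag this (table, group_by) pair: expected partition missing and source not cumulative
    val flagged = !availablePartitions.contains(expected) && !isCumulative
    println(s"expected start: $expected, flagged: $flagged") // flagged: true
  }
}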