Commit

update default limit & comment
Sophie Wang committed Oct 3, 2023
1 parent 46a746c commit 7f126e3
Showing 2 changed files with 10 additions and 9 deletions.
13 changes: 7 additions & 6 deletions spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
@@ -267,26 +267,27 @@ object JoinUtils {
   }
 
   /**
-    * Generate bloomfilter for joinPart if backfill row count is below specified threshold
+    * Generate a Bloom filter for 'joinPart' when the row count to be backfilled falls below a specified threshold.
+    * This method anticipates that there will likely be a substantial number of rows on the right side that need to be filtered out.
     * @return bloomfilter map option for right part
     */
 
   def genBloomFilterIfNeeded(leftDf: DataFrame,
                              joinPart: ai.chronon.api.JoinPart,
                              joinConf: ai.chronon.api.Join,
-                             rowCount: Long,
+                             leftRowCount: Long,
                              unfilledRange: PartitionRange,
                              tableUtils: TableUtils): Option[Map[String, BloomFilter]] = {
     println(
       s"\nRow count to be filled for ${joinPart.groupBy.metaData.name}. BloomFilter Threshold: ${tableUtils.bloomFilterThreshold}")
 
-    // apply bloom filter when row count is below threshold
-    if (rowCount > tableUtils.bloomFilterThreshold) {
+    // apply bloom filter when left row count is below threshold
+    if (leftRowCount > tableUtils.bloomFilterThreshold) {
       println("Row count is above threshold. Skip gen bloom filter.")
       Option.empty
     } else {
       val leftBlooms = joinConf.leftKeyCols.toSeq.map { key =>
-        key -> leftDf.generateBloomFilter(key, rowCount, joinConf.left.table, unfilledRange)
+        key -> leftDf.generateBloomFilter(key, leftRowCount, joinConf.left.table, unfilledRange)
       }.toMap
 
       val rightBloomMap = joinPart.rightToLeft.mapValues(leftBlooms(_)).toMap
@@ -298,7 +299,7 @@ object JoinUtils {
          | right type: ${joinPart.groupBy.dataModel},
          | accuracy : ${joinPart.groupBy.inferredAccuracy},
          | part unfilled range: $unfilledRange,
-         | left row count: $rowCount
+         | left row count: $leftRowCount
          | bloom sizes: $bloomSizes
          | groupBy: ${joinPart.groupBy.toString}
          |""".stripMargin)
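For reference, `generateBloomFilter` is a Chronon extension method on DataFrame defined elsewhere in the repo and not shown in this diff. Below is a minimal sketch of what such a helper could look like on top of Spark's built-in sketch support; the simplified signature and the 0.01 false-positive rate are assumptions, not details confirmed by this commit.

import org.apache.spark.sql.DataFrame
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical stand-in for Chronon's DataFrame extension; signature simplified.
object BloomFilterHelperSketch {
  def generateBloomFilter(df: DataFrame, key: String, expectedRows: Long): BloomFilter =
    // Spark sizes the underlying bit array for `expectedRows` items at the given fpp.
    df.stat.bloomFilter(key, expectedRows, 0.01)
}

The guard in genBloomFilterIfNeeded simply skips this call when leftRowCount exceeds tableUtils.bloomFilterThreshold, since an oversized filter costs more to build and broadcast than it saves.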
6 changes: 3 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -25,8 +25,8 @@ case class TableUtils(sparkSession: SparkSession) {
   val partitionSpec: PartitionSpec = PartitionSpec(partitionFormat, WindowUtils.Day.millis)
   val backfillValidationEnforced = sparkSession.conf.get("spark.chronon.backfill.validation.enabled", "true").toBoolean
   // Threshold to control whether or not to use bloomfilter on join backfill. If the row approximate count is under this threshold, we will use bloomfilter.
-  // We are choosing approximate count so that optimal number of bits is at-least 1G for default fpp of 0.01
-  val bloomFilterThreshold = sparkSession.conf.get("spark.chronon.backfill.bloomfilter.threshold", "800000000").toLong
+  // default threshold is 1 million rows
+  val bloomFilterThreshold = sparkSession.conf.get("spark.chronon.backfill.bloomfilter.threshold", "1000000").toLong
 
   sparkSession.sparkContext.setLogLevel("ERROR")
   // converts String-s like "a=b/c=d" to Map("a" -> "b", "c" -> "d")
@@ -175,7 +175,7 @@ case class TableUtils(sparkSession: SparkSession) {
       case ex: Exception =>
         println(s"[Error] Encountered exception when reading table: $tableName.")
         ex.printStackTrace()
-        false
+        true
     }
   }

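The comment deleted from TableUtils.scala pointed at standard Bloom filter sizing, where the optimal bit count for n items at false-positive rate p is m = -n * ln(p) / (ln 2)^2. A quick worked check of both defaults follows; this is textbook math, not code from this repo.

// Standard Bloom filter sizing, for checking the thresholds above.
object BloomFilterSizing {
  def optimalNumOfBits(n: Long, p: Double): Long =
    math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  def main(args: Array[String]): Unit = {
    // Old default: 800M rows at p = 0.01 -> ~7.67e9 bits, roughly 1 GB,
    // matching the removed "at-least 1G for default fpp of 0.01" comment.
    println(optimalNumOfBits(800000000L, 0.01))
    // New default: 1M rows at p = 0.01 -> ~9.59e6 bits, about 1.2 MB per filter.
    println(optimalNumOfBits(1000000L, 0.01))
  }
}

At the new default, each per-key filter costs on the order of a megabyte, cheap enough to build and broadcast whenever the left side is under the threshold.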
