Commit

[HUDI-8379] Expression Index should be used with the partition pruning rule
lokeshj1703 committed Dec 17, 2024
1 parent 704ccb2 commit 951ac48
Showing 1 changed file with 87 additions and 71 deletions.
@@ -30,7 +30,6 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -45,8 +44,8 @@ import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import java.util.stream.Collectors
import javax.annotation.concurrent.NotThreadSafe

import scala.collection.JavaConverters._
import scala.util.control.Breaks.{break, breakable}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

@@ -226,8 +225,13 @@ case class HoodieFileIndex(spark: SparkSession,
def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: Seq[Expression], isPartitionPruned: Boolean = false)
: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {

val (isPruned, prunedPartitionsAndFileSlices) =
var (isPruned, prunedPartitionsAndFileSlices) =
prunePartitionsAndGetFileSlices(dataFilters, partitionFilters)
if (!isPruned && partitionFilters.nonEmpty && isExpressionIndexEnabled) {
// val expressionIndexSupport = getExpressionIndexSupport()
// lazy val queryReferencedColumns = collectReferencedColumns(spark, partitionFilters, schema)
prunedPartitionsAndFileSlices = filterFileSlices(partitionFilters, false, prunedPartitionsAndFileSlices)
}
hasPushedDownPartitionPredicates = true

// If there are no data filters, return all the file slices.
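Note: a schematic sketch (not Hudi's API) of the new guard above — partition filters are routed to the expression index only when ordinary partition pruning did not already apply and the expression index is enabled. The parameter names below are illustrative stand-ins for the isPruned flag, the partitionFilters sequence and the expression-index switch used in the diff:

    // Schematic version of the added guard; plain Booleans stand in for the real members.
    def useExpressionIndexForPartitionPruning(isPruned: Boolean,
                                              hasPartitionFilters: Boolean,
                                              expressionIndexEnabled: Boolean): Boolean =
      !isPruned && hasPartitionFilters && expressionIndexEnabled

    // Expression-index pruning kicks in only in this case:
    assert(useExpressionIndexForPartitionPruning(isPruned = false, hasPartitionFilters = true, expressionIndexEnabled = true))
    // Already pruned, nothing to filter on, or index disabled: no expression-index pass.
    assert(!useExpressionIndexForPartitionPruning(isPruned = true, hasPartitionFilters = true, expressionIndexEnabled = true))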
@@ -236,64 +240,59 @@ case class HoodieFileIndex(spark: SparkSession,
if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruned ) {
prunedPartitionsAndFileSlices
} else {
// Look up candidate file names in the col-stats or record level index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - Record-level Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, isPruned) match {
case Success(opt) => opt
case Failure(e) =>
logError("Failed to lookup candidate files in File Index", e)

spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
case DataSkippingFailureMode.Fallback.value => Option.empty
case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
}
}
filterFileSlices(dataFilters, isPruned, prunedPartitionsAndFileSlices)
}
}

private def filterFileSlices(dataFilters: Seq[Expression], isPruned: Boolean, prunedPartitionsAndFileSlices: Seq[(Option[PartitionPath], Seq[FileSlice])]) = {
// Look up candidate file names in the col-stats or record level index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - Record-level Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, isPruned)

logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")

var totalFileSliceSize = 0
var candidateFileSliceSize = 0
var totalFileSliceSize = 0
var candidateFileSliceSize = 0

val prunedPartitionsAndFilteredFileSlices = prunedPartitionsAndFileSlices.map {
case (partitionOpt, fileSlices) =>
// Filter in candidate files based on the col-stats or record level index lookup
val candidateFileSlices: Seq[FileSlice] = {
fileSlices.filter(fs => {
val prunedPartitionsAndFilteredFileSlices = prunedPartitionsAndFileSlices.map {
case (partitionOpt, fileSlices) =>
// Filter in candidate files based on the col-stats or record level index lookup
val candidateFileSlices: Seq[FileSlice] = {
fileSlices.filter(fs => {
if (candidateFilesNamesOpt.isDefined) {
// if any file in the file slice is part of candidate file names, we need to include the file slice.
// in other words, if none of the files in the file slice are present in candidate file names, we can filter out the file slice.
val fileSliceFiles = fs.getLogFiles.map[String](JFunction.toJavaFunction[HoodieLogFile, String](lf => lf.getPath.getName))
.collect(Collectors.toSet[String])
val baseFileStatusOpt = getBaseFileInfo(Option.apply(fs.getBaseFile.orElse(null)))
baseFileStatusOpt.exists(f => fileSliceFiles.add(f.getPath.getName))
if (candidateFilesNamesOpt.isDefined) {
// if any file in the file slice is part of candidate file names, we need to include the file slice.
// in other words, if none of the files in the file slice are present in candidate file names, we can filter out the file slice.
fileSliceFiles.stream().filter(fileSliceFile => candidateFilesNamesOpt.get.contains(fileSliceFile)).findAny().isPresent
} else {
true
}
})
}
fileSliceFiles.stream().filter(fileSliceFile => candidateFilesNamesOpt.get.contains(fileSliceFile)).findAny().isPresent
} else {
true
}
})
}

totalFileSliceSize += fileSlices.size
candidateFileSliceSize += candidateFileSlices.size
(partitionOpt, candidateFileSlices)
}
totalFileSliceSize += fileSlices.size
candidateFileSliceSize += candidateFileSlices.size
(partitionOpt, candidateFileSlices)
}

val skippingRatio =
if (!areAllFileSlicesCached) -1
else if (getAllFiles().nonEmpty && totalFileSliceSize > 0)
(totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
else 0
val skippingRatio =
if (!areAllFileSlicesCached) -1
else if (getAllFiles().nonEmpty && totalFileSliceSize > 0)
(totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
else 0

logInfo(s"Total file slices: $totalFileSliceSize; " +
s"candidate file slices after data skipping: $candidateFileSliceSize; " +
s"skipping percentage $skippingRatio")
logInfo(s"Total file slices: $totalFileSliceSize; " +
s"candidate file slices after data skipping: $candidateFileSliceSize; " +
s"skipping percentage $skippingRatio")

prunedPartitionsAndFilteredFileSlices
}
prunedPartitionsAndFilteredFileSlices
}

/**
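The filtering rule implemented by the new private filterFileSlices helper above — keep a file slice when no candidate set was produced, or when at least one of its files (log files plus base file) appears in the candidate set — can be sketched standalone. FileSliceLike is a simplified stand-in for Hudi's FileSlice, not the real class:

    // Simplified stand-in for Hudi's FileSlice: just the names of the files it contains.
    case class FileSliceLike(baseFile: Option[String], logFiles: Seq[String]) {
      def allFiles: Set[String] = logFiles.toSet ++ baseFile
    }

    // Keep a slice when no candidate set was produced (no index-based pruning),
    // or when any of its files shows up in the candidate set.
    def keepSlice(slice: FileSliceLike, candidates: Option[Set[String]]): Boolean =
      candidates match {
        case None    => true
        case Some(c) => slice.allFiles.exists(c.contains)
      }

    val slices = Seq(
      FileSliceLike(Some("f1.parquet"), Seq("f1.log.1")),
      FileSliceLike(Some("f2.parquet"), Seq.empty))
    // With candidates = Set("f1.parquet"), only the first slice survives data skipping.
    println(slices.filter(keepSlice(_, Some(Set("f1.parquet")))))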
@@ -346,6 +345,10 @@ case class HoodieFileIndex(spark: SparkSession,
(Option.apply(partition), fileSlices.asScala.map(f => f.withLogFiles(includeLogFiles)).toSeq) }).toSeq)
}

private def getExpressionIndexSupport(): ExpressionIndexSupport = {
indicesSupport.collectFirst({ case indexSupport: ExpressionIndexSupport => indexSupport }).get
}

/**
* In the fast bootstrap read code path, it gets the path info for the bootstrap base file instead of
* skeleton file. Returns path info for the base file if available.
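The getExpressionIndexSupport helper added above relies on Scala's collectFirst to pull the first element of a given subtype out of the registered index-support list, and .get assumes it is always present. A minimal sketch with stand-in types (not Hudi's real index classes):

    trait IndexSupportStub
    class ColumnStatsSupportStub extends IndexSupportStub
    class ExpressionIndexSupportStub extends IndexSupportStub

    val indicesSupportStub: Seq[IndexSupportStub] =
      Seq(new ColumnStatsSupportStub, new ExpressionIndexSupportStub)

    // collectFirst returns the first element matched by the partial function, as an Option;
    // calling .get (as the commit does) assumes the expression index support is always registered.
    val exprIndex: ExpressionIndexSupportStub =
      indicesSupportStub.collectFirst { case e: ExpressionIndexSupportStub => e }.get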
@@ -379,31 +382,44 @@
// scalastyle:off return
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression],
prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
shouldPushDownFilesFilter: Boolean): Try[Option[Set[String]]] = Try {
// NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in
// the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
// - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
// CSI only contains stats for top-level columns, in this case for "struct")
// - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
// nothing CSI in particular could be applied for)
// For record index, Data Skipping is only effective when one of the query filter is of type EqualTo
// or IN query on simple record keys. In such a case the record index is used to filter the file slices
// and candidate files are obtained from these file slices.

lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
if (isDataSkippingEnabled) {
for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) {
val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
if (prunedFileNames.nonEmpty) {
return Try(prunedFileNames)
shouldPushDownFilesFilter: Boolean): Option[Set[String]] = {
Try {
// NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in
// the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
// - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
// CSI only contains stats for top-level columns, in this case for "struct")
// - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
// nothing CSI in particular could be applied for)
// For record index, Data Skipping is only effective when one of the query filter is of type EqualTo
// or IN query on simple record keys. In such a case the record index is used to filter the file slices
// and candidate files are obtained from these file slices.
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
var prunedFileNamesOpt: Option[Set[String]] = Option.empty
if (isDataSkippingEnabled) {
breakable {
for (indexSupport: SparkBaseIndexSupport <- indicesSupport) {
if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) {
prunedFileNamesOpt = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
if (prunedFileNamesOpt.nonEmpty) {
break()
}
}
}
}
}
validateConfig()
prunedFileNamesOpt
} match {
case Success(opt) => opt
case Failure(e) =>
logError("Failed to lookup candidate files in File Index", e)

spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
case DataSkippingFailureMode.Fallback.value => Option.empty
case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
}
}
validateConfig()
Option.empty
}

override def refresh(): Unit = {

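For reference, the reworked lookupCandidateFilesInMetadataTable combines two patterns: a breakable loop that stops at the first index producing candidate files, and a Try whose failure either falls back to "no pruning" or is rethrown depending on the failure mode. Below is a self-contained sketch of both patterns, with simple functions standing in for Hudi's SparkBaseIndexSupport implementations and a plain Boolean standing in for the DataSkippingFailureMode config:

    import scala.util.control.Breaks.{break, breakable}
    import scala.util.{Failure, Success, Try}

    // Stand-ins: each "index" returns candidate file names, or None if it cannot prune.
    val indices: Seq[() => Option[Set[String]]] = Seq(
      () => None,                              // e.g. an index that is not applicable to the query
      () => Some(Set("f1.parquet")),           // e.g. an index that does prune files
      () => sys.error("should not be reached") // the loop must break before this one
    )

    def lookupCandidates(strict: Boolean): Option[Set[String]] =
      Try {
        var pruned: Option[Set[String]] = None
        breakable {
          for (index <- indices) {
            pruned = index()
            if (pruned.nonEmpty) break() // stop at the first index that produced candidates
          }
        }
        pruned
      } match {
        case Success(opt) => opt
        case Failure(e) if !strict => None // fallback mode: give up on data skipping
        case Failure(e) => throw e         // strict mode: surface the error
      }

    println(lookupCandidates(strict = false)) // Some(Set(f1.parquet))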