[HUDI-8379] Expression Index should be used with the partition pruning rule #12509

Status: Closed · wants to merge 1 commit

Changes to HoodieFileIndex.scala:
@@ -30,7 +30,6 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -45,8 +44,8 @@ import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import java.util.stream.Collectors
import javax.annotation.concurrent.NotThreadSafe

import scala.collection.JavaConverters._
import scala.util.control.Breaks.{break, breakable}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

@@ -226,8 +225,13 @@ case class HoodieFileIndex(spark: SparkSession,
  def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: Seq[Expression], isPartitionPruned: Boolean = false)
  : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {

    var (isPruned, prunedPartitionsAndFileSlices) =
      prunePartitionsAndGetFileSlices(dataFilters, partitionFilters)
    if (!isPruned && partitionFilters.nonEmpty && isExpressionIndexEnabled) {
      // val expressionIndexSupport = getExpressionIndexSupport()
      // lazy val queryReferencedColumns = collectReferencedColumns(spark, partitionFilters, schema)
      prunedPartitionsAndFileSlices = filterFileSlices(partitionFilters, false, prunedPartitionsAndFileSlices)
    }
    hasPushedDownPartitionPredicates = true

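For context on the fallback added above: when Spark's partition-pruning rule cannot prune (for example, the filter is an expression over a partition column rather than the raw column), an expression index in the metadata table can still prune file slices. A hypothetical setup, not code from this PR — the table and index names are illustrative, and the SQL follows Hudi 1.0's documented expression-index syntax:

// Illustrative only: an expression index over ts lets a filter on
// from_unixtime(ts, 'yyyy-MM-dd') prune files even though the plain
// partition-pruning rule finds no usable partition predicate.
spark.sql(
  """CREATE INDEX ts_day_idx ON hudi_table
    |USING column_stats(ts)
    |OPTIONS (expr = 'from_unixtime', format = 'yyyy-MM-dd')""".stripMargin)

spark.sql(
  "SELECT * FROM hudi_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2024-01-01'"
).show()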
@@ -236,64 +240,59 @@ case class HoodieFileIndex(spark: SparkSession,
    // If there are no data filters, return all the file slices.
    if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruned) {
      prunedPartitionsAndFileSlices
    } else {
      filterFileSlices(dataFilters, isPruned, prunedPartitionsAndFileSlices)
    }
  }

  private def filterFileSlices(dataFilters: Seq[Expression], isPruned: Boolean, prunedPartitionsAndFileSlices: Seq[(Option[PartitionPath], Seq[FileSlice])]) = {
    // Look up candidate file names in the col-stats or record level index, if all of the following conditions are true
    // - Data-skipping is enabled
    // - Col-Stats Index is present
    // - Record-level Index is present
    // - List of predicates (filters) is present
    val candidateFilesNamesOpt: Option[Set[String]] =
      lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, isPruned)

    logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")

    var totalFileSliceSize = 0
    var candidateFileSliceSize = 0

    val prunedPartitionsAndFilteredFileSlices = prunedPartitionsAndFileSlices.map {
      case (partitionOpt, fileSlices) =>
        // Filter in candidate files based on the col-stats or record level index lookup
        val candidateFileSlices: Seq[FileSlice] = {
          fileSlices.filter(fs => {
            if (candidateFilesNamesOpt.isDefined) {
              // If any file in the file slice is part of the candidate file names, we need to include the file slice.
              // In other words, if none of the files in the file slice are present in the candidate file names, we can filter out the file slice.
              val fileSliceFiles = fs.getLogFiles.map[String](JFunction.toJavaFunction[HoodieLogFile, String](lf => lf.getPath.getName))
                .collect(Collectors.toSet[String])
              val baseFileStatusOpt = getBaseFileInfo(Option.apply(fs.getBaseFile.orElse(null)))
              baseFileStatusOpt.exists(f => fileSliceFiles.add(f.getPath.getName))
              fileSliceFiles.stream().filter(fileSliceFile => candidateFilesNamesOpt.get.contains(fileSliceFile)).findAny().isPresent
            } else {
              true
            }
          })
        }

        totalFileSliceSize += fileSlices.size
        candidateFileSliceSize += candidateFileSlices.size
        (partitionOpt, candidateFileSlices)
    }

    val skippingRatio =
      if (!areAllFileSlicesCached) -1
      else if (getAllFiles().nonEmpty && totalFileSliceSize > 0)
        (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
      else 0

    logInfo(s"Total file slices: $totalFileSliceSize; " +
      s"candidate file slices after data skipping: $candidateFileSliceSize; " +
      s"skipping percentage $skippingRatio")

    prunedPartitionsAndFilteredFileSlices
  }
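A minimal standalone sketch of the slice-filtering rule implemented above, with Hudi's types reduced to plain sets (the names and signatures here are illustrative, not the Hudi API): a slice survives when no candidate set was produced, or when at least one of its files appears in the candidate set.

// Sketch only: a FileSlice collapsed to the set of its file names.
def keepSlice(sliceFiles: Set[String], candidates: Option[Set[String]]): Boolean =
  candidates.forall(c => sliceFiles.exists(c.contains))

assert(keepSlice(Set("f1.parquet", ".f1.log.1"), Some(Set("f1.parquet")))) // overlap -> keep
assert(!keepSlice(Set("f2.parquet"), Some(Set("f1.parquet"))))             // no overlap -> skip
assert(keepSlice(Set("f2.parquet"), None))                                 // no index info -> keep all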

@@ -346,6 +345,10 @@ case class HoodieFileIndex(spark: SparkSession,
        (Option.apply(partition), fileSlices.asScala.map(f => f.withLogFiles(includeLogFiles)).toSeq) }).toSeq)
  }

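  // Assumes an ExpressionIndexSupport instance is always registered in
  // indicesSupport: collectFirst(...).get throws NoSuchElementException otherwise.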
  private def getExpressionIndexSupport(): ExpressionIndexSupport = {
    indicesSupport.collectFirst({ case indexSupport: ExpressionIndexSupport => indexSupport }).get
  }

  /**
   * In the fast bootstrap read code path, it gets the path info for the bootstrap base file instead of
   * skeleton file. Returns path info for the base file if available.
@@ -379,31 +382,44 @@
  // scalastyle:off return
  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression],
                                                  prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                  shouldPushDownFilesFilter: Boolean): Option[Set[String]] = {
    Try {
      // NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in
      //       the Column Stats Index (CSI). The following cases could not be effectively handled by Data Skipping:
      //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
      //            CSI only contains stats for top-level columns, in this case for "struct")
      //          - Any expression not directly referencing a top-level column (for ex, sub-queries, since there's
      //            nothing CSI in particular could be applied for)
      //       For the record index, Data Skipping is only effective when one of the query filters is an EqualTo
      //       or IN query on simple record keys. In such a case the record index is used to filter the file slices
      //       and candidate files are obtained from these file slices.
      lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
      var prunedFileNamesOpt: Option[Set[String]] = Option.empty
      if (isDataSkippingEnabled) {
        breakable {
          for (indexSupport: SparkBaseIndexSupport <- indicesSupport) {
            if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) {
              prunedFileNamesOpt = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
                prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
              if (prunedFileNamesOpt.nonEmpty) {
                break()
              }
            }
          }
        }
      }
      validateConfig()
      prunedFileNamesOpt
    } match {
      case Success(opt) => opt
      case Failure(e) =>
        logError("Failed to lookup candidate files in File Index", e)
        spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
          case DataSkippingFailureMode.Fallback.value => Option.empty
          case DataSkippingFailureMode.Strict.value => throw new HoodieException(e)
        }
    }
  }
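The refactor above replaces the old early `return` (which needed the scalastyle exemption) with `scala.util.control.Breaks`, exiting the index loop at the first non-empty result. A self-contained sketch of that pattern, with made-up names rather than Hudi code:

import scala.util.control.Breaks.{break, breakable}

// First lookup that yields a result wins; break() exits the loop without the
// non-local return that scalastyle flags.
def firstHit(lookups: Seq[Int => Option[String]], key: Int): Option[String] = {
  var hit: Option[String] = None
  breakable {
    for (lookup <- lookups) {
      hit = lookup(key)
      if (hit.nonEmpty) break()
    }
  }
  hit
}

// Usage: the second lookup matches, so the third is never consulted.
val lookups: Seq[Int => Option[String]] =
  Seq(_ => None, k => Some(s"key-$k"), _ => sys.error("unreachable"))
assert(firstHit(lookups, 42).contains("key-42"))

In the failure handler above, `DataSkippingFailureMode.Fallback` degrades to no pruning (`Option.empty`) while `Strict` rethrows; the mode is read from `spark.sqlContext.getConf(DataSkippingFailureMode.configName, ...)`.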

  override def refresh(): Unit = {